亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將 JavaRDD<Status> 轉換為 JavaRDD<String> 的問題

將 JavaRDD<Status> 轉換為 JavaRDD<String> 的問題

人到中年有點甜 2023-06-08 20:51:09
我正在嘗試將推文從 twitter 保存到 MongoDb 數據庫。我有RDD<Status>,我正在嘗試借助 ObjectMapper 將其轉換為 JSON 格式。但是這種轉換存在一些問題(public class Main {    //set system credentials for access to twitter    private static void setTwitterOAuth() {        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);    }    public static void main(String [] args) {        setTwitterOAuth();        SparkConf conf = new SparkConf().setMaster("local[2]")                                        .setAppName("SparkTwitter");        JavaSparkContext sparkContext = new JavaSparkContext(conf);        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);        //Stream that contains just tweets in english        JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());        enTweetsDStream.print();        jssc.start();        jssc.awaitTermination();    }    static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {     try {            ObjectMapper objectMapper = new ObjectMapper();            SQLContext sqlContext = new SQLContext(sparkContext);            JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));            DataFrame dataFrame = sqlContext.read().json(tweet);        } catch (Exception e) {            System.out.println("Error saving to database");        }    }JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));這是一個問題。需要不兼容的類型JavaRDD<String>,但地圖被推斷為javaRDD<R>
查看完整描述

1 回答

?
慕標琳琳

TA貢獻1830條經驗 獲得超9個贊

不幸的是,Java 類型推斷并不總是非常聰明,所以我在這些情況下所做的是提取我的 lambda 的所有位作為變量,直到我找到一個 Java 無法為其提供準確類型的位。然后我給表達式我認為它應該具有的類型,看看為什么 Java 會抱怨它。有時它只是編譯器的一個限制,您必須顯式地將表達式“轉換”為所需的類型,有時您會發現代碼存在問題。在你的情況下,代碼對我來說很好,所以一定有別的東西。

然而,我有一個評論:在這里你支付一次 JSON 序列化(從StatusJSON 字符串)然后反序列化(從 JSON 字符串到Row)的成本。另外,您沒有向您提供任何架構Dataset,因此它必須兩次傳遞數據(或根據您的配置對其進行采樣)以推斷架構。如果數據很大,所有這些都可能非常昂貴。如果性能是一個問題并且相對簡單,我建議您直接編寫從Status到的轉換。RowStatus

另一個“順便說一句”:您正在隱式序列化您的ObjectMapper,很可能您不想這樣做??雌饋碓擃惔_實支持 Java 序列化,但具有特殊的邏輯。由于 Spark 的默認配置是使用 Kryo(其性能比 Java 序列化好得多),我懷疑它在使用默認FieldSerializer.?您有以下三種選擇:

  • 使對象映射器靜態化以避免序列化它

  • 配置您的 Kryo 注冊器以ObjectMapper使用 Java 序列化序列化/反序列化類型的對象。那會起作用,但不值得付出努力。

  • 到處使用 Java 序列化而不是 Kryo。餿主意!它很慢并且占用大量空間(內存和磁盤取決于序列化對象的寫入位置)。


查看完整回答
反對 回復 2023-06-08
  • 1 回答
  • 0 關注
  • 143 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號