亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

數據流中的動態目標問題

數據流中的動態目標問題

繁星點點滴滴 2022-10-12 15:53:18
我有一個 Dataflow 作業,它從 pubsub 讀取數據并根據時間和文件名將內容寫入 GCS,其中文件夾路徑基于 YYYY/MM/DD。這允許根據日期在文件夾中生成文件,并使用 apache beamFileIO和Dynamic Destinations.大約兩周前,我注意到未確認消息的異常堆積。重新啟動 df 作業后,錯誤消失了,新文件正在 GCS 中寫入。幾天后,寫入再次停止,除了這一次,有錯誤聲稱處理被卡住了。經過一些可靠的 SO 研究后,我發現這可能是由于 Beam 2.90 之前的死鎖問題造成的,因為它使用 Conscrypt 庫作為默認安全提供程序。所以,我從 Beam 2.8 升級到 Beam 2.11。再一次,它起作用了,直到它沒有。我更仔細地查看了這個錯誤,并注意到它有一個 SimpleDateFormat 對象的問題,它不是線程安全的。因此,我轉而使用線程安全的 Java.time 和 DateTimeFormatter。它一直有效,直到它沒有。但是,這一次,錯誤略有不同,并沒有指向我的代碼中的任何內容:錯誤如下所示。Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process  at sun.misc.Unsafe.park(Native Method)  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)此錯誤在作業部署后大約 5 小時開始發生,并且隨著時間的推移而增加。寫作在 24 小時內顯著減慢。我有 60 名工人,我懷疑每次出現錯誤時都會有一名工人失敗,這最終會殺死工作。
查看完整描述

2 回答

?
慕田峪4524236

TA貢獻1875條經驗 獲得超5個贊

錯誤“處理卡住...”表示某些特定操作花費的時間超過 5m,而不是作業永久卡住。但是,由于步驟 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 是卡住并且作業被取消/終止的步驟,所以我會在作業寫入臨時文件時考慮一個問題。

我發現了與用于寫入臨時文件的第二粒度時間戳 (yyyy-MM-dd_HH-mm-ss) 相關的BEAM-7689問題。發生這種情況是因為多個并發作業可以共享同一個臨時目錄,這可能導致其中一個作業在其他作業完成之前將其刪除。

根據之前的鏈接,為緩解此問題,請升級到 SDK 2.14。并讓我們知道錯誤是否消失。


查看完整回答
反對 回復 2022-10-12
?
飲歌長嘯

TA貢獻1951條經驗 獲得超3個贊

自從發布這個問題以來,我已經優化了數據流作業以避開瓶頸并增加并行化。就像 rsantiago 解釋的那樣,處理卡住不是錯誤,而只是數據流傳達的一種方式,即一個步驟比其他步驟花費的時間要長得多,這本質上是一個瓶頸,無法用給定的資源清除。我所做的更改似乎已經解決了這些問題。新代碼如下:


public void streamData() {


        try {

            Pipeline pipeline = Pipeline.create(options);


            pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))

            .apply(options.getWindowDuration() + " Window",

                    Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))

                          .triggering(AfterWatermark.pastEndOfWindow()) 

                          .discardingFiredPanes()

                          .withAllowedLateness(parseDuration("24h")))

            .apply(FileIO.<String,PubsubMessage>writeDynamic()

                    .by(new datePartition(options.getOutputFilenamePrefix()))

                    .via(Contextful.fn(

                            (SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),

                            TextIO.sink())

                    .withDestinationCoder(StringUtf8Coder.of())

                    .to(options.getOutputDirectory())

                    .withNaming(type -> new CrowdStrikeFileNaming(type))

                    .withNumShards(options.getNumShards())

                    .withTempDirectory(options.getTempLocation()));


            pipeline.run();

        }


        catch(Exception e) {


            LOG.error("Unable to deploy pipeline");

            LOG.error(e.toString(), e);

        }


    }

最大的變化涉及刪除 extractMsg() 函數并將分區更改為僅使用元數據。這兩個步驟都強制對消息進行反序列化/重新序列化并嚴重影響性能。


此外,由于我的數據集是無限的,我必須設置一個非零數量的分片。我想簡化我的文件命名策略,所以我將它設置為 1 卻不知道它對性能的影響有多大。從那時起,我為我的工作找到了工人/碎片/機器類型的良好平衡(不幸的是,主要基于猜測和檢查)。


盡管在足夠大的數據負載下仍然可能觀察到瓶頸,但盡管負載很重(每天 3-5 tb),管道仍然表現良好。這些更改還顯著改善了自動縮放,但我不知道為什么。數據流作業現在對峰值和谷值的反應要快得多。


查看完整回答
反對 回復 2022-10-12
  • 2 回答
  • 0 關注
  • 118 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號