2 回答

TA貢獻1875條經驗 獲得超5個贊
錯誤“處理卡住...”表示某些特定操作花費的時間超過 5m,而不是作業永久卡住。但是,由于步驟 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 是卡住并且作業被取消/終止的步驟,所以我會在作業寫入臨時文件時考慮一個問題。
我發現了與用于寫入臨時文件的第二粒度時間戳 (yyyy-MM-dd_HH-mm-ss) 相關的BEAM-7689問題。發生這種情況是因為多個并發作業可以共享同一個臨時目錄,這可能導致其中一個作業在其他作業完成之前將其刪除。
根據之前的鏈接,為緩解此問題,請升級到 SDK 2.14。并讓我們知道錯誤是否消失。

TA貢獻1951條經驗 獲得超3個贊
自從發布這個問題以來,我已經優化了數據流作業以避開瓶頸并增加并行化。就像 rsantiago 解釋的那樣,處理卡住不是錯誤,而只是數據流傳達的一種方式,即一個步驟比其他步驟花費的時間要長得多,這本質上是一個瓶頸,無法用給定的資源清除。我所做的更改似乎已經解決了這些問題。新代碼如下:
public void streamData() {
try {
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() + " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")))
.apply(FileIO.<String,PubsubMessage>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix()))
.via(Contextful.fn(
(SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
TextIO.sink())
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getOutputDirectory())
.withNaming(type -> new CrowdStrikeFileNaming(type))
.withNumShards(options.getNumShards())
.withTempDirectory(options.getTempLocation()));
pipeline.run();
}
catch(Exception e) {
LOG.error("Unable to deploy pipeline");
LOG.error(e.toString(), e);
}
}
最大的變化涉及刪除 extractMsg() 函數并將分區更改為僅使用元數據。這兩個步驟都強制對消息進行反序列化/重新序列化并嚴重影響性能。
此外,由于我的數據集是無限的,我必須設置一個非零數量的分片。我想簡化我的文件命名策略,所以我將它設置為 1 卻不知道它對性能的影響有多大。從那時起,我為我的工作找到了工人/碎片/機器類型的良好平衡(不幸的是,主要基于猜測和檢查)。
盡管在足夠大的數據負載下仍然可能觀察到瓶頸,但盡管負載很重(每天 3-5 tb),管道仍然表現良好。這些更改還顯著改善了自動縮放,但我不知道為什么。數據流作業現在對峰值和谷值的反應要快得多。
添加回答
舉報