亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

NFS(Netapp 服務器)-> Flink -> s3

NFS(Netapp 服務器)-> Flink -> s3

Helenr 2023-07-13 18:14:47
我是 flink (java) 的新手,并嘗試將作為文件路徑安裝的 netapp 文件服務器上的 xml 文件移動到安裝了 flink 的服務器上。如何實時進行批處理或流處理以獲取進入文件夾的文件并使用 s3 接收它。我在 flink-starter 中找不到任何從本地文件系統讀取文件的示例,flink 至少是這個用例的正確選擇嗎?如果是這樣,我在哪里可以找到資源來監聽文件夾和管理檢查點/保存點?
查看完整描述

2 回答

?
料青山看我應如是

TA貢獻1772條經驗 獲得超8個贊

如果您的目標只是將文件復制到 s3,那么有更簡單、更合適的工具。也許同步是合適的。

假設使用 Flink 有意義(例如,因為您想要對數據執行一些有狀態轉換),則所有任務管理器(工作人員)都可以使用相同的 URI 訪問要處理的文件。為此,您可以使用 file:// URI。

您可以執行以下操作來監視目錄并在新文件出現時攝取它們:

StreamExecutionEnvironment env =? ??

? StreamExecutionEnvironment.getExecutionEnvironment();


// monitor directory, checking for new files

// every 100 milliseconds


TextInputFormat format = new TextInputFormat(

? new org.apache.flink.core.fs.Path("file:///tmp/dir/"));


DataStream<String> inputStream = env.readFile(

? format,?

? "file:///tmp/dir/",

? FileProcessingMode.PROCESS_CONTINUOUSLY,?

? 100,?

? FilePathFilter.createDefaultFilter());

請注意文檔中的此警告:

如果 watchType 設置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則修改文件時,將完全重新處理其內容。這可能會破壞“僅一次”語義,因為在文件末尾附加數據將導致其所有內容被重新處理。

這意味著您應該自動將準備好攝取的文件移動到正在監視的文件夾中。

您可以使用流文件接收器寫入S3。Flink 的寫入操作(例如 )writeUsingOutputFormat()不參與檢查點,因此在這種情況下這不是一個好的選擇。


查看完整回答
反對 回復 2023-07-13
?
烙印99

TA貢獻1829條經驗 獲得超13個贊

此問題的完整工作代碼位于以下鏈接中。您需要啟用檢查點以將 .inprogress 文件移動到實際文件

// 每 1000 毫秒啟動一個檢查點 env.enableCheckpointing(1000);

StreamingFileSink 未將數據提取到 s3


查看完整回答
反對 回復 2023-07-13
  • 2 回答
  • 0 關注
  • 204 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號