我正在嘗試從GCS存儲桶中讀取XML文件的集合,并對其進行處理,其中集合中的每個元素都是代表整個文件的字符串,但是我找不到如何實現此目的的示例,我也無法理解它來自 Apache Beam 文檔,主要是關于 Java 版本。我當前的管道如下所示:p = beam.Pipeline(options=PipelineOptions(pipeline_args))(p | 'Read from a File' >> beam.io.Read(training_files_folder) | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) | 'Write to BigQuery' >> beam.io.Write( beam.io.BigQuerySink( known_args.output, schema='title:STRING,text:STRING,id:STRING', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))p.run().wait_until_finish()解決了第一個問題:事實證明這不適用于DirectRunner,將運行器更改為DataFlowRunner并將Read替換為ReadFromText可以解決此異常:p = beam.Pipeline(options=PipelineOptions(pipeline_args))(p | 'Read from a File' >> beam.io.ReadFromText(training_files_folder) | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) | 'Write to BigQuery' >> beam.io.Write( beam.io.BigQuerySink( known_args.output, schema='title:STRING,text:STRING,id:STRING', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))p.run().wait_until_finish() 但是現在我看到這種方法給了我每個文件中的一行作為管道元素,而我希望將整個文件作為一個字符串作為每個元素。不知道該怎么做。我找到了這篇文章,但它是用 java 編寫的,完全不知道它是如何與 python 和 gcs 版本一起工作的。所以看起來 ReadFromText 對我的用例不起作用,我不知道如何創建文件管道。解決方案:在Ankur的幫助下,我修改了代碼,以包括從MatchResult對象列表轉換所需的步驟,這是GCSFileSystem返回到字符串pCollection的內容,每個字符串代表一個文件。p = beam.Pipeline(options=PipelineOptions(pipeline_args))gcs = GCSFileSystem(PipelineOptions(pipeline_args))gcs_reader = GCSFileReader(gcs)
添加回答
舉報
0/150
提交
取消