2 回答

TA貢獻1111條經驗 獲得超0個贊
TextIO
逐行讀取文件。因此,在您的 test.json 中,每一行都需要包含一個單獨的 Json 對象。
光束或任何分布式處理引擎的想法是能夠并行化輸入數據。從您的問題來看,似乎需要進行一些預處理才能將它們拆分為多個 json。請注意,它不必位于單個文件中,您可以擁有多個文件,每個文件都包含任意數量的 json 文件。Beam 將并行讀取行。
如果有幫助,請接受答案。

TA貢獻1871條經驗 獲得超8個贊
希望對從文件讀取的對象進行并行化是一個合理的用例。
import apache_beam as beam
from apache_beam.io import fileio
import json
# Make some fake data
for i in range(0,10):
with open(f'/tmp/data{i}.json', 'w') as f:
json.dump({'somethinig':i,'otherthing':[1,2,3]}, f)
filenames = [f'/tmp/data{i}.json' for i in range(0,10)]
with beam.Pipeline() as pipeline:
lines = (
pipeline
| beam.Create(filenames)
| fileio.MatchAll()
| fileio.ReadMatches()
| beam.Map(lambda file: print(file.read_utf8()))
)
添加回答
舉報