我可以根據在上一個數據流步驟中處理的數據將數據插入到不同的 bigQuery 數據集嗎?我正在創建一個數據流管道,它從 PubSub 訂閱中讀取數據并寫入大查詢表。其定義如下:def run(argv=None, save_main_session=True): options: PipelineOptions = PipelineOptions( project='project-id', runner='DataflowRunner', region='region', streaming=True, setup_file='dataflow/setup.py', autoscaling_algorithm='THROUGHPUT_BASED', job_name='telemetry-processing' ) with beam.Pipeline(options=options) as p: status = ( p | 'Get Status PubSub' >> beam.io.ReadFromPubSub( subscription='projects/project-id/subscriptions/subscription-id', with_attributes=True)) status_records = (status| 'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus)) ) status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project- id:dataset-id.table-id') bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes())) bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery( 'project-id:dataset-id.backup-table-id')對于給定的輸入和輸出,它完全按照預期工作。我想要的是,關于 PubSubMessage 中的特定屬性,定義我的消息應該發送到哪個數據集。所以我需要改變的部分是:status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:dataset-id.table-id')我已經嘗試提取所需的數據并像這樣使用它:status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:{data-from-previous-step}.table-id')但我們無法直接從 PCollection 獲取數據。我嘗試像這篇文章中那樣覆蓋 WriteToBigQuery(How can I write to Big Query using a runtime valueprovider in Apache Beam?),但我沒有收到錯誤,也沒有插入任何內容。我不知道如何實現這一點。你知道我應該從哪里開始做這件事嗎?我是否必須為 n 個數據集創建 n 個管道?
1 回答

婷婷同學_
TA貢獻1844條經驗 獲得超8個贊
WriteToBigQuery 的“table”參數可以是從元素到應寫入的表的函數。例如:
status_records | 'Write' >> beam.io.WriteToBigQuery( lambda e: 'dataset1.invalid_records' if is_invalid(e) else 'dataset2.good_records')
添加回答
舉報
0/150
提交
取消