亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

氣流 - 創建 dag 和任務動態地為一個對象創建管道

氣流 - 創建 dag 和任務動態地為一個對象創建管道

慕碼人2483693 2024-01-15 17:09:42
在氣流中,我想將一些表從 pg 導出到 BQ。task1: get the max id from BQtask2: export the data from PG (id>maxid)task3: GCS to BQ stagetask4: BQ stage to BQ main但有一個小挑戰,日程間隔不同。所以我創建了一個 JSON 文件來告訴同步間隔。因此,如果是 2 分鐘,那么它將使用 DAG upsert_2mins,否則將使用 10 分鐘間隔 ( upsert_10mins) 。我使用這個語法來動態生成它。JSON 配置文件:{    "tbl1": ["update_timestamp", "2mins", "stg"],    "tbl2": ["update_timestamp", "2mins", "stg"]}它實際上創建了 dag,但問題是來自 Web UI,我能夠看到最后一個表的任務。但它必須顯示 2 個表的任務。
查看完整描述

1 回答

?
慕森卡

TA貢獻1806條經驗 獲得超8個贊

您的代碼正在創建 2 個 dags,每個表一個,但用第二個覆蓋第一個。


我的建議是將 JSON 文件的格式更改為:


{

    "2mins": [

                "tbl1": ["update_timestamp", "stg"],

                "tbl2": ["update_timestamp", "stg"]

             ],

    "10mins": [

                "tbl3": ["update_timestamp", "stg"],

                "tbl4": ["update_timestamp", "stg"]

             ]

}

讓您的代碼迭代計劃并為每個表創建所需的任務(您將需要兩個循環):


# looping on the schedules to create two dags

for schedule, tables in config.items():


cron_time = '*/10 * * * *'


if schedule== '2mins':

    cron_time = '*/20 * * * *'


dag_id = 'upsert_every_{}'.format(schedule)


dag = DAG(

    dag_id ,

    default_args=default_args,

    description='Incremental load - Every 10mins',

    schedule_interval=cron_time,

    catchup=False,

    max_active_runs=1,

    doc_md = docs

)


# Looping over the tables to create the tasks for 

# each table in the current schedule

for table_name, table_config in tables.items():

    max_ts = PythonOperator(

        task_id="get_maxts_{}".format(table_name),

        python_callable=get_max_ts,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )


    export_gcs = PythonOperator(

        task_id='export_gcs_{}'.format(table_name),

        python_callable=pgexport,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )


    stg_load = PythonOperator(

        task_id='stg_load_{}'.format(table_name),

        python_callable=stg_bqimport,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )    


    merge = PythonOperator(

        task_id='merge_{}'.format(table_name),

        python_callable=prd_merge,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )

    

    # Tasks for the same table will be chained

    max_ts >> export_gcs >> stg_load >> merge


# DAG is created among the global objects

globals()[dag_id] = dag


查看完整回答
反對 回復 2024-01-15
  • 1 回答
  • 0 關注
  • 143 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號