亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何使用 While 循環執行 Airflow 運算符

如何使用 While 循環執行 Airflow 運算符

呼啦一陣風 2021-06-01 21:21:50
要求:使用 while 循環為每個日期運行 SQL 查詢。例如:開始日期選擇為 8 月 25 日,結束日期選擇為 8 月 28 日。然后 BigQueryOperator 首先運行 8 月 25 日,然后是 8 月 26 日,依此類推,直到我們到達 28 日。問題:在下面的 DAG 中,它只執行開始日期的查詢,然后完成作業。它甚至不會執行/迭代 BigQueryOperator 到下一個日期等等。from airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.contrib.operators.bigquery_operator import BigQueryOperatorfrom datetime import date, datetime, timedeltaimport datetimedefault_args = {    'owner': 'airflow',    'start_date': datetime.datetime(2018, 8, 31),    'email': ['[email protected]'],    'email_on_failure': True,    'retries': 1,    'retry_delay': timedelta(minutes=10),    'depends_on_past': False}dag = DAG('his_temp',default_args=default_args,schedule_interval=None)date1 = datetime.date(2018, 8, 25)date2 = datetime.date(2018, 8, 28)day = datetime.timedelta(days=1)while date1 <= date2:    parameter = {        'dataset': "projectname.finance",        'historical_date': date1.strftime('%Y%m%d')    }    sqlpartition = BigQueryOperator(    task_id='execute_sqlpartition',    use_legacy_sql=False,    write_disposition='WRITE_TRUNCATE',    allow_large_results=True,    bql="sqlqueries/sqlpartition.sql",    destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),    params=parameter,    bigquery_conn_id='bigquery',    dag=dag)    print "data loaded for "+ parameter.get('historical_date')    date1 = date1 + day   
查看完整描述

2 回答

?
慕姐4208626

TA貢獻1852條經驗 獲得超7個贊

Airflow scheduler 的整個概念是它會調度任務,你只需要正確配置它。難怪它會在提到的開始日期運行一次,因為將選擇 dag 開始日期,并且由于沒有安排每日任務,它將運行一次并停止。您必須在 dag 級別而不是操作員級別進行配置。



查看完整回答
反對 回復 2021-06-08
?
互換的青春

TA貢獻1797條經驗 獲得超6個贊

您可以在依賴項的末尾添加自觸發運算符。類似于以下內容:


def trigger_check(context, dag_run_obj):

    if date1 <= date2:

        return dag_run_obj

trigger = TriggerDagRunOperator(

    task_id="test_trigger_dagrun",

    trigger_dag_id="his_temp",

    python_callable=trigger_check,

    ... more arguments

)

op1 >> op2 >> ... >> trigger

第一次觸發它后,它會循環遍歷日期,直到達到 date2 閾值。您必須更加小心地通過將其設為有序的 PythonOperator 或類似的東西來更新日期


查看完整回答
反對 回復 2021-06-08
  • 2 回答
  • 0 關注
  • 281 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號