要求:使用 while 循環為每個日期運行 SQL 查詢。例如:開始日期選擇為 8 月 25 日,結束日期選擇為 8 月 28 日。然后 BigQueryOperator 首先運行 8 月 25 日,然后是 8 月 26 日,依此類推,直到我們到達 28 日。問題:在下面的 DAG 中,它只執行開始日期的查詢,然后完成作業。它甚至不會執行/迭代 BigQueryOperator 到下一個日期等等。from airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.contrib.operators.bigquery_operator import BigQueryOperatorfrom datetime import date, datetime, timedeltaimport datetimedefault_args = { 'owner': 'airflow', 'start_date': datetime.datetime(2018, 8, 31), 'email': ['[email protected]'], 'email_on_failure': True, 'retries': 1, 'retry_delay': timedelta(minutes=10), 'depends_on_past': False}dag = DAG('his_temp',default_args=default_args,schedule_interval=None)date1 = datetime.date(2018, 8, 25)date2 = datetime.date(2018, 8, 28)day = datetime.timedelta(days=1)while date1 <= date2: parameter = { 'dataset': "projectname.finance", 'historical_date': date1.strftime('%Y%m%d') } sqlpartition = BigQueryOperator( task_id='execute_sqlpartition', use_legacy_sql=False, write_disposition='WRITE_TRUNCATE', allow_large_results=True, bql="sqlqueries/sqlpartition.sql", destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'), params=parameter, bigquery_conn_id='bigquery', dag=dag) print "data loaded for "+ parameter.get('historical_date') date1 = date1 + day
2 回答

慕姐4208626
TA貢獻1852條經驗 獲得超7個贊
Airflow scheduler 的整個概念是它會調度任務,你只需要正確配置它。難怪它會在提到的開始日期運行一次,因為將選擇 dag 開始日期,并且由于沒有安排每日任務,它將運行一次并停止。您必須在 dag 級別而不是操作員級別進行配置。

互換的青春
TA貢獻1797條經驗 獲得超6個贊
您可以在依賴項的末尾添加自觸發運算符。類似于以下內容:
def trigger_check(context, dag_run_obj):
if date1 <= date2:
return dag_run_obj
trigger = TriggerDagRunOperator(
task_id="test_trigger_dagrun",
trigger_dag_id="his_temp",
python_callable=trigger_check,
... more arguments
)
op1 >> op2 >> ... >> trigger
第一次觸發它后,它會循環遍歷日期,直到達到 date2 閾值。您必須更加小心地通過將其設為有序的 PythonOperator 或類似的東西來更新日期
添加回答
舉報
0/150
提交
取消