亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Google Cloud Composer DAG 沒有被觸發

Google Cloud Composer DAG 沒有被觸發

慕雪6442864 2023-05-16 14:58:15
我計劃從今天 2020/08/11 開始在東部標準時間 (NY) 周二至周六凌晨 04:00 運行 DAG。編寫代碼并部署后,我預計 DAG 會被觸發。我刷新了我的 Airflow UI 頁面幾次,但它仍然沒有觸發。我正在使用帶有 python 3 的 Airflow 版本 v1.10.9-composer。這是我的 DAG 代碼:"""This DAG executes a retrieval job"""# Required packages to execute DAGfrom __future__ import print_functionimport pendulumfrom airflow.models import DAGfrom airflow.models import Variablefrom datetime import datetime, timedeltafrom airflow.contrib.operators.ssh_operator import SSHOperatorfrom airflow.operators.dummy_operator import DummyOperatorfrom airflow.utils.trigger_rule import TriggerRulelocal_tz = pendulum.timezone("America/New_York")# DAG parametersdefault_args = {    'owner': 'Me',    'depends_on_past': False,    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),    'dagrun_timeout': None,    'email': Variable.get('email'),    'email_on_failure': True,    'email_on_retry': False,    'provide_context': True,    'retries': None,    'retry_delay': timedelta(minutes=5)}# create DAG object with Name and default_argswith DAG(        'retrieve_files',        schedule_interval='0 4 * * 2-6',        description='Retrieves files from sftp',        max_active_runs=1,        catchup=True,        default_args=default_args) as dag:    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class    start_dummy = DummyOperator(        task_id='start',        dag=dag    )    end_dummy = DummyOperator(        task_id='end',        trigger_rule=TriggerRule.NONE_FAILED,        dag=dag    )    retrieve_file = SSHOperator(        ssh_conn_id="my_conn",        task_id='retrieve_file',        command='/usr/bin/python3  /path_to_file/getFile.py',        dag=dag)    dag.doc_md = __doc__    retrieve_file.doc_md = """\    #### Task Documentation    Connects to sftp and retrieves files.    """    start_dummy >> retrieve_file >> end_dummy
查看完整描述

1 回答

?
郎朗坤

TA貢獻1921條經驗 獲得超9個贊

參考官方文檔:

調度程序會在開始日期之后的一個 schedule_interval 運行您的作業。

如果您的 start_date 是 2020-01-01 并且 schedule_interval 是@daily,則第一次運行將在 2020-01-02 上創建,即在您的開始日期過后。

為了在每天的特定時間(包括今天)運行 DAG,start_date需要將時間設置為過去的時間,并且schedule_interval需要具有所需的時間格式cron。正確設置昨天的日期時間非常重要,否則觸發器將不起作用。

在這種情況下,我們應該將 設置start_date為上一周的星期二,即:(2020, 8, 4)。由于每周運行一次,因此從開始日期起應該有 1 周的間隔。

讓我們看一下以下示例,它展示了如何在美國東部標準時間周二至周六凌晨 04:00 運行作業:

from datetime import datetime, timedelta

from airflow import models

import pendulum

from airflow.operators import bash_operator


local_tz = pendulum.timezone("America/New_York")


default_dag_args = {

? ? 'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),

? ? 'retries': 0,

}


with models.DAG(

? ? ? ? 'Test',

? ? ? ? default_args=default_dag_args,

? ? ? ? schedule_interval='00 04 * * 2-6') as dag:

? ? ? ?# DAG code

? ? print_dag_run_conf = bash_operator.BashOperator(

? ? ? ? task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

我建議您查看start_date 文檔的處理方式。



查看完整回答
反對 回復 2023-05-16
  • 1 回答
  • 0 關注
  • 147 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號