亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在 Airflow 中的組件之間傳輸數據

在 Airflow 中的組件之間傳輸數據

瀟湘沐 2021-12-29 20:19:25
我對 Airflow 很陌生,并且已經閱讀了大部分文檔。從文檔中,我了解到 DAG 中組件之間的小數據可以使用 XCom 類共享。DAG 中發布數據的組件必須推送,訂閱數據的組件必須拉取。但是,我對推和拉的語法部分不是很清楚。我指的是關于文檔的XCom 部分并開發了一個代碼模板。假設我有以下代碼,它只有兩個組件,一個 pusher 和一個 puller。pusher 發布 puller 必須消耗的當前時間并寫入日志文件。from datetime import datetimefrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorlog_file_location = '/usr/local/airflow/logs/time_log.log'default_args = {'owner':'apache'}dag = DAG('pushpull', default_args = default_args)def push_function():    #push this data on the DAG as key-value pair    return(datetime.now()) #current timedef pull_function():    with open(log_file_location, 'a') as logfile:        current_time = '' #pull data from the pusher as key - value pair        logfile.writelines('current time = '+current_time)    logfile.close()with dag:    t1 = PythonOperator(        task_id = 'pusher',         python_callable = push_function)    t2 = PythonOperator(        task_id = 'puller',         python_callable = pull_function)    t2.set_upstream(t1)我需要 Airflow 大師在兩種語法上的幫助:如何從推送功能連同鍵推送數據如何獲得 pull 函數使用 key 拉取數據。
查看完整描述

1 回答

?
天涯盡頭無女友

TA貢獻1831條經驗 獲得超9個贊

使用密鑰推送到 Xcom 的示例:


def push_function(**context):

    msg='the_message'

    print("message to push: '%s'" % msg)

    task_instance = context['task_instance']

    task_instance.xcom_push(key="the_message", value=msg)

使用密鑰拉到 Xcom 的示例:


def pull_function(**kwargs):

    ti = kwargs['ti']

    msg = ti.xcom_pull(task_ids='push_task',key='the_message')

    print("received message: '%s'" % msg)

示例 DAY:


from datetime import datetime, timedelta

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator


DAG = DAG(

  dag_id='simple_xcom',

  start_date=datetime(2017, 10, 26),

  schedule_interval=timedelta(1)

)


def push_function(**context):

    msg='the_message'

    print("message to push: '%s'" % msg)

    task_instance = context['task_instance']

    task_instance.xcom_push(key="the_message", value=msg)


push_task = PythonOperator(

    task_id='push_task', 

    python_callable=push_function,

    provide_context=True,

    dag=DAG)


def pull_function(**kwargs):

    ti = kwargs['ti']

    msg = ti.xcom_pull(task_ids='push_task',key='the_message')

    print("received message: '%s'" % msg)


pull_task = PythonOperator(

    task_id='pull_task', 

    python_callable=pull_function,

    provide_context=True,

    dag=DAG)


push_task >> pull_task


查看完整回答
反對 回復 2021-12-29
  • 1 回答
  • 0 關注
  • 333 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號