亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用一個 Python 任務的輸出并用作 Airflow 上另一個 Python 任務的輸入

使用一個 Python 任務的輸出并用作 Airflow 上另一個 Python 任務的輸入

收到一只叮咚 2022-05-24 17:19:27
因此,我正在使用 Apache Airflow 創建一個數據流,用于獲取一些存儲在 Pandas Dataframe 中的數據,然后將其存儲到 MongoDB 中。所以我有兩種python方法,一種用于獲取數據并返回數據框,另一種用于將其存儲到相關數據庫中。如何獲取一項任務的輸出并將其作為另一項任務的輸入?這就是我到目前為止所擁有的(總結和濃縮版)我研究了 xcom pull 和 push 的概念,這就是我在下面實現的,我還看到有一個用于 Airflow 的 MongoHook,但不太確定如何使用它。import pandas as pdimport pymongoimport airflowfrom datetime import datetime, timedeltafrom airflow.models import DAGfrom airflow.operators.python_operator import PythonOperatordef get_data(name, **context):    data = pd.read_csv('dataset.csv')    df = data.loc[data.name == name]    context['ti'].xcom_push(task_ids=['get-data'], value=data)def push_to_db(df, dbname, collection):    client = pymongo.MongoClient(-insert creds here-)    db = client[dbname][collection]    data = df.to_dict(orient='records')    db.insert_many(data)args = {    'owner': 'Airflow',    'start_date': airflow.utils.dates.days_ago(2),}dag = DAG(  dag_id='simple_xcom',  default_args=args,  start_date=datetime(2019, 09, 02),  schedule_interval="@daily",  retries=2)task1 = PythonOperator(task_id='get-data', params=['name': 'John'],         python_callable=get_data,         provide_context=True, dag=dag)task2 = PythonOperator(task_id='load-db', params=['df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),     'dbname': 'person', 'table': 'salary'),     python_callable=push_to_db, provide_context=True, dag=dag) task1 >> task2 每次我嘗試運行它時,它都會顯示上下文不存在。所以也許我在將一個任務的輸出作為另一個任務的輸入方面做錯了?
查看完整描述

2 回答

?
米脂

TA貢獻1836條經驗 獲得超3個贊

看看示例 xcom DAG。

https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py


查看完整回答
反對 回復 2022-05-24
?
守著一只汪

TA貢獻1872條經驗 獲得超4個贊

正如上面的答案,自定義 XCom 后端可以解決問題。


我們最近實現了一個自定義的 XCom 氣流后端,由vineyard支持,以支持這種情況。


vineyard XCom 后端支持DAG 中任務之間的零拷貝數據共享,并支持tensorflow/mxnet/pytorch 中的 python 值,如 、 和數據numpy.ndarray。pandas.DataFrame提供者在那里是開源的:https ://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow


使用 Vineyard XCom 后端,用戶可以擁有pandas.DataFrame直接生產和消費的 dag,無需任何“to_csv”+“from_csv”黑客,


import numpy as np

import pandas as pd


from airflow.decorators import dag, task

from airflow.utils.dates import days_ago


default_args = {

    'owner': 'airflow',

}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])

def taskflow_etl_pandas():

    @task()

    def extract():

        order_data_dict = pd.DataFrame({

            'a': np.random.rand(100000),

            'b': np.random.rand(100000),

        })

        return order_data_dict


    @task(multiple_outputs=True)

    def transform(order_data_dict: dict):

        return {"total_order_value": order_data_dict["a"].sum()}


    @task()

    def load(total_order_value: float):

        print(f"Total order value is: {total_order_value:.2f}")


    order_data = extract()

    order_summary = transform(order_data)

    load(order_summary["total_order_value"])


taskflow_etl_pandas_dag = taskflow_etl_pandas()

希望對您的情況有所幫助。


查看完整回答
反對 回復 2022-05-24
  • 2 回答
  • 0 關注
  • 243 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號