2 回答

TA貢獻1836條經驗 獲得超3個贊
看看示例 xcom DAG。
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py

TA貢獻1872條經驗 獲得超4個贊
正如上面的答案,自定義 XCom 后端可以解決問題。
我們最近實現了一個自定義的 XCom 氣流后端,由vineyard支持,以支持這種情況。
vineyard XCom 后端支持DAG 中任務之間的零拷貝數據共享,并支持tensorflow/mxnet/pytorch 中的 python 值,如 、 和數據numpy.ndarray。pandas.DataFrame提供者在那里是開源的:https ://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
使用 Vineyard XCom 后端,用戶可以擁有pandas.DataFrame直接生產和消費的 dag,無需任何“to_csv”+“from_csv”黑客,
import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
@task()
def extract():
order_data_dict = pd.DataFrame({
'a': np.random.rand(100000),
'b': np.random.rand(100000),
})
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
return {"total_order_value": order_data_dict["a"].sum()}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_etl_pandas_dag = taskflow_etl_pandas()
希望對您的情況有所幫助。
添加回答
舉報