在今天的数据驱动世界中,高效的任务调度对于管理复杂的工作流程和确保及时的数据处理至关重要。Apache Airflow 是一种强大的工作流编排工具,但当与 FastAPI 和 Docker 结合使用时,才能真正发挥其全部潜力。这种组合提供了一种强大、可扩展且灵活的解决方案的,用于管理和部署任务调度。
本文将介绍如何用Docker来设置及集成Apache Airflow与FastAPI,以创建一个无缝且高效的调度任务系统。
请确保您的机器上已经安装了 Docker。如果没有,请从这里下载并安装 Docker。
第 2 步,创建 Docker Compose 文件创建一个 docker-compose.yml
文件来定义我们各服务的配置。
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.9.2-python3.11}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
volumes:
- ./airflow/dags:/opt/airflow/dags
- ./airflow/logs:/opt/airflow/logs
- ./airflow/config:/opt/airflow/config
- ./airflow/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
networks:
- shared_network
redis:
image: redis:7.2-bookworm
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 30s
retries: 5
start_period: 30s
restart: always
networks:
- shared_network
fastapi:
build:
context: .
dockerfile: .docker/backend/Dockerfile
ports:
- "8000:8000"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 5s
networks:
- shared_network
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
networks:
- shared_network
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
networks:
- shared_network
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33m警告!!!: AIRFLOW_UID 未设置!\e[0m"
echo "建议按照下面的说明设置 AIRFLOW_UID 的环境变量,否则文件将归 root 用户所有。"
echo "对于其他操作系统,你可以通过手动创建 .env 文件来消除警告:"
echo " 参见:https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#设置正确的Airflow用户"
echo
fi
one_meg=1048576
mem_available=$(($(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE) / one_meg))
cpus_available=$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$(df / | tail -1 | awk '{print $4}')
warning_resources="false"
if (( mem_available < 4000 )); then
echo
echo -e "\033[1;33m警告!!!: Docker 可用内存不足。\e[0m"
echo "至少需要 4GB 内存。您有 $(numfmt --to iec $((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33m警告!!!: Docker 可用 CPU 不足。\e[0m"
echo "至少需要 2 个 CPU。您有 ${cpus_available} 个可用。"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33m警告!!!: Docker 可用磁盘空间不足。\e[0m"
echo "推荐至少 10GB。您有 $(numfmt --to iec $((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ ${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33m警告!!!: 您没有足够的资源来运行 Airflow(参见上面的警告)!\e[0m"
echo "请按照说明增加可用资源,以满足运行 Airflow 的要求:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#开始前的准备"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- ./airflow:/sources
networks:
- shared_network
volumes:
postgres-db-volume:
networks:
shared_network:
driver: bridge
这个 docker-compose.yml
文件是用于通过 Docker Compose 部署 Apache Airflow 环境。它包括多个服务组件,如 PostgreSQL、Redis、FastAPI 和 Airflow 组件。下面简要介绍每个部分及其功能:
x-airflow-common
)
这是一个可重用的配置块,可以用于各种Airflow服务。
**image**
: 指定用于 Airflow 的 Docker 镜像名称。**environment**
: 定义用于 Airflow 配置的环境变量,例如执行器类型(如 LocalExecutor)、数据库连接设置以及其他 Airflow 特定的设置。**volumes**
: 将本地目录映射到容器目录,以持久保存数据和配置。**user**
: 设置容器的用户和组 ID。**depends_on**
: 确保在启动 Airflow 之前,Redis 和 PostgreSQL 服务是健康的。
**postgres**
: 主要的数据库服务,基于官方的 postgres Docker 镜像构建。**fastapi**
: 我们的后端服务。它是根据.docker\backend\Dockerfile
文件构建的。**airflow-webserver**
: 运行 airflow 的 web 服务器,并监听 8080 端口。**airflow-scheduler**
: 运行 airflow 调度器。**airflow-init**
: airflow 初始化服务。建议使用 init 服务启动 airflow。
**postgres-db-volume**
: 定义一个用于保存PostgreSQL数据的Docker卷。
**shared_network**
: 创建一个Docker桥接网络,以便服务之间可以相互通信。
创建一个名为 dags
的目录并将您的 DAG 放进去。这里有一个示例 DAG example_dag.py
。(有向无环图)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=10),
}
def extract_data():
print("正在提取数据")
# 在这里添加数据提取逻辑
返回 "数据已提取"
def transform_data(**kwargs):
ti = kwargs['ti']
extracted_data = ti.xcom_pull(task_ids='extract_task')
print(f"正在转换数据:{extracted_data}")
# 在这里添加数据转换逻辑
返回 "转换后的数据"
def load_data(**kwargs):
ti = kwargs['ti']
transformed_data = ti.xcom_pull(task_ids='transform_task')
print(f"正在加载数据:{transformed_data}")
# 在这里添加数据加载逻辑
print("数据加载成功")
with DAG(
'complex_example_dag',
default_args=default_args,
description='一个包含多个任务的更复杂的DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
start_task = BashOperator(
task_id='start',
bash_command='echo "启动DAG"',
)
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_data,
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
provide_context=True,
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load_data,
provide_context=True,
)
end_task = BashOperator(
task_id='end',
bash_command='echo "DAG运行完成"',
)
# 定义任务之间的依赖关系
start_task >> extract_task >> transform_task >> load_task >> end_task
步骤 4:搭建 FastAPI 应用程序
创建一个名为 fastapi
的目录并将你的 FastAPI 应用程序添加到该目录中。这里有一个 main.py
的示例:
import aiohttp
import base64
from uuid import UUID
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, UUID4
app = FastAPI()
class DAGRunRequest(BaseModel):
dag_id: str
task_uuid: UUID
@app.get("/")
def read_root():
return {"message": "欢迎来到Airflow与FastAPI的集成示例"}
@app.get("/health", status_code=200)
async def health_check():
return JSONResponse(content={"status": "ok"})
@app.post("/trigger_dag/")
async def trigger_dag(request: DAGRunRequest):
url = f"http://airflow-webserver:8080/api/v1/dags/{request.dag_id}/dagRuns"
# dag_run_id 是任务的运行标识符
data = {"dag_run_id": str(request.task_uuid)}
headers = {"Content-Type": "application/json", "Authorization": "Basic " + base64.b64encode(b"airflow:airflow").decode("utf-8")}
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=data) as response:
if response.status == 200:
response_data = await response.json()
print("成功:")
print(response_data)
return response_data
else:
error_text = await response.text()
print(f"失败,状态码:{response.status},错误信息:{error_text}")
raise HTTPException(status_code=response.status, detail=error_text)
except Exception as e:
# HTTPException 用于表示HTTP请求中的异常
raise HTTPException(status_code=500, detail=f"服务器内部错误:发生了一个意外错误。{e}")
Trigger_dag
是我们 fastAPI 应用的核心。app.post("/trigger_dag/")
:首先我们实例化该函数,并定义了 POST 端点 /trigger_dag/
。客户端可以通过发送 POST 请求到该 URL 来触发 DAG 运行。
接下来,我们将通过构建URL来构造Airflow DAG的触发器。
url
: 构建Airflow DAG触发的API端点URL。headers
: 设置所需的HTTP头,包括Content-Type
和Authorization
。这里使用了基本认证,并通过Base64编码进行加密。data
: 准备POST请求的JSON负载,其中包括从task_uuid
派生的dag_run_id
。
- 发起一个异步请求:
aiohttp.ClientSession()
:这创建了一个异步的HTTP会话,允许非阻塞请求。session.post(url, headers=headers, json=data)
:发出一个POST请求到Airflow的API,带有指定的headers和json格式的数据。
- 处理响应:
- 成功:如果响应状态是200(OK),则解析并返回响应。
- 失败:如果响应状态不是200,则记录错误消息,然后根据错误详情引发HTTP异常。
- 异常处理:
- 捕获请求时出现的任何意外错误,并抛出一个带有描述性消息的500错误。
运行下面的命令来构建和启动您的Docker容器实例:
运行以下命令来构建和启动服务:
docker-compose up --build
该命令会重新构建所有服务并启动它们。
此命令会配置您的PostgreSQL数据库、Web服务器(即Airflow的web服务器)、调度器和FastAPI应用程序。
将FastAPI与Airflow集成在一起在环境设置好之后,我们就可以将FastAPI与Airflow集成在一起,通过编程方式触发DAG运行。
使用FastAPI触发DAG们 (有向无环图们)我们的FastAPI应用包含一个触发DAG的端点。例如,你可以使用curl
、Postman或任何HTTP客户端向该端点发送一个POST请求。
这里有个使用curl
的例子:
运行以下命令来触发一个任务流程:
curl -X POST "http://localhost:8000/触发dag/" -H "Content-Type: application/json" -d '{"dag_id": "example_dag"}' # 这里设置dag_id为"example_dag"
安排和监督
一旦DAG启动,你就可以通过Airflow的Web UI(在http://localhost:8080
)来监控其进度。该UI提供了一个全面的DAG运行、任务状态、日志等等信息的概览。
通过整合Apache Airflow、FastAPI和Docker,我们创建了一个强大、灵活且可扩展的任务调度框架。这种设置允许通过程序触发工作流,无缝部署容器化的应用,高效监控和管理任务。
这种组合非常适合处理复杂的数据工作流和ETL过程,以及任何需要强大任务调度和编排的场景。通过Docker的容器化技术、FastAPI的高性能和Airflow的强大编排能力,您将能够应对现代数据处理的各种需求。
您可以随意扩展此设置,添加诸如认证、详细日志记录和更复杂的工作流等功能,以满足您的特定需求。
代码库已上传至 GitHub [https://github.com/n-sakib/airflow-fastapi-docker]。
安排愉快!
共同學習,寫下你的評論
評論加載中...
作者其他優質文章