亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

使用Apache Airflow、FastAPI和Docker構建更高效的任務調度系統

標簽:
Python Docker API
介绍

在今天的数据驱动世界中,高效的任务调度对于管理复杂的工作流程和确保及时的数据处理至关重要。Apache Airflow 是一种强大的工作流编排工具,但当与 FastAPI 和 Docker 结合使用时,才能真正发挥其全部潜力。这种组合提供了一种强大、可扩展且灵活的解决方案的,用于管理和部署任务调度。

本文将介绍如何用Docker来设置及集成Apache Airflow与FastAPI,以创建一个无缝且高效的调度任务系统。

搭建环境
第一步:安装 Docker

请确保您的机器上已经安装了 Docker。如果没有,请从这里下载并安装 Docker。

第 2 步,创建 Docker Compose 文件

创建一个 docker-compose.yml 文件来定义我们各服务的配置。

    x-airflow-common: &airflow-common  
      image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.9.2-python3.11}  
      environment:  
        &airflow-common-env  
        AIRFLOW__CORE__EXECUTOR: CeleryExecutor  
        AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow  
        AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow  
        AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0  
        AIRFLOW__CORE__FERNET_KEY: ''  
        AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'  
        AIRFLOW__CORE__LOAD_EXAMPLES: 'false'  
        AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'  
        AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'  
      volumes:  
        - ./airflow/dags:/opt/airflow/dags  
        - ./airflow/logs:/opt/airflow/logs  
        - ./airflow/config:/opt/airflow/config  
        - ./airflow/plugins:/opt/airflow/plugins  
      user: "${AIRFLOW_UID:-50000}:0"  
      depends_on:  
        &airflow-common-depends-on  
        redis:  
          condition: service_healthy  
        postgres:  
          condition: service_healthy  

    services:  
      postgres:  
        image: postgres:13  
        environment:  
          POSTGRES_USER: airflow  
          POSTGRES_PASSWORD: airflow  
          POSTGRES_DB: airflow  
        volumes:  
          - postgres-db-volume:/var/lib/postgresql/data  
        healthcheck:  
          test: ["CMD", "pg_isready", "-U", "airflow"]  
          interval: 10s  
          retries: 5  
          start_period: 5s  
        restart: always  
        networks:  
          - shared_network  

      redis:  
        image: redis:7.2-bookworm  
        expose:  
          - 6379  
        healthcheck:  
          test: ["CMD", "redis-cli", "ping"]  
          interval: 10s  
          timeout: 30s  
          retries: 5  
          start_period: 30s  
        restart: always  
        networks:  
          - shared_network  

      fastapi:  
        build:  
          context: .  
          dockerfile: .docker/backend/Dockerfile  
        ports:  
          - "8000:8000"  
        healthcheck:  
          test: ["CMD", "curl", "-f", "http://localhost:8000/health"]  
          interval: 30s         
          timeout: 10s           
          retries: 3             
          start_period: 5s   
        networks:  
          - shared_network  

      airflow-webserver:  
        <<: *airflow-common  
        command: webserver  
        ports:  
          - "8080:8080"  
        healthcheck:  
          test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]  
          interval: 30s  
          timeout: 10s  
          retries: 5  
          start_period: 30s  
        restart: always  
        depends_on:  
          <<: *airflow-common-depends-on  
          airflow-init:  
            condition: service_completed_successfully  
        networks:  
          - shared_network  

      airflow-scheduler:  
        <<: *airflow-common  
        command: scheduler  
        healthcheck:  
          test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]  
          interval: 30s  
          timeout: 10s  
          retries: 5  
          start_period: 30s  
        restart: always  
        depends_on:  
          <<: *airflow-common-depends-on  
          airflow-init:  
            condition: service_completed_successfully  
        networks:  
          - shared_network  

      airflow-init:  
        <<: *airflow-common  
        entrypoint: /bin/bash  
        command:  
          - -c  
          - |  
            if [[ -z "${AIRFLOW_UID}" ]]; then  
              echo  
              echo -e "\033[1;33m警告!!!: AIRFLOW_UID 未设置!\e[0m"  
              echo "建议按照下面的说明设置 AIRFLOW_UID 的环境变量,否则文件将归 root 用户所有。"  
              echo "对于其他操作系统,你可以通过手动创建 .env 文件来消除警告:"  
              echo "    参见:https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#设置正确的Airflow用户"  
              echo  
            fi  
            one_meg=1048576  
            mem_available=$(($(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE) / one_meg))  
            cpus_available=$(grep -cE 'cpu[0-9]+' /proc/stat)  
            disk_available=$(df / | tail -1 | awk '{print $4}')  
            warning_resources="false"  
            if (( mem_available < 4000 )); then  
              echo  
              echo -e "\033[1;33m警告!!!: Docker 可用内存不足。\e[0m"  
              echo "至少需要 4GB 内存。您有 $(numfmt --to iec $((mem_available * one_meg)))"  
              echo  
              warning_resources="true"  
            fi  
            if (( cpus_available < 2 )); then  
              echo  
              echo -e "\033[1;33m警告!!!: Docker 可用 CPU 不足。\e[0m"  
              echo "至少需要 2 个 CPU。您有 ${cpus_available} 个可用。"  
              echo  
              warning_resources="true"  
            fi  
            if (( disk_available < one_meg * 10 )); then  
              echo  
              echo -e "\033[1;33m警告!!!: Docker 可用磁盘空间不足。\e[0m"  
              echo "推荐至少 10GB。您有 $(numfmt --to iec $((disk_available * 1024 )))"  
              echo  
              warning_resources="true"  
            fi  
            if [[ ${warning_resources} == "true" ]]; then  
              echo  
              echo -e "\033[1;33m警告!!!: 您没有足够的资源来运行 Airflow(参见上面的警告)!\e[0m"  
              echo "请按照说明增加可用资源,以满足运行 Airflow 的要求:"  
              echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#开始前的准备"  
              echo  
            fi  
            mkdir -p /sources/logs /sources/dags /sources/plugins  
            chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}  
            exec /entrypoint airflow version  
        environment:  
          <<: *airflow-common-env  
          _AIRFLOW_DB_MIGRATE: 'true'  
          _AIRFLOW_WWW_USER_CREATE: 'true'  
          _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}  
          _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}  
          _PIP_ADDITIONAL_REQUIREMENTS: ''  
        user: "0:0"  
        volumes:  
          - ./airflow:/sources  
        networks:  
          - shared_network  

    volumes:  
      postgres-db-volume:  

    networks:  
      shared_network:  
        driver: bridge

这个 docker-compose.yml 文件是用于通过 Docker Compose 部署 Apache Airflow 环境。它包括多个服务组件,如 PostgreSQL、Redis、FastAPI 和 Airflow 组件。下面简要介绍每个部分及其功能:

常见配置设置 (x-airflow-common)

这是一个可重用的配置块,可以用于各种Airflow服务。

  • **image**: 指定用于 Airflow 的 Docker 镜像名称。
  • **environment**: 定义用于 Airflow 配置的环境变量,例如执行器类型(如 LocalExecutor)、数据库连接设置以及其他 Airflow 特定的设置。
  • **volumes**: 将本地目录映射到容器目录,以持久保存数据和配置。
  • **user**: 设置容器的用户和组 ID。
  • **depends_on**: 确保在启动 Airflow 之前,Redis 和 PostgreSQL 服务是健康的。
服务
  1. **postgres**: 主要的数据库服务,基于官方的 postgres Docker 镜像构建。
  2. **fastapi**: 我们的后端服务。它是根据 .docker\backend\Dockerfile 文件构建的。
  3. **airflow-webserver**: 运行 airflow 的 web 服务器,并监听 8080 端口。
  4. **airflow-scheduler**: 运行 airflow 调度器。
  5. **airflow-init**: airflow 初始化服务。建议使用 init 服务启动 airflow。
卷数
  • **postgres-db-volume**: 定义一个用于保存PostgreSQL数据的Docker卷。
网络(网路)
  • **shared_network**: 创建一个Docker桥接网络,以便服务之间可以相互通信。
步骤 3:定义 Airflow 的 DAG

创建一个名为 dags 的目录并将您的 DAG 放进去。这里有一个示例 DAG example_dag.py。(有向无环图)

    from airflow import DAG  
    from airflow.operators.python_operator import PythonOperator  
    from airflow.operators.bash_operator import BashOperator  
    from datetime import datetime, timedelta  

    default_args = {  
        'owner': 'airflow',  
        'depends_on_past': False,  
        'email_on_failure': False,  
        'email_on_retry': False,  
        'retries': 2,  
        'retry_delay': timedelta(minutes=10),  
    }  

    def extract_data():  
        print("正在提取数据")  
        # 在这里添加数据提取逻辑  
        返回 "数据已提取"  

    def transform_data(**kwargs):  
        ti = kwargs['ti']  
        extracted_data = ti.xcom_pull(task_ids='extract_task')  
        print(f"正在转换数据:{extracted_data}")  
        # 在这里添加数据转换逻辑  
        返回 "转换后的数据"  

    def load_data(**kwargs):  
        ti = kwargs['ti']  
        transformed_data = ti.xcom_pull(task_ids='transform_task')  
        print(f"正在加载数据:{transformed_data}")  
        # 在这里添加数据加载逻辑  
        print("数据加载成功")  

    with DAG(  
        'complex_example_dag',  
        default_args=default_args,  
        description='一个包含多个任务的更复杂的DAG',  
        schedule_interval=timedelta(days=1),  
        start_date=datetime(2023, 1, 1),  
        catchup=False,  
    ) as dag:  

        start_task = BashOperator(  
            task_id='start',  
            bash_command='echo "启动DAG"',  
        )  

        extract_task = PythonOperator(  
            task_id='extract_task',  
            python_callable=extract_data,  
        )  

        transform_task = PythonOperator(  
            task_id='transform_task',  
            python_callable=transform_data,  
            provide_context=True,  
        )  

        load_task = PythonOperator(  
            task_id='load_task',  
            python_callable=load_data,  
            provide_context=True,  
        )  

        end_task = BashOperator(  
            task_id='end',  
            bash_command='echo "DAG运行完成"',  
        )  

        # 定义任务之间的依赖关系  
        start_task >> extract_task >> transform_task >> load_task >> end_task
步骤 4:搭建 FastAPI 应用程序

创建一个名为 fastapi 的目录并将你的 FastAPI 应用程序添加到该目录中。这里有一个 main.py 的示例:

    import aiohttp  
    import base64  
    from uuid import UUID  
    from fastapi import FastAPI, HTTPException  
    from fastapi.responses import JSONResponse  
    from pydantic import BaseModel, UUID4  

    app = FastAPI()  

    class DAGRunRequest(BaseModel):  
        dag_id: str  
        task_uuid: UUID  

    @app.get("/")  
    def read_root():  
        return {"message": "欢迎来到Airflow与FastAPI的集成示例"}  

    @app.get("/health", status_code=200)  
    async def health_check():  
        return JSONResponse(content={"status": "ok"})  

    @app.post("/trigger_dag/")  
    async def trigger_dag(request: DAGRunRequest):  
        url = f"http://airflow-webserver:8080/api/v1/dags/{request.dag_id}/dagRuns"  
        # dag_run_id 是任务的运行标识符
        data = {"dag_run_id": str(request.task_uuid)}  
        headers = {"Content-Type": "application/json", "Authorization": "Basic " + base64.b64encode(b"airflow:airflow").decode("utf-8")}  
        try:  
            async with aiohttp.ClientSession() as session:  
                async with session.post(url, headers=headers, json=data) as response:  
                    if response.status == 200:  
                        response_data = await response.json()  
                        print("成功:")  
                        print(response_data)  
                        return response_data  
                    else:  
                        error_text = await response.text()  
                        print(f"失败,状态码:{response.status},错误信息:{error_text}")  
                        raise HTTPException(status_code=response.status, detail=error_text)  
        except Exception as e:  
            # HTTPException 用于表示HTTP请求中的异常
            raise HTTPException(status_code=500, detail=f"服务器内部错误:发生了一个意外错误。{e}")

Trigger_dag 是我们 fastAPI 应用的核心。app.post("/trigger_dag/"):首先我们实例化该函数,并定义了 POST 端点 /trigger_dag/。客户端可以通过发送 POST 请求到该 URL 来触发 DAG 运行。

接下来,我们将通过构建URL来构造Airflow DAG的触发器。

  • url: 构建Airflow DAG触发的API端点URL。
  • headers: 设置所需的HTTP头,包括Content-TypeAuthorization。这里使用了基本认证,并通过Base64编码进行加密。
  • data: 准备POST请求的JSON负载,其中包括从task_uuid派生的dag_run_id
  1. 发起一个异步请求
  • aiohttp.ClientSession():这创建了一个异步的HTTP会话,允许非阻塞请求。
  • session.post(url, headers=headers, json=data):发出一个POST请求到Airflow的API,带有指定的headers和json格式的数据。
  1. 处理响应:
  • 成功:如果响应状态是200(OK),则解析并返回响应。
  • 失败:如果响应状态不是200,则记录错误消息,然后根据错误详情引发HTTP异常。
  1. 异常处理
  • 捕获请求时出现的任何意外错误,并抛出一个带有描述性消息的500错误。
步骤 5:构建并运行你的项目

运行下面的命令来构建和启动您的Docker容器实例:

运行以下命令来构建和启动服务:

docker-compose up --build

该命令会重新构建所有服务并启动它们。

此命令会配置您的PostgreSQL数据库、Web服务器(即Airflow的web服务器)、调度器和FastAPI应用程序。

将FastAPI与Airflow集成在一起

在环境设置好之后,我们就可以将FastAPI与Airflow集成在一起,通过编程方式触发DAG运行。

使用FastAPI触发DAG们 (有向无环图们)

我们的FastAPI应用包含一个触发DAG的端点。例如,你可以使用curl、Postman或任何HTTP客户端向该端点发送一个POST请求。

这里有个使用curl的例子:

运行以下命令来触发一个任务流程:
curl -X POST "http://localhost:8000/触发dag/" -H "Content-Type: application/json" -d '{"dag_id": "example_dag"}' # 这里设置dag_id为"example_dag"
安排和监督

一旦DAG启动,你就可以通过Airflow的Web UI(在http://localhost:8080)来监控其进度。该UI提供了一个全面的DAG运行、任务状态、日志等等信息的概览。

结论部分

通过整合Apache Airflow、FastAPI和Docker,我们创建了一个强大、灵活且可扩展的任务调度框架。这种设置允许通过程序触发工作流,无缝部署容器化的应用,高效监控和管理任务。

这种组合非常适合处理复杂的数据工作流和ETL过程,以及任何需要强大任务调度和编排的场景。通过Docker的容器化技术、FastAPI的高性能和Airflow的强大编排能力,您将能够应对现代数据处理的各种需求。

您可以随意扩展此设置,添加诸如认证、详细日志记录和更复杂的工作流等功能,以满足您的特定需求。

代码库已上传至 GitHub [https://github.com/n-sakib/airflow-fastapi-docker]。

安排愉快!

點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消