Airflow with docker compose (airflow+mysql+rabbitmq) Distributed Environment

Host Environment

  • Both hosts have identical environments
  • OS: Ubuntu 16.04.7 LTS
  • Kernel: Linux 4.15.0-142-generic
  • docker-compose version: v1.29.1

Cluster Environment

Managing the mapped directories

Set up dedicated host directories so that things stay manageable when multiple services are running:

  • MySQL configuration files: under /data/mysql
  • Airflow data directory: under /data/airflow

On both serverA and serverB, create the directories mapped into the containers and set the ownership:

$ mkdir -p ./data/airflow/dags ./logs/airflow ./data/airflow/plugins
$ echo -e "AIRFLOW_UID=$(id -u)" > .env

serverA (docker-compose.yml)

  • Build the master by modifying the yml from the single-machine guide: Airflow with docker compose (airflow+mysql+rabbitmq) single machine

  • github reference: https://github.com/Dawn0472/docker-airflow/tree/main/分散式/serverA

    MySQL

    • Update the MySQL service's volumes to point at the host paths under /data/mysql (a sketch follows below)
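
      A minimal sketch of what that mysql service might look like. The airflow/worker credentials and the 3305 host port are taken from the connection strings used elsewhere in this post; the image tag, root password, healthcheck, and the data/conf.d subdirectories under /data/mysql are assumptions:

      mysql:
        image: mysql:8.0 # assumed tag (the referenced article uses MySQL 8)
        command: --default-authentication-plugin=mysql_native_password
        environment:
          MYSQL_DATABASE: airflow
          MYSQL_USER: airflow
          MYSQL_PASSWORD: worker # matches the airflow:worker connection strings in this post
          MYSQL_ROOT_PASSWORD: root # placeholder, use your own
        ports:
          - "3305:3306" # host port 3305, as used by serverB's connection strings
        volumes:
          - /data/mysql/data:/var/lib/mysql # MySQL data under /data/mysql
          - /data/mysql/conf.d:/etc/mysql/conf.d # MySQL configuration files under /data/mysql
        healthcheck:
          test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
          interval: 5s
          retries: 5
        restart: always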

    x-airflow-common environment

    • Add extra_hosts to map each worker hostname to its IP address
    • Update the mapped volumes
      x-airflow-common:
        &airflow-common
        # In order to add custom dependencies or upgrade provider packages you can use your extended image.
        # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
        # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
        image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
        # build: .
        environment:
          &airflow-common-env
          AIRFLOW__CORE__EXECUTOR: CeleryExecutor
          AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:worker@mysql/airflow # changed to the MySQL connection string
          AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:worker@mysql/airflow # changed to the MySQL connection string
          AIRFLOW__CELERY__BROKER_URL: amqp://worker:worker@rabbitmq:5672// # changed to the RabbitMQ connection string
          AIRFLOW__CORE__FERNET_KEY: ''
          AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
          AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
          AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
          _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
          AIRFLOW__CORE__PARALLELISM: 64
          AIRFLOW__CORE__DAG_CONCURRENCY: 32
          AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
        extra_hosts:
          - "host-01:192.168.x.xx" # worker hostname : ip
          - "host-02:192.168.x.xx"
        volumes: # updated host directory mappings
          - ./data/airflow/dags:/opt/airflow/dags
          - ./logs/airflow:/opt/airflow/logs
          - ./data/airflow/plugins:/opt/airflow/plugins
        user: "${AIRFLOW_UID:-50000}:0"
        depends_on:
          &airflow-common-depends-on
          rabbitmq: # the rabbitmq service name
            condition: service_healthy
          mysql: # the mysql service name
            condition: service_healthy

    airflow-init

    • Remove the directory-creation command below (the directories were already created by hand above):
    mkdir -p /sources/logs /sources/dags /sources/plugins
    chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}

serverB (docker-compose_worker.yml)

  • Start from serverA's docker-compose.yml and remove the rabbitmq and mysql entries from services

  • A webserver still has to be built here as well: starting the webserver also spawns the log-service subprocess, and only with it running can the worker's logs be viewed in the web UI (a sketch is shown after this list)

  • github reference: https://github.com/Dawn0472/docker-airflow/tree/main/分散式/serverB
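
    As a sketch, the webserver service on serverB can stay essentially as in the stock apache/airflow 2.2.3 compose file, with the shared depends_on entries removed as described above so that only the airflow-init dependency remains:

      airflow-webserver:
        <<: *airflow-common
        command: webserver
        ports:
          - 8080:8080
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
          interval: 10s
          timeout: 10s
          retries: 5
        restart: always
        depends_on:
          airflow-init:
            condition: service_completed_successfully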

    worker

    • Remove the depends_on entries inherited from x-airflow-common (only the airflow-init dependency remains)
    • Expose the log server port
      airflow-worker:
        <<: *airflow-common
        hostname: host-02 # set the hostname
        command: celery worker
        ports:
          - "8793:8793" # expose the log server port
        healthcheck:
          test:
            - "CMD-SHELL"
            - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
          interval: 10s
          timeout: 10s
          retries: 5
        environment:
          <<: *airflow-common-env
          # Required to handle warm shutdown of the celery workers properly
          # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
          DUMB_INIT_SETSID: "0"
        restart: always
        depends_on:
          airflow-init:
            condition: service_completed_successfully

    x-airflow-common environment

    • Point the connection strings at serverA's services
    • Remove the depends_on section
      x-airflow-common:
        &airflow-common
        # In order to add custom dependencies or upgrade provider packages you can use your extended image.
        # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
        # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
        image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
        # build: .
        environment:
          &airflow-common-env
          AIRFLOW__CORE__EXECUTOR: CeleryExecutor
          AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:worker@192.168.x.xx:3305/airflow # connect to serverA's MySQL
          AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:worker@192.168.x.xx:3305/airflow # connect to serverA's MySQL
          AIRFLOW__CELERY__BROKER_URL: amqp://worker:worker@192.168.x.xx:5672// # connect to serverA's RabbitMQ
          AIRFLOW__CORE__FERNET_KEY: ''
          AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
          AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
          AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
          _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
          AIRFLOW__CORE__PARALLELISM: 64
          AIRFLOW__CORE__DAG_CONCURRENCY: 32
          AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
        extra_hosts:
          - "host-01:192.168.x.xx" # worker hostname : ip
          - "host-02:192.168.x.xx"
        volumes: # updated host directory mappings
          - ./data/airflow/dags:/opt/airflow/dags
          - ./logs/airflow:/opt/airflow/logs
          - ./data/airflow/plugins:/opt/airflow/plugins
        user: "${AIRFLOW_UID:-50000}:0"

    Start docker-compose

    $ docker-compose up airflow-init # initialize the services and make sure the containers link up correctly
    $ docker-compose up -d # create the airflow containers in the background
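
    To confirm that everything came up, the standard docker-compose commands are enough. Note that on serverB the file is not named docker-compose.yml, so it has to be passed explicitly with -f (this applies to the up commands above as well):

    $ docker-compose ps # every service should report "Up (healthy)"
    $ docker-compose -f docker-compose_worker.yml ps # on serverB, point at the worker compose file
    $ docker-compose logs -f airflow-worker # follow a service's log if it fails to start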

Airflow startup screen

Flower startup screen

RabbitMQ startup screen

Data Synchronization

  • Keep the data synchronized between the servers, so that when the scheduler dispatches a run it does not fail because a DAG file cannot be found

  • There are two ways to synchronize the data:

    • Update only one server and sync its data to all the other servers
    • Update only one server and have the other servers point at that server's data location
  • Here, lsyncd is used to deploy the synchronization across multiple servers, with passwordless SSH login

    serverA

    • Start lsyncd
      $ systemctl start lsyncd
      $ systemctl status lsyncd
    • Create the lsyncd configuration file at /etc/lsyncd/lsyncd.conf.lua (a sketch of its contents follows below)

      $ sudo vim /etc/lsyncd/lsyncd.conf.lua # edit the sync configuration
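
      The post does not reproduce the full configuration, so the following is only a minimal sketch of what /etc/lsyncd/lsyncd.conf.lua could look like for pushing the dags directory from serverA to serverB (it assumes lsyncd is already installed, e.g. via apt). The log file locations, the user@IP target, and the choice to sync only dags/ are assumptions; the identity file is the airflow-sync key generated in the next step:

      -- /etc/lsyncd/lsyncd.conf.lua (sketch)
      settings {
          logfile    = "/var/log/lsyncd/lsyncd.log",    -- assumed log locations
          statusFile = "/var/log/lsyncd/lsyncd.status",
      }

      sync {
          default.rsync,
          source = "/data/airflow/dags",                   -- DAG directory on serverA
          target = "user@192.168.x.xx:/data/airflow/dags", -- serverB user and IP (placeholders)
          delay  = 1,                                      -- push changes within about a second
          rsync  = {
              archive  = true,
              compress = true,
              rsh      = "ssh -i /home/user/.ssh/airflow-sync -o StrictHostKeyChecking=no",
          },
      }

      Only dags/ is synced in this sketch; each worker keeps its own logs locally, which is why the worker exposes port 8793 for the webserver to fetch them. After editing the configuration, restart the service (systemctl restart lsyncd).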

    • Configure public-key authentication between the nodes

      • airflow-sync: private key

      • airflow-sync.pub: public key

        $ ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync # generate a key pair named airflow-sync

      • Copy the public key to the remote serverB; the corresponding entry will then appear in ~/.ssh/authorized_keys in serverB's user home directory

        $ ssh-copy-id -i ~/.ssh/airflow-sync.pub user@192.168.x.xx
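
        Before relying on lsyncd, it is worth confirming that the key-based login really works without a password prompt:

        $ ssh -i ~/.ssh/airflow-sync user@192.168.x.xx hostname # should print serverB's hostname without asking for a password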

Place a test DAG on serverA

  • Path: /data/airflow/dags/sample-dag.py

    • It echoes the hostname, so we can check whether tasks run on different machines

    • The DAG is adapted from How to Scale-out Apache Airflow 2.0 with Redis and Celery

      from airflow import DAG
      from airflow.operators.bash import BashOperator
      from datetime import datetime, timedelta

      default_args = {
          'owner': 'Dawn',
          'retries': 2,
          'retry_delay': timedelta(minutes=1)
      }

      with DAG('dist_example',
               default_args=default_args,
               start_date=datetime(2022, 5, 13, 16, 34),
               schedule_interval="*/10 * * * *",
               ) as dag:

          # echo the hostname so the task log shows which worker ran the task
          create_command = 'echo $(hostname)'
          t1 = BashOperator(
              task_id='task_for_q1',
              bash_command=create_command,
              dag=dag
          )
          t2 = BashOperator(
              task_id='task_for_q2',
              bash_command=create_command,
              dag=dag
          )
          t1 >> t2

    • Run it in Airflow and check the task logs on each host:

      • host-01
      • host-02
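
      Since each host keeps its task logs on its own ./logs/airflow volume (only dags/ is synced in the lsyncd sketch above), the result can also be double-checked outside the web UI by grepping the local logs on each server for the echoed hostname:

      $ grep -R "host-0" ./logs/airflow | head # run on each server; the BashOperator output shows which host ran the task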

References

  1. Airflow2.2.3 + Celery + MySQL 8构建一个健壮的分布式调度集群
  2. lsyncd official website