Host environment
- Both hosts use the same environment
- OS: Ubuntu 16.04.7 LTS
- Kernel: Linux 4.15.0-142-generic
- docker-compose version: v1.29.1
Cluster environment
Managing the mapped directories
Set up the mapped directories; this makes management easier when several services are running
- MySQL configuration files: under /data/mysql
- Airflow data directory: under /data/airflow
Create the container-mapped directories on both serviceA and serviceB, and set their permissions (see the sketch after the command below)
```bash
$ mkdir -p ./data/airflow/dags ./logs/airflow ./data/airflow/plugins
```
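The permissions step is not shown as a command above; a minimal sketch follows, based on the approach used by the official Airflow docker-compose guide, assuming the commands are run from the directory holding docker-compose.yml and that the mounted folders should belong to your user:

```bash
# Let the containers write to the mounted folders as your UID
# (the compose file reads AIRFLOW_UID from .env; 50000 is only the fallback default).
$ echo -e "AIRFLOW_UID=$(id -u)" > .env
$ sudo chown -R "$(id -u):0" ./data/airflow ./logs/airflow
```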
serviceA (docker-compose.yml)
Build the master node by referring to and modifying the yml from the single-machine Airflow build (airflow單機版建構)
GitHub reference: https://github.com/Dawn0472/docker-airflow/tree/main/分散式/serverA
MySQL
- Modify the volumes mapping path in the MySQL configuration (a sketch of a matching service definition follows)
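The mysql service definition itself is not reproduced in this post; below is a minimal sketch of how the volumes could be mapped. The service name mysql, the airflow/worker credentials and the published port 3305 are taken from the connection strings used elsewhere in this setup, while the image tag, healthcheck and exact paths under ./data/mysql are assumptions:

```yaml
mysql:
  image: mysql:8.0                                  # assumed image/tag
  command: --explicit-defaults-for-timestamp=1 --default-authentication-plugin=mysql_native_password
  environment:
    MYSQL_ROOT_PASSWORD: root                       # assumed root password
    MYSQL_DATABASE: airflow
    MYSQL_USER: airflow                             # matches mysql+mysqldb://airflow:worker@mysql/airflow
    MYSQL_PASSWORD: worker
  ports:
    - "3305:3306"                                   # serviceB reaches serverA's MySQL on 3305
  volumes:
    - ./data/mysql/conf.d:/etc/mysql/conf.d         # MySQL config files kept under ./data/mysql
    - ./data/mysql/data:/var/lib/mysql              # MySQL data kept under ./data/mysql
  healthcheck:
    test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-uairflow", "-pworker"]
    interval: 10s
    retries: 5
  restart: always
```

The 3305:3306 port mapping is what lets serviceB connect to this database at 192.168.x.xx:3305 in its own compose file.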
x-airflow-common environment
- Add extra_hosts to map hostnames to IP addresses
- Modify the mapped volumes
```yaml
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:worker@mysql/airflow # changed to the MySQL connection
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:worker@mysql/airflow # changed to the MySQL connection
    AIRFLOW__CELERY__BROKER_URL: amqp://worker:worker@rabbitmq:5672// # changed to the RabbitMQ connection
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
  extra_hosts:
    - "host-01:192.168.x.xx" # worker hostname : ip
    - "host-02:192.168.x.xx"
  volumes: # modified mapped directories
    - ./data/airflow/dags:/opt/airflow/dags
    - ./logs/airflow:/opt/airflow/logs
    - ./data/airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    rabbitmq: # rabbitmq service name
      condition: service_healthy
    mysql: # mysql service name
      condition: service_healthy
```
airflow-init
- Remove the command that creates the directories (the deleted lines are shown below); they were already created and given permissions earlier
```bash
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
```
serviceB (docker-compose_worker.yml)
Starting from serviceA's docker-compose.yml, remove rabbitmq and mysql from services
The webserver still needs to be built: when the webserver starts it launches a log-service child process, and only with it built can the worker's logs be viewed in the web UI
GitHub reference: https://github.com/Dawn0472/docker-airflow/tree/main/分散式/serverB
worker
- Remove the depends_on entries (the rabbitmq/mysql conditions no longer apply here)
- Expose the log server port
```yaml
airflow-worker:
  <<: *airflow-common
  hostname: host-02 # set the worker's hostname
  command: celery worker
  ports:
    - "8793:8793" # expose the log server port
  healthcheck:
    test:
      - "CMD-SHELL"
      - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
    interval: 10s
    timeout: 10s
    retries: 5
  environment:
    <<: *airflow-common-env
    # Required to handle warm shutdown of the celery workers properly
    # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
    DUMB_INIT_SETSID: "0"
  restart: always
  depends_on:
    airflow-init:
      condition: service_completed_successfully
```
x-airflow-common environment
- Modify the connection targets (point at serverA's MySQL and RabbitMQ by IP)
- Remove the depends_on block
```yaml
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:worker@192.168.x.xx:3305/airflow # connect to serverA's MySQL
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:worker@192.168.x.xx:3305/airflow # connect to serverA's MySQL
    AIRFLOW__CELERY__BROKER_URL: amqp://worker:worker@192.168.x.xx:5672// # connect to serverA's RabbitMQ
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
  extra_hosts:
    - "host-01:192.168.x.xx" # worker hostname : ip
    - "host-02:192.168.x.xx"
  volumes: # modified mapped directories
    - ./data/airflow/dags:/opt/airflow/dags
    - ./logs/airflow:/opt/airflow/logs
    - ./data/airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
```
Start docker-compose
```bash
$ docker-compose up airflow-init   # initialize the services and make sure the containers link up correctly
$ docker-compose up -d             # create the airflow containers in the background
```
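To confirm the stack came up cleanly, the container state and the init output can be checked (service names as defined in the compose file):

```bash
$ docker-compose ps                  # all services should be Up (healthy)
$ docker-compose logs airflow-init   # the database initialization should finish without errors
```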
Airflow startup screen
Flower startup screen
RabbitMQ startup screen
Data synchronization
Keeping data in sync between the servers ensures that, when the scheduler dispatches tasks, runs do not fail because a file cannot be found
There are two ways to synchronize the data:
- Update only one server and sync its data to all servers
- Update only one server and point the other servers at the updated server's location
This time we use lsyncd to keep multiple servers in sync, with passwordless SSH login
serverA
- Install lsyncd
```bash
$ sudo dpkg -i lsyncd_2.2.3-1_amd64.deb
```
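If dpkg reports missing dependencies, the usual Ubuntu fix is to let apt resolve them afterwards:

```bash
$ sudo apt-get -f install   # pull in any dependencies dpkg could not resolve
```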
Check the lsyncd version
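For example, via the binary itself or the installed package (flags as in lsyncd 2.2.x):

```bash
$ lsyncd --version               # prints e.g. "Version: 2.2.3"
$ dpkg -s lsyncd | grep Version  # or query the installed .deb
```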
- Start lsyncd
```bash
$ systemctl start lsyncd
$ systemctl status lsyncd
```
Create the lsyncd configuration file at /etc/lsyncd/lsyncd.conf.lua (a sketch of its contents follows the command below)
```bash
$ sudo vim /etc/lsyncd/lsyncd.conf.lua   # edit the connection settings
```
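The connection settings themselves are not shown above; below is a minimal sketch of what /etc/lsyncd/lsyncd.conf.lua could look like for pushing the DAG folder from serverA to serviceB over rsync+ssh. The source/target paths, remote user and key name follow this post's directory layout and the SSH setup in the next step (adjust the absolute key path to wherever the airflow-sync key lives for the account running lsyncd); the log locations and delay are assumptions:

```lua
settings {
    logfile    = "/var/log/lsyncd/lsyncd.log",      -- assumed log location
    statusFile = "/var/log/lsyncd/lsyncd.status",
}

-- Push /data/airflow/dags on serverA to the same path on serviceB
sync {
    default.rsync,
    source = "/data/airflow/dags",
    target = "user@192.168.x.xx:/data/airflow/dags",
    delay  = 1,                                     -- sync shortly after each change
    rsync  = {
        archive  = true,
        compress = true,
        rsh      = "/usr/bin/ssh -i /home/user/.ssh/airflow-sync -o StrictHostKeyChecking=no",
    },
}
```

Additional sync{} blocks can be added in the same way if the plugins directory also needs to be distributed.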
Configure public-key authentication between the nodes
airflow-sync: private key
airflow-sync.pub: public key
```bash
$ ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync   # generate a key pair named airflow-sync
```
Copy the public key to the remote serviceB; the key will then appear in .ssh/authorized_keys under the remote user's home directory on serviceB
```bash
$ ssh-copy-id -i ~/.ssh/airflow-sync.pub user@192.168.x.xx
```
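Before lsyncd relies on it, it is worth confirming that key-based login works without a password prompt:

```bash
$ ssh -i ~/.ssh/airflow-sync user@192.168.x.xx hostname   # should print the remote hostname with no password prompt
```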
Place a test file on serviceA
Path: /data/airflow/dags/sample-dag.py
The DAG prints the hostname, so we can check whether the tasks run on different machines
The DAG is adapted from How to Scale-out Apache Airflow 2.0 with Redis and Celery
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'Dawn',
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

with DAG('dist_example',
         start_date=datetime(2022, 5, 13, 16, 34),
         schedule_interval="*/10 * * * *",
         default_args=default_args,
         ) as dag:

    create_command = 'echo $(hostname)'

    t1 = BashOperator(
        task_id='task_for_q1',
        bash_command=create_command,
        dag=dag
    )

    t2 = BashOperator(
        task_id='task_for_q2',
        bash_command=create_command,
        dag=dag
    )

    t1 >> t2
```
Running it in Airflow
- host-01
- host-02
- host-01