Перейти к содержимому

Лекция 7: Оркестрация с Apache Airflow

Представьте: у вас есть несколько скриптов, которые нужно запускать в определённом порядке и по расписанию.

  • Запуск скриптов вручную
  • Отслеживание зависимостей
  • Обработка ошибок
  • Повторное выполнение при сбоях
  • Мониторинг состояния

Airflow — платформа для оркестрации рабочих процессов (пайплайнов).

DAG — ориентированный ациклический граф. Это описание вашего пайплайна.

scrape_habr
scrape_dzen
analyze_sentiment

Task — отдельное действие в DAG.

Operator — шаблон задачи. Airflow имеет встроенные операторы:

  • PythonOperator — выполнить Python функцию
  • BashOperator — выполнить bash команду
  • PostgresOperator — выполнить SQL
  • и многие другие
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def scrape_habr():
"""Скрейпинг Хабра."""
print("Скрейпинг Хабра...")
def scrape_dzen():
"""Скрейпинг Dzen."""
print("Скрейпинг Dzen...")
def analyze():
"""Анализ тональности."""
print("Анализ тональности...")
# Создание DAG
with DAG(
'news_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily', # Ежедневно
catchup=False
) as dag:
# Задачи
task1 = PythonOperator(
task_id='scrape_habr',
python_callable=scrape_habr
)
task2 = PythonOperator(
task_id='scrape_dzen',
python_callable=scrape_dzen
)
task3 = PythonOperator(
task_id='analyze',
python_callable=analyze
)
# Зависимости
[task1, task2] >> task3
# Cron выражения
schedule_interval='0 0 * * *' # Ежедневно в полночь
schedule_interval='0 */6 * * *' # Каждые 6 часов
schedule_interval='@daily' # Ежедневно (псевдоним)
schedule_interval='@hourly' # Ежечасно
schedule_interval=None # Ручной запуск

Airflow имеет веб-интерфейс для:

  • Просмотра DAG
  • Запуска задач
  • Мониторинга логов
  • Просмотра графов зависимостей
docker-compose.yml
version: '3'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db:/var/lib/postgresql/data
webserver:
image: apache/airflow:2.8.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
ports:
- "8080:8080"
command: webserver
volumes:
postgres-db:
  • Каждый запуск задачи создаётся DAG run
  • Можно смотреть логи каждой задачи
  • При ошибках можно перезапускать отдельные задачи