Лекция 7: Оркестрация с Apache Airflow
Зачем нужна оркестрация?
Заголовок раздела «Зачем нужна оркестрация?»Представьте: у вас есть несколько скриптов, которые нужно запускать в определённом порядке и по расписанию.
Проблемы без оркестратора
Заголовок раздела «Проблемы без оркестратора»- Запуск скриптов вручную
- Отслеживание зависимостей
- Обработка ошибок
- Повторное выполнение при сбоях
- Мониторинг состояния
Решение: Airflow
Заголовок раздела «Решение: Airflow»Airflow — платформа для оркестрации рабочих процессов (пайплайнов).
Основные понятия
Заголовок раздела «Основные понятия»DAG (Directed Acyclic Graph)
Заголовок раздела «DAG (Directed Acyclic Graph)»DAG — ориентированный ациклический граф. Это описание вашего пайплайна.
scrape_habr ↓ scrape_dzen ↓ analyze_sentimentTask (Задача)
Заголовок раздела «Task (Задача)»Task — отдельное действие в DAG.
Operator
Заголовок раздела «Operator»Operator — шаблон задачи. Airflow имеет встроенные операторы:
PythonOperator— выполнить Python функциюBashOperator— выполнить bash командуPostgresOperator— выполнить SQL- и многие другие
Структура DAG
Заголовок раздела «Структура DAG»from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetime
def scrape_habr(): """Скрейпинг Хабра.""" print("Скрейпинг Хабра...")
def scrape_dzen(): """Скрейпинг Dzen.""" print("Скрейпинг Dzen...")
def analyze(): """Анализ тональности.""" print("Анализ тональности...")
# Создание DAGwith DAG( 'news_pipeline', start_date=datetime(2024, 1, 1), schedule_interval='@daily', # Ежедневно catchup=False) as dag:
# Задачи task1 = PythonOperator( task_id='scrape_habr', python_callable=scrape_habr )
task2 = PythonOperator( task_id='scrape_dzen', python_callable=scrape_dzen )
task3 = PythonOperator( task_id='analyze', python_callable=analyze )
# Зависимости [task1, task2] >> task3Расписание
Заголовок раздела «Расписание»# Cron выраженияschedule_interval='0 0 * * *' # Ежедневно в полночьschedule_interval='0 */6 * * *' # Каждые 6 часовschedule_interval='@daily' # Ежедневно (псевдоним)schedule_interval='@hourly' # Ежечасноschedule_interval=None # Ручной запускВеб-интерфейс Airflow
Заголовок раздела «Веб-интерфейс Airflow»Airflow имеет веб-интерфейс для:
- Просмотра DAG
- Запуска задач
- Мониторинга логов
- Просмотра графов зависимостей
Установка через Docker
Заголовок раздела «Установка через Docker»version: '3'services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db:/var/lib/postgresql/data
webserver: image: apache/airflow:2.8.0 depends_on: - postgres environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow volumes: - ./dags:/opt/airflow/dags ports: - "8080:8080" command: webserver
volumes: postgres-db:Мониторинг и логи
Заголовок раздела «Мониторинг и логи»- Каждый запуск задачи создаётся DAG run
- Можно смотреть логи каждой задачи
- При ошибках можно перезапускать отдельные задачи