Практическое занятие 10: Docker и Airflow
Цель занятия
Заголовок раздела «Цель занятия»Изучить основы Docker, создать контейнеризированное окружение и настроить автоматизацию с помощью Apache Airflow.
Что такое Docker?
Заголовок раздела «Что такое Docker?»Docker — это платформа для контейнеризации приложений. Контейнеры позволяют упаковывать приложение со всеми зависимостями в изолированную среду.
Основные понятия
Заголовок раздела «Основные понятия»| Понятие | Описание |
|---|---|
| Image (Образ) | Шаблон для создания контейнера |
| Container (Контейнер) | Запущенный экземпляр образа |
| Dockerfile | Инструкция для создания образа |
| docker-compose.yml | Файл для управления несколькими контейнерами |
Установка Docker
Заголовок раздела «Установка Docker»- Скачайте Docker Desktop с docker.com
- Запустите установщик и следуйте инструкциям
- После установки проверьте работу:
docker --versiondocker-compose --versionОсновные команды Docker
Заголовок раздела «Основные команды Docker»Управление образами
Заголовок раздела «Управление образами»# Поиск образаdocker search python
# Скачивание образаdocker pull python:3.10
# Список загруженных образовdocker images
# Удаление образаdocker rmi python:3.10Управление контейнерами
Заголовок раздела «Управление контейнерами»# Запуск контейнераdocker run python:3.10 python --version
# Запуск в интерактивном режимеdocker run -it python:3.10 bash
# Запуск с пробросом портаdocker run -p 8080:8080 nginx
# Список запущенных контейнеровdocker ps
# Список всех контейнеров (включая остановленные)docker ps -a
# Остановка контейнераdocker stop <container_id>
# Удаление контейнераdocker rm <container_id>
# Просмотр логовdocker logs <container_id>Создание Dockerfile
Заголовок раздела «Создание Dockerfile»Простой Dockerfile для Python-скрипта
Заголовок раздела «Простой Dockerfile для Python-скрипта»Создайте файл Dockerfile:
# Базовый образFROM python:3.10-slim
# Рабочая директорияWORKDIR /app
# Копирование файла зависимостейCOPY requirements.txt .
# Установка зависимостейRUN pip install --no-cache-dir -r requirements.txt
# Копирование файлов проектаCOPY . .
# Команда при запускеCMD ["python", "main.py"]requirements.txt
Заголовок раздела «requirements.txt»requestsbeautifulsoup4transformerstorchСборка образа
Заголовок раздела «Сборка образа»# Сборка образаdocker build -t my-scraper .
# Запуск контейнераdocker run my-scraperDocker Compose
Заголовок раздела «Docker Compose»Docker Compose позволяет управлять несколькими контейнерами одновременно.
docker-compose.yml
Заголовок раздела «docker-compose.yml»version: '3.8'
services: # Сервис скрапера scraper: build: . container_name: web-scraper volumes: - ./data:/app/data # Проброс директории с данными environment: - ENVIRONMENT=production
# Сервис базы данных database: image: postgres:13 container_name: postgres-db environment: POSTGRES_DB: scraper_db POSTGRES_USER: scraper_user POSTGRES_PASSWORD: scraper_pass ports: - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data
volumes: postgres_data:Запуск docker-compose
Заголовок раздела «Запуск docker-compose»# Запуск всех сервисовdocker-compose up -d
# Остановка всех сервисовdocker-compose down
# Просмотр логовdocker-compose logs -f
# Пересборка контейнеровdocker-compose up --buildApache Airflow
Заголовок раздела «Apache Airflow»Apache Airflow — это платформа для программирования, планирования и мониторинга рабочих процессов (workflows).
Установка Airflow через Docker
Заголовок раздела «Установка Airflow через Docker»# Скачивание docker-compose файлаcurl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.0/docker-compose.yaml'
# Настройка переменных окруженияmkdir -p ./dags ./logs ./plugins ./config
# Запуск Airflowdocker-compose up -dСтруктура проекта Airflow
Заголовок раздела «Структура проекта Airflow»airflow-project/├── dags/ # Папка с DAG файлами├── logs/ # Логи выполнения├── plugins/ # Плагины└── docker-compose.yml # Конфигурация DockerСоздание DAG в Airflow
Заголовок раздела «Создание DAG в Airflow»DAG (Directed Acyclic Graph) — это направленный ациклический граф, описывающий рабочий процесс.
Простой DAG
Заголовок раздела «Простой DAG»Создайте файл dags/scraper_dag.py:
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetime, timedeltaimport requestsimport json
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5),}
def scrape_website(**context): """Функция скрапинга веб-сайта""" url = "https://example.com" response = requests.get(url)
data = { "url": url, "status": response.status_code, "timestamp": str(datetime.now()) }
# Сохранение в файл with open("/app/data/scraped_data.json", "w") as f: json.dump(data, f, indent=4)
print("Скрапинг завершён")
def process_data(**context): """Функция обработки данных""" print("Обработка данных...")
# Чтение данных with open("/app/data/scraped_data.json", "r") as f: data = json.load(f)
print(f"Обработано: {data}")
def save_to_database(**context): """Функция сохранения в базу данных""" print("Сохранение в базу данных...") # Здесь будет код для сохранения в БД
# Создание DAGdag = DAG( 'web_scraper_dag', default_args=default_args, description='DAG для веб-скрапинга', schedule_interval='@daily', # Ежедневное выполнение catchup=False,)
# Создание задачscrape_task = PythonOperator( task_id='scrape_website', python_callable=scrape_website, dag=dag,)
process_task = PythonOperator( task_id='process_data', python_callable=process_data, dag=dag,)
save_task = PythonOperator( task_id='save_to_database', python_callable=save_to_database, dag=dag,)
# Определение порядка выполнения задачscrape_task >> process_task >> save_taskDAG с параметрами
Заголовок раздела «DAG с параметрами»from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetime, timedelta
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'retries': 1,}
def scrape_habr(**context): """Скрапинг Habr""" print("Скрапинг Habr...")
def scrape_dzen(**context): """Скрапинг Dzen""" print("Скрапинг Dzen...")
def analyze_sentiment(**context): """Анализ тональности""" print("Анализ тональности...")
def save_results(**context): """Сохранение результатов""" print("Сохранение результатов...")
dag = DAG( 'multi_source_scraper', default_args=default_args, description='Скрапинг из нескольких источников', schedule_interval='0 9 * * *', # Ежедневно в 9:00 catchup=False,)
# Параллельное выполнение скраперовhabr_task = PythonOperator( task_id='scrape_habr', python_callable=scrape_habr, dag=dag,)
dzen_task = PythonOperator( task_id='scrape_dzen', python_callable=scrape_dzen, dag=dag,)
# Последовательная обработкаanalyze_task = PythonOperator( task_id='analyze_sentiment', python_callable=analyze_sentiment, dag=dag,)
save_task = PythonOperator( task_id='save_results', python_callable=save_results, dag=dag,)
# Определение зависимостей[habr_task, dzen_task] >> analyze_task >> save_taskПолный пример проекта
Заголовок раздела «Полный пример проекта»Структура проекта
Заголовок раздела «Структура проекта»scraper-project/├── dags/│ └── scraper_dag.py├── scrapers/│ ├── __init__.py│ ├── base_scraper.py│ ├── habr_scraper.py│ └── dzen_scraper.py├── data/├── requirements.txt└── docker-compose.ymldocker-compose.yml для проекта
Заголовок раздела «docker-compose.yml для проекта»version: '3.8'
services: # Airflow airflow: image: apache/airflow:2.7.0-python3.10 container_name: airflow depends_on: - postgres - redis environment: AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0 AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags AIRFLOW__CORE__LOAD_EXAMPLES: "false" volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./scrapers:/opt/airflow/scrapers - ./data:/opt/airflow/data ports: - "8080:8080" command: > bash -c " airflow db init && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow scheduler & airflow webserver "
# PostgreSQL postgres: image: postgres:13 container_name: postgres environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres_data:/var/lib/postgresql/data
# Redis redis: image: redis:7 container_name: redis
volumes: postgres_data:requirements.txt
Заголовок раздела «requirements.txt»requestsbeautifulsoup4transformerstorchpsycopg2-binaryredisУправление Airflow
Заголовок раздела «Управление Airflow»Web-интерфейс
Заголовок раздела «Web-интерфейс»После запуска Airflow откройте в браузере:
- URL:
http://localhost:8080 - Логин:
admin - Пароль:
admin
Основные команды
Заголовок раздела «Основные команды»# Запуск DAGairflow dags trigger web_scraper_dag
# Остановка DAGairflow dags pause web_scraper_dag
# Запуск DAGairflow dags unpause web_scraper_dag
# Просмотр списка DAGairflow dags list
# Просмотр задач в DAGairflow tasks list web_scraper_dag
# Ручной запуск задачиairflow tasks test web_scraper_dag scrape_website 2024-01-01Задания для самостоятельной работы
Заголовок раздела «Задания для самостоятельной работы»-
Создайте Dockerfile для вашего скрапера и соберите контейнер.
-
Настройте docker-compose.yml для запуска Airflow с PostgreSQL и Redis.
-
Создайте DAG для ежедневного скрапинга Habr и Dzen с последующим анализом тональности.
-
Реализуйте отправку уведомлений по email при успешном/неудачном выполнении DAG.
-
Добавьте мониторинг и логирование для всех задач в DAG.
Пример решения задания 3
Заголовок раздела «Пример решения задания 3»from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.slack.operators.slack import SlackAPIPostOperatorfrom datetime import datetime, timedeltaimport syssys.path.append('/opt/airflow')
from scrapers.habr_scraper import HabrScraperfrom scrapers.dzen_scraper import DzenScraper
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5),}
def scrape_habr_task(): """Задача скрапинга Habr""" scraper = HabrScraper() urls = [f"https://habr.com/ru/hub/python/page{i}/" for i in range(1, 3)] data = scraper.run(urls, "/opt/airflow/data/habr.json") print(f"Собрано {len(data)} статей с Habr")
def scrape_dzen_task(): """Задача скрапинга Dzen""" scraper = DzenScraper() urls = [f"https://dzen.ru/news/python/page{i}/" for i in range(1, 3)] data = scraper.run(urls, "/opt/airflow/data/dzen.json") print(f"Собрано {len(data)} статей с Dzen")
def analyze_task(): """Задача анализа тональности""" print("Анализ тональности...")
dag = DAG( 'daily_scraper', default_args=default_args, description='Ежедневный скрапинг и анализ', schedule_interval='0 9 * * *', catchup=False,)
habr_task = PythonOperator( task_id='scrape_habr', python_callable=scrape_habr_task, dag=dag,)
dzen_task = PythonOperator( task_id='scrape_dzen', python_callable=scrape_dzen_task, dag=dag,)
analyze_task = PythonOperator( task_id='analyze', python_callable=analyze_task, dag=dag,)
[habr_task, dzen_task] >> analyze_taskПолезные ресурсы
Заголовок раздела «Полезные ресурсы»Завершение курса
Заголовок раздела «Завершение курса»Поздравляем! Вы завершили все практические занятия курса. Теперь вы умеете:
- Программировать на Python
- Работать с Git
- Создавать веб-скраперы
- Работать с базами данных
- Использовать библиотеку transformers
- Контейнеризировать приложения с Docker
- Автоматизировать процессы с Airflow
Удачи в дальнейшем изучении Data Engineering и Data Science!