Перейти к содержимому

Практическое занятие 10: Docker и Airflow

Изучить основы Docker, создать контейнеризированное окружение и настроить автоматизацию с помощью Apache Airflow.

Docker — это платформа для контейнеризации приложений. Контейнеры позволяют упаковывать приложение со всеми зависимостями в изолированную среду.

ПонятиеОписание
Image (Образ)Шаблон для создания контейнера
Container (Контейнер)Запущенный экземпляр образа
DockerfileИнструкция для создания образа
docker-compose.ymlФайл для управления несколькими контейнерами
  1. Скачайте Docker Desktop с docker.com
  2. Запустите установщик и следуйте инструкциям
  3. После установки проверьте работу:
Окно терминала
docker --version
docker-compose --version
Окно терминала
# Поиск образа
docker search python
# Скачивание образа
docker pull python:3.10
# Список загруженных образов
docker images
# Удаление образа
docker rmi python:3.10
Окно терминала
# Запуск контейнера
docker run python:3.10 python --version
# Запуск в интерактивном режиме
docker run -it python:3.10 bash
# Запуск с пробросом порта
docker run -p 8080:8080 nginx
# Список запущенных контейнеров
docker ps
# Список всех контейнеров (включая остановленные)
docker ps -a
# Остановка контейнера
docker stop <container_id>
# Удаление контейнера
docker rm <container_id>
# Просмотр логов
docker logs <container_id>

Создайте файл Dockerfile:

# Базовый образ
FROM python:3.10-slim
# Рабочая директория
WORKDIR /app
# Копирование файла зависимостей
COPY requirements.txt .
# Установка зависимостей
RUN pip install --no-cache-dir -r requirements.txt
# Копирование файлов проекта
COPY . .
# Команда при запуске
CMD ["python", "main.py"]
requests
beautifulsoup4
transformers
torch
Окно терминала
# Сборка образа
docker build -t my-scraper .
# Запуск контейнера
docker run my-scraper

Docker Compose позволяет управлять несколькими контейнерами одновременно.

version: '3.8'
services:
# Сервис скрапера
scraper:
build: .
container_name: web-scraper
volumes:
- ./data:/app/data # Проброс директории с данными
environment:
- ENVIRONMENT=production
# Сервис базы данных
database:
image: postgres:13
container_name: postgres-db
environment:
POSTGRES_DB: scraper_db
POSTGRES_USER: scraper_user
POSTGRES_PASSWORD: scraper_pass
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
Окно терминала
# Запуск всех сервисов
docker-compose up -d
# Остановка всех сервисов
docker-compose down
# Просмотр логов
docker-compose logs -f
# Пересборка контейнеров
docker-compose up --build

Apache Airflow — это платформа для программирования, планирования и мониторинга рабочих процессов (workflows).

Окно терминала
# Скачивание docker-compose файла
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.0/docker-compose.yaml'
# Настройка переменных окружения
mkdir -p ./dags ./logs ./plugins ./config
# Запуск Airflow
docker-compose up -d
airflow-project/
├── dags/ # Папка с DAG файлами
├── logs/ # Логи выполнения
├── plugins/ # Плагины
└── docker-compose.yml # Конфигурация Docker

DAG (Directed Acyclic Graph) — это направленный ациклический граф, описывающий рабочий процесс.

Создайте файл dags/scraper_dag.py:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import json
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def scrape_website(**context):
"""Функция скрапинга веб-сайта"""
url = "https://example.com"
response = requests.get(url)
data = {
"url": url,
"status": response.status_code,
"timestamp": str(datetime.now())
}
# Сохранение в файл
with open("/app/data/scraped_data.json", "w") as f:
json.dump(data, f, indent=4)
print("Скрапинг завершён")
def process_data(**context):
"""Функция обработки данных"""
print("Обработка данных...")
# Чтение данных
with open("/app/data/scraped_data.json", "r") as f:
data = json.load(f)
print(f"Обработано: {data}")
def save_to_database(**context):
"""Функция сохранения в базу данных"""
print("Сохранение в базу данных...")
# Здесь будет код для сохранения в БД
# Создание DAG
dag = DAG(
'web_scraper_dag',
default_args=default_args,
description='DAG для веб-скрапинга',
schedule_interval='@daily', # Ежедневное выполнение
catchup=False,
)
# Создание задач
scrape_task = PythonOperator(
task_id='scrape_website',
python_callable=scrape_website,
dag=dag,
)
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag,
)
save_task = PythonOperator(
task_id='save_to_database',
python_callable=save_to_database,
dag=dag,
)
# Определение порядка выполнения задач
scrape_task >> process_task >> save_task
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 1,
}
def scrape_habr(**context):
"""Скрапинг Habr"""
print("Скрапинг Habr...")
def scrape_dzen(**context):
"""Скрапинг Dzen"""
print("Скрапинг Dzen...")
def analyze_sentiment(**context):
"""Анализ тональности"""
print("Анализ тональности...")
def save_results(**context):
"""Сохранение результатов"""
print("Сохранение результатов...")
dag = DAG(
'multi_source_scraper',
default_args=default_args,
description='Скрапинг из нескольких источников',
schedule_interval='0 9 * * *', # Ежедневно в 9:00
catchup=False,
)
# Параллельное выполнение скраперов
habr_task = PythonOperator(
task_id='scrape_habr',
python_callable=scrape_habr,
dag=dag,
)
dzen_task = PythonOperator(
task_id='scrape_dzen',
python_callable=scrape_dzen,
dag=dag,
)
# Последовательная обработка
analyze_task = PythonOperator(
task_id='analyze_sentiment',
python_callable=analyze_sentiment,
dag=dag,
)
save_task = PythonOperator(
task_id='save_results',
python_callable=save_results,
dag=dag,
)
# Определение зависимостей
[habr_task, dzen_task] >> analyze_task >> save_task
scraper-project/
├── dags/
│ └── scraper_dag.py
├── scrapers/
│ ├── __init__.py
│ ├── base_scraper.py
│ ├── habr_scraper.py
│ └── dzen_scraper.py
├── data/
├── requirements.txt
└── docker-compose.yml
version: '3.8'
services:
# Airflow
airflow:
image: apache/airflow:2.7.0-python3.10
container_name: airflow
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./scrapers:/opt/airflow/scrapers
- ./data:/opt/airflow/data
ports:
- "8080:8080"
command: >
bash -c "
airflow db init &&
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com &&
airflow scheduler & airflow webserver
"
# PostgreSQL
postgres:
image: postgres:13
container_name: postgres
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_data:/var/lib/postgresql/data
# Redis
redis:
image: redis:7
container_name: redis
volumes:
postgres_data:
requests
beautifulsoup4
transformers
torch
psycopg2-binary
redis

После запуска Airflow откройте в браузере:

  • URL: http://localhost:8080
  • Логин: admin
  • Пароль: admin
Окно терминала
# Запуск DAG
airflow dags trigger web_scraper_dag
# Остановка DAG
airflow dags pause web_scraper_dag
# Запуск DAG
airflow dags unpause web_scraper_dag
# Просмотр списка DAG
airflow dags list
# Просмотр задач в DAG
airflow tasks list web_scraper_dag
# Ручной запуск задачи
airflow tasks test web_scraper_dag scrape_website 2024-01-01
  1. Создайте Dockerfile для вашего скрапера и соберите контейнер.

  2. Настройте docker-compose.yml для запуска Airflow с PostgreSQL и Redis.

  3. Создайте DAG для ежедневного скрапинга Habr и Dzen с последующим анализом тональности.

  4. Реализуйте отправку уведомлений по email при успешном/неудачном выполнении DAG.

  5. Добавьте мониторинг и логирование для всех задач в DAG.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/opt/airflow')
from scrapers.habr_scraper import HabrScraper
from scrapers.dzen_scraper import DzenScraper
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def scrape_habr_task():
"""Задача скрапинга Habr"""
scraper = HabrScraper()
urls = [f"https://habr.com/ru/hub/python/page{i}/" for i in range(1, 3)]
data = scraper.run(urls, "/opt/airflow/data/habr.json")
print(f"Собрано {len(data)} статей с Habr")
def scrape_dzen_task():
"""Задача скрапинга Dzen"""
scraper = DzenScraper()
urls = [f"https://dzen.ru/news/python/page{i}/" for i in range(1, 3)]
data = scraper.run(urls, "/opt/airflow/data/dzen.json")
print(f"Собрано {len(data)} статей с Dzen")
def analyze_task():
"""Задача анализа тональности"""
print("Анализ тональности...")
dag = DAG(
'daily_scraper',
default_args=default_args,
description='Ежедневный скрапинг и анализ',
schedule_interval='0 9 * * *',
catchup=False,
)
habr_task = PythonOperator(
task_id='scrape_habr',
python_callable=scrape_habr_task,
dag=dag,
)
dzen_task = PythonOperator(
task_id='scrape_dzen',
python_callable=scrape_dzen_task,
dag=dag,
)
analyze_task = PythonOperator(
task_id='analyze',
python_callable=analyze_task,
dag=dag,
)
[habr_task, dzen_task] >> analyze_task

Поздравляем! Вы завершили все практические занятия курса. Теперь вы умеете:

  • Программировать на Python
  • Работать с Git
  • Создавать веб-скраперы
  • Работать с базами данных
  • Использовать библиотеку transformers
  • Контейнеризировать приложения с Docker
  • Автоматизировать процессы с Airflow

Удачи в дальнейшем изучении Data Engineering и Data Science!