добавляет dag

This commit is contained in:
2025-11-04 00:17:47 +03:00
parent 319a7baa8b
commit 745f1ef2de
17 changed files with 425 additions and 4 deletions

View File

5
.gitignore vendored
View File

@@ -3,9 +3,11 @@
.DS_Store .DS_Store
.AppleDouble .AppleDouble
.LSOverride .LSOverride
.env
# Icon must end with two \r # Icon must end with two \r
Icon Icon
# Thumbnails # Thumbnails
._* ._*
@@ -227,4 +229,3 @@ cython_debug/
# PyPI configuration file # PyPI configuration file
.pypirc .pypirc
.pypirc

24
Dockerfile Normal file
View File

@@ -0,0 +1,24 @@
# Use the official Apache Airflow base image (pinned minor version, Python 3.9)
FROM apache/airflow:2.7.1-python3.9 as build
ENV USERNAME=airflow
# Airflow home directory used by all subsequent paths
ENV AIRFLOW_HOME=/opt/airflow
# Copy DAGs (including dags/requirements.txt) into the image
COPY dags ${AIRFLOW_HOME}/dags/
WORKDIR ${AIRFLOW_HOME}
# Install additional Python dependencies required by the DAGs
RUN pip install --no-cache-dir -r dags/requirements.txt
COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/init_airflow.sh
# (optional) switch to the airflow user for security
USER airflow
FROM build as final

View File

@@ -1,3 +1,2 @@
# airflow_neo4j — пример использования Airflow с подключением к БД Neo4j

0
__init__.py Normal file
View File

78
dags/dag_01.py Normal file
View File

@@ -0,0 +1,78 @@
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from task_01.composites import UserActionTransferTask
from task_01.target_db.utills import get_driver

# Default DAG arguments
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': datetime(2025, 11, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(minutes=30),
}


def _build_task():
    """Build the transfer use case lazily, at task *run* time.

    Reading Variables and opening a Neo4j driver at module level would run
    on every scheduler parse of this file (Airflow top-level-code
    anti-pattern), so all of it is deferred into the python callables.
    """
    conn_id = Variable.get("CONN_ID")
    driver = get_driver(conn_id)
    return UserActionTransferTask(driver).run


def _check_neo4j_connection():
    """Callable for the health-check task."""
    return _build_task().check_neo4j_connection()


def _generate_sample_data():
    """Callable for the sample-data generation task."""
    return _build_task().generate_sample_data()


def _load_data_to_neo4j():
    """Callable for the data-loading task."""
    return _build_task().load_data_to_neo4j()


# DAG definition
with DAG(
        'neo4j_health_check_dag',
        default_args=default_args,
        schedule_interval='0 9 * * *',
        catchup=False,
        tags=['neo4j', 'healthcheck', 'data_loading', 'vers.01'],
        description=
        'DAG для проверки здоровья Neo4j и загрузки тестовых данных',
) as dag:
    # Entry marker task
    start_task = EmptyOperator(
        task_id='start_task',
        dag=dag,
    )
    # Verify the Neo4j connection is alive
    check_neo4j_connection_task = PythonOperator(
        task_id='check_neo4j_connection',
        python_callable=_check_neo4j_connection,
        dag=dag,
        retries=3,
        retry_delay=timedelta(minutes=1),
    )
    # Generate sample data
    generate_sample_data_task = PythonOperator(
        task_id='generate_sample_data',
        python_callable=_generate_sample_data,
        dag=dag,
        retries=3,
        retry_delay=timedelta(minutes=1),
    )
    # Load the generated data into Neo4j
    load_data_to_neo4j_task = PythonOperator(
        task_id='load_data_to_neo4j',
        python_callable=_load_data_to_neo4j,
        dag=dag,
        retries=3,
        retry_delay=timedelta(minutes=1),
    )
    # Exit marker task
    end_task = EmptyOperator(
        task_id='end_task',
        dag=dag,
    )
    # Task dependency chain
    (start_task >> check_neo4j_connection_task >> generate_sample_data_task >>
     load_data_to_neo4j_task >> end_task)

4
dags/requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
apache-airflow~=2.7.1
apache-airflow-providers-neo4j
pandas
Flask-Session<0.6

0
dags/task_01/__init__.py Normal file
View File

View File

@@ -0,0 +1,22 @@
from task_01.target_db.repositories import TargetDBRepo
from task_01.tasks import UserActionTransfer
class Adapters:
    """Factory for repository adapters sharing a single Neo4j driver."""

    def __init__(self, driver):
        # One driver instance backs every repository produced here.
        self.driver = driver

    @property
    def target_repo(self):
        """Return a repository bound to the target Neo4j database."""
        return TargetDBRepo(self.driver)
class UserActionTransferTask:
    """Composition root wiring the transfer use case to its adapters."""

    def __init__(self, driver):
        self.adapters = Adapters(driver)

    @property
    def run(self):
        """Build the use-case object on each access (stateless wiring)."""
        return UserActionTransfer(target_db=self.adapters.target_repo)

8
dags/task_01/dto.py Normal file
View File

@@ -0,0 +1,8 @@
from dataclasses import dataclass, field
@dataclass
class ConnectionInfo:
    """Connection parameters for a Neo4j bolt endpoint."""

    # Bolt URI, e.g. "bolt://host:7687"
    uri: str
    username: str
    # repr=False keeps the secret out of logs and tracebacks that
    # stringify this object.
    password: str = field(repr=False)

View File

View File

@@ -0,0 +1,41 @@
import logging
from neo4j import Driver
from pandas import DataFrame
BATCH_SIZE = 1000  # rows per UNWIND batch sent to Neo4j


class TargetDBRepo:
    """Repository encapsulating all queries against the target Neo4j DB."""

    def __init__(self, driver: Driver):
        self.driver = driver
        self.log = logging.getLogger(__name__)

    def save_users(self, users: DataFrame) -> None:
        """Bulk-insert user-action rows as :User nodes, in batches."""
        query = """
        UNWIND $rows AS row
        CREATE (u:User {
            user_id: row.user_id,
            action: row.action,
            timestamp: row.timestamp
        })
        """
        total = len(users)
        with self.driver.session() as session:
            for i in range(0, total, BATCH_SIZE):
                batch = users.iloc[i:i + BATCH_SIZE]
                records = batch.to_dict(orient="records")
                session.run(query, {"rows": records})
                # Bug fix: the last batch may be partial, so report the
                # actual number of rows saved, not i + BATCH_SIZE.
                self.log.info("rows saved %s", min(i + BATCH_SIZE, total))

    def get_number_of_users(self) -> int:
        """Return the total count of :User nodes."""
        with self.driver.session() as session:
            result = session.run(
                "MATCH (u:User) RETURN count(u) as user_count")
            return result.single()["user_count"]

    def check_connection(self):
        """Run a trivial query to prove the connection works."""
        with self.driver.session() as session:
            result = session.run('RETURN "Connection successful" AS message')
            return result.single()["message"]

View File

@@ -0,0 +1,21 @@
from airflow.hooks.base import BaseHook
from neo4j import Driver, GraphDatabase
from task_01.dto import ConnectionInfo
def get_neo4j_connection(conn_id: str) -> ConnectionInfo:
    """Read the Airflow connection *conn_id* and map it to a ConnectionInfo."""
    conn = BaseHook.get_connection(conn_id)
    return ConnectionInfo(
        uri=f'bolt://{conn.host}:{conn.port}',
        username=conn.login,
        password=conn.password,
    )
def get_driver(conn_id: str) -> Driver:
    """Create a Neo4j driver from the Airflow connection *conn_id*."""
    info = get_neo4j_connection(conn_id)
    credentials = (info.username, info.password)
    return GraphDatabase.driver(uri=info.uri, auth=credentials)

84
dags/task_01/tasks.py Normal file
View File

@@ -0,0 +1,84 @@
import logging
import os
import tempfile
from datetime import datetime
from random import randint
import pandas as pd
from airflow import AirflowException
from airflow.models import Variable
from task_01.target_db.repositories import TargetDBRepo
log = logging.getLogger(__name__)


class UserActionTransfer:
    """Use case: generate fake user actions and load them into Neo4j."""

    def __init__(
        self,
        target_db: TargetDBRepo,
    ):
        self.target_db = target_db
        self.log = logging.getLogger(__name__)

    def generate_sample_data(self):
        """Generate sample data and save it to a temporary CSV file."""
        actions = self.get_fake_user_action_data()
        csv_file_path = self.get_csv_file_path()
        actions.to_csv(csv_file_path, index=False)
        # Hand the file path to the downstream task via an Airflow Variable.
        Variable.set("user_action_data_path", csv_file_path)
        # Consistency fix: use the instance logger instead of the root
        # logger, and drop the f-string that carried a literal "%s".
        self.log.info("Sample data generated and saved to: %s", csv_file_path)
        self.log.info("Data preview: %s", actions.head())

    def load_data_to_neo4j(self) -> None:
        """Load the previously generated CSV into Neo4j, then clean up."""
        csv_file_path = Variable.get("user_action_data_path")
        if not csv_file_path or not os.path.exists(csv_file_path):
            # Bug fix: AirflowException does not %-format extra positional
            # args, so the original message never contained the path.
            raise AirflowException(f"CSV file not found: {csv_file_path}")
        # Read the CSV file
        user_actions = pd.read_csv(csv_file_path)
        self.log.info("Loaded CSV data with %s rows", len(user_actions))
        self.target_db.save_users(user_actions)
        total_rows = self.target_db.get_number_of_users()
        self.log.info("Total rows: %s", total_rows)
        # Remove the temporary file once the data is in the database.
        os.remove(csv_file_path)
        self.log.info("Temporary CSV file cleaned up")

    def check_neo4j_connection(self):
        """Run a trivial query; re-raise if Neo4j is unreachable."""
        try:
            result = self.target_db.check_connection()
        except Exception as e:
            self.log.error("Neo4j connection failed: %s", e)
            raise
        self.log.info("Neo4j message: %s", result)
        self.log.info("Neo4j connection is healthy")
        return result

    @staticmethod
    def get_csv_file_path() -> str:
        """Return the fixed CSV path inside the system temp directory."""
        temp_dir = tempfile.gettempdir()
        return os.path.join(temp_dir, "user_action_data.csv")

    @staticmethod
    def get_fake_user_action_data() -> pd.DataFrame:
        """Build a 10 000-row DataFrame of random user actions."""
        actions = ["login", "purchase", "view", "logout", "search"]
        ids = list(range(1, 10001))
        # All timestamps are generation time; rows differ only by action.
        action = [actions[randint(0, len(actions) - 1)] for _ in ids]
        timestamp = [datetime.now() for _ in ids]
        return pd.DataFrame({
            "user_id": ids,
            "action": action,
            "timestamp": timestamp,
        })

65
docker-compose.yml Normal file
View File

@@ -0,0 +1,65 @@
version: '3.8'

services:
  # Airflow metadata database
  postgres:
    image: postgres:14
    env_file: .env
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: always

  # Target graph database the DAG loads data into
  neo4j:
    image: neo4j:5
    environment:
      - NEO4J_AUTH=${NEO4J_USER}/${NEO4J_PASSWORD}
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - neo4j_data:/data
    restart: always

  # Webserver also runs the one-shot init script (db migrate, admin user,
  # connection and variables) before exec'ing the webserver itself.
  airflow-webserver:
    build: .
    depends_on:
      - postgres
      - neo4j
    env_file: .env
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres/${POSTGRES_DB}
      AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    ports:
      - "8080:8080"
    command: bash -c "/usr/local/bin/init_airflow.sh"
    restart: always

  airflow-scheduler:
    build: .
    depends_on:
      - postgres
      - neo4j
      - airflow-webserver
    env_file: .env
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres/${POSTGRES_DB}
      AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    command: scheduler
    restart: always

volumes:
  postgres_data:
  neo4j_data:

20
env.example Normal file
View File

@@ -0,0 +1,20 @@
# Airflow credentials
AIRFLOW_ADMIN_USER=admin
AIRFLOW_ADMIN_PASSWORD=admin
AIRFLOW_ADMIN_EMAIL=admin@example.com
AIRFLOW_ADMIN_FIRSTNAME=Admin
AIRFLOW_ADMIN_LASTNAME=User
# Database (Postgres)
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow
# Neo4j connection details
NEO4J_CONN_ID=
NEO4J_URI=
NEO4J_USER=
NEO4J_PASSWORD=
NEO4J_PORT=7687
AIRFLOW_HOME=/opt/airflow

54
scripts/init_airflow.sh Normal file
View File

@@ -0,0 +1,54 @@
#!/usr/bin/env bash
set -e

echo "Initializing Airflow..."

# Initialize / upgrade the metadata database (idempotent).
airflow db migrate

# Create the admin user if it does not already exist.
# -F: match the email as a literal string — without it the dots in the
# address act as regex wildcards and can false-positive.
if ! airflow users list | grep -qF "${AIRFLOW_ADMIN_EMAIL}"; then
    echo "🧑‍💻 Creating Airflow admin user..."
    airflow users create \
        --username "${AIRFLOW_ADMIN_USER}" \
        --firstname "${AIRFLOW_ADMIN_FIRSTNAME}" \
        --lastname "${AIRFLOW_ADMIN_LASTNAME}" \
        --role Admin \
        --email "${AIRFLOW_ADMIN_EMAIL}" \
        --password "${AIRFLOW_ADMIN_PASSWORD}"
else
    echo "✅ Admin user already exists."
fi

# Create the Neo4j connection. Guard against an empty NEO4J_CONN_ID:
# grep -q "" matches every line, which silently skipped creation.
if [ -z "${NEO4J_CONN_ID}" ]; then
    echo "⚠️  NEO4J_CONN_ID is not set; skipping Neo4j connection setup."
elif ! airflow connections list | grep -qF "${NEO4J_CONN_ID}"; then
    echo "🔌 Creating Neo4j connection..."
    airflow connections add "${NEO4J_CONN_ID}" \
        --conn-type neo4j \
        --conn-host "${NEO4J_URI}" \
        --conn-login "${NEO4J_USER}" \
        --conn-password "${NEO4J_PASSWORD}" \
        --conn-port "${NEO4J_PORT}"
else
    echo "✅ Neo4j connection already exists."
fi

# Set Airflow Variables from the environment.
echo "🧩 Setting Airflow Variables..."

# Helper: set a variable only if it does not exist yet.
function set_variable() {
    local key=$1
    local value=$2
    if ! airflow variables get "$key" &>/dev/null; then
        airflow variables set "$key" "$value"
        echo " $key = $value"
    else
        echo "$key already exists"
    fi
}

set_variable "CONN_ID" "${NEO4J_CONN_ID}"

echo "🎉 Initialization complete! Starting webserver..."
exec airflow webserver