diff --git a/.gitea/workflows/pipeline.yml b/.gitea/workflows/pipeline.yml
new file mode 100644
index 0000000..e69de29
diff --git a/.gitignore b/.gitignore
index 19119b0..49a01f3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,9 +3,11 @@
 .DS_Store
 .AppleDouble
 .LSOverride
+.env
 
 # Icon must end with two \r
-Icon
+Icon
+
 
 # Thumbnails
 ._*
@@ -227,4 +229,3 @@ cython_debug/
 
 # PyPI configuration file
 .pypirc
-
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..f727a38
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,24 @@
+# Use the official Apache Airflow image
+FROM apache/airflow:2.7.1-python3.9 as build
+
+ENV USERNAME=airflow
+
+# Set the Airflow environment variables
+ENV AIRFLOW_HOME=/opt/airflow
+
+# Copy the DAGs (including their requirements file)
+COPY dags ${AIRFLOW_HOME}/dags/
+
+WORKDIR ${AIRFLOW_HOME}
+
+# Install extra Python dependencies
+RUN pip install --no-cache-dir -r dags/requirements.txt
+
+
+COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/
+RUN chmod +x /usr/local/bin/init_airflow.sh
+
+# (optional) run as the airflow user for security
+USER airflow
+
+FROM build as final
\ No newline at end of file
diff --git a/README.md b/README.md
index ac829a0..ca242eb 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,2 @@
-# airflow_neo4j
+# Пример использования airflow с подключением к БД neo4j
 
-Airflow с подключением к БД Neo4j
\ No newline at end of file
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dags/dag_01.py b/dags/dag_01.py
new file mode 100644
index 0000000..4ebe70f
--- /dev/null
+++ b/dags/dag_01.py
@@ -0,0 +1,78 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
+from task_01.composites import UserActionTransferTask
+from task_01.target_db.utills import get_driver
+
+# Default DAG arguments
+default_args = {
+    'owner': 'data_engineer',
+    'depends_on_past': False,
+    'start_date': datetime(2025, 11, 1),
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    'execution_timeout': timedelta(minutes=30),
+}
+
+# DAG definition
+with DAG(
+    'neo4j_health_check_dag',
+    default_args=default_args,
+    schedule_interval='0 9 * * *',
+    catchup=False,
+    tags=['neo4j', 'healthcheck', 'data_loading', 'vers.01'],
+    description=
+    'DAG для проверки здоровья Neo4j и загрузки тестовых данных',
+) as dag:
+    conn_id = Variable.get("CONN_ID")
+    driver = get_driver(conn_id)
+
+    task = UserActionTransferTask(driver)
+
+    # Start task
+    start_task = EmptyOperator(
+        task_id='start_task',
+        dag=dag,
+    )
+
+    # Task: check the Neo4j connection
+    check_neo4j_connection_task = PythonOperator(
+        task_id='check_neo4j_connection',
+        python_callable=task.run.check_neo4j_connection,
+        dag=dag,
+        retries=3,
+        retry_delay=timedelta(minutes=1),
+    )
+
+    # Task: generate sample data
+    generate_sample_data_task = PythonOperator(
+        task_id='generate_sample_data',
+        python_callable=task.run.generate_sample_data,
+        dag=dag,
+        retries=3,
+        retry_delay=timedelta(minutes=1),
+    )
+
+    # Task: load the data into Neo4j
+    load_data_to_neo4j_task = PythonOperator(
+        task_id='load_data_to_neo4j',
+        python_callable=task.run.load_data_to_neo4j,
+        dag=dag,
+        retries=3,
+        retry_delay=timedelta(minutes=1),
+    )
+
+    # Final task
+    end_task = EmptyOperator(
+        task_id='end_task',
+        dag=dag,
+    )
+
+    # Task dependency chain
+    (start_task >> check_neo4j_connection_task >> generate_sample_data_task >>
+     load_data_to_neo4j_task >> end_task)
diff --git a/dags/requirements.txt b/dags/requirements.txt
new file mode 100644
index 0000000..9f8ffc3
--- /dev/null
+++ b/dags/requirements.txt
@@ -0,0 +1,4 @@
+apache-airflow~=2.7.1
+apache-airflow-providers-neo4j
+pandas
+Flask-Session<0.6
\ No newline at end of file
diff --git a/dags/task_01/__init__.py b/dags/task_01/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dags/task_01/composites.py b/dags/task_01/composites.py
new file mode 100644
index 0000000..df24d62
--- /dev/null
+++ b/dags/task_01/composites.py
@@ -0,0 +1,22 @@
+from task_01.target_db.repositories import TargetDBRepo
+from task_01.tasks import UserActionTransfer
+
+
+class Adapters:
+
+    def __init__(self, driver):
+        self.driver = driver
+
+    @property
+    def target_repo(self):
+        return TargetDBRepo(self.driver)
+
+
+class UserActionTransferTask:
+
+    def __init__(self, driver):
+        self.adapters = Adapters(driver)
+
+    @property
+    def run(self):
+        return UserActionTransfer(target_db=self.adapters.target_repo, )
diff --git a/dags/task_01/dto.py b/dags/task_01/dto.py
new file mode 100644
index 0000000..c5d7946
--- /dev/null
+++ b/dags/task_01/dto.py
@@ -0,0 +1,8 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class ConnectionInfo:
+    uri: str
+    username: str
+    password: str
diff --git a/dags/task_01/target_db/__init__.py b/dags/task_01/target_db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dags/task_01/target_db/repositories.py b/dags/task_01/target_db/repositories.py
new file mode 100644
index 0000000..7f59305
--- /dev/null
+++ b/dags/task_01/target_db/repositories.py
@@ -0,0 +1,41 @@
+import logging
+
+from neo4j import Driver
+from pandas import DataFrame
+
+BATCH_SIZE = 1000
+
+
+class TargetDBRepo:
+
+    def __init__(self, driver: Driver):
+        self.driver = driver
+        self.log = logging.getLogger(__name__)
+
+    def save_users(self, users: DataFrame) -> None:
+        query = """
+        UNWIND $rows AS row
+        CREATE (u:User {
+            user_id: row.user_id,
+            action: row.action,
+            timestamp: row.timestamp
+        })
+        """
+
+        with self.driver.session() as session:
+            for i in range(0, len(users), BATCH_SIZE):
+                batch = users.iloc[i:i + BATCH_SIZE]
+                records = batch.to_dict(orient="records")
+                session.run(query, {"rows": records})
+                self.log.info("rows saved %s", i + BATCH_SIZE)
+
+    def get_number_of_users(self) -> int:
+        with self.driver.session() as session:
+            result = session.run(
+                "MATCH (u:User) RETURN count(u) as user_count")
+            return result.single()["user_count"]
+
+    def check_connection(self):
+        with self.driver.session() as session:
+            result = session.run('RETURN "Connection successful" AS message')
+            return result.single()["message"]
diff --git a/dags/task_01/target_db/utills.py b/dags/task_01/target_db/utills.py
new file mode 100644
index 0000000..8c42462
--- /dev/null
+++ b/dags/task_01/target_db/utills.py
@@ -0,0 +1,21 @@
+from airflow.hooks.base import BaseHook
+from neo4j import Driver, GraphDatabase
+from task_01.dto import ConnectionInfo
+
+
+def get_neo4j_connection(conn_id: str) -> ConnectionInfo:
+    conn = BaseHook.get_connection(conn_id)
+    uri = f'bolt://{conn.host}:{conn.port}'
+    return ConnectionInfo(
+        uri,
+        username=conn.login,
+        password=conn.password,
+    )
+
+
+def get_driver(conn_id: str) -> Driver:
+    connection_info = get_neo4j_connection(conn_id)
+    return GraphDatabase.driver(
+        uri=connection_info.uri,
+        auth=(connection_info.username, connection_info.password),
+    )
diff --git a/dags/task_01/tasks.py b/dags/task_01/tasks.py
new file mode 100644
index 0000000..05acfc5
--- /dev/null
+++ b/dags/task_01/tasks.py
@@ -0,0 +1,84 @@
+import logging
+import os
+import tempfile
+from datetime import datetime
+from random import randint
+
+import pandas as pd
+from airflow import AirflowException
+from airflow.models import Variable
+from task_01.target_db.repositories import TargetDBRepo
+
+log = logging.getLogger(__name__)
+
+
+class UserActionTransfer:
+
+    def __init__(
+        self,
+        target_db: TargetDBRepo,
+    ):
+        self.target_db = target_db
+        self.log = logging.getLogger(__name__)
+
+    def generate_sample_data(self):
+        """Generate sample data and save it to a CSV file."""
+        actions = self.get_fake_user_action_data()
+        csv_file_path = self.get_csv_file_path()
+        actions.to_csv(csv_file_path, index=False)
+
+        # Store the file path in an Airflow Variable for the next task
+        Variable.set("user_action_data_path", csv_file_path)
+
+        logging.info("Sample data generated and saved to: %s", csv_file_path)
+        logging.info("Data preview: %s", actions.head())
+
+    def load_data_to_neo4j(self) -> None:
+        csv_file_path = Variable.get("user_action_data_path")
+
+        if not csv_file_path or not os.path.exists(csv_file_path):
+            raise AirflowException(f"CSV file not found: {csv_file_path}")
+
+        # Read the CSV file
+        user_actions = pd.read_csv(csv_file_path)
+        logging.info("Loaded CSV data with %s rows", len(user_actions))
+
+        self.target_db.save_users(user_actions)
+        total_rows = self.target_db.get_number_of_users()
+        logging.info("Total rows: %s", total_rows)
+
+        # Clean up the temporary file
+        os.remove(csv_file_path)
+        logging.info("Temporary CSV file cleaned up")
+
+    def check_neo4j_connection(self):
+        """Check the database connection."""
+        try:
+            result = self.target_db.check_connection()
+            log.info(f"Neo4j message: {result}")
+            log.info("Neo4j connection is healthy")
+        except Exception as e:
+            log.error(f"Neo4j connection failed: {e}")
+            raise
+        return result
+
+    @staticmethod
+    def get_csv_file_path() -> str:
+        temp_dir = tempfile.gettempdir()
+        return os.path.join(temp_dir, "user_action_data.csv")
+
+    @staticmethod
+    def get_fake_user_action_data() -> pd.DataFrame:
+        actions = ["login", "purchase", "view", "logout", "search"]
+        ids = list(range(1, 10001))
+        action = []
+        timestamp = []
+        for _ in ids:
+            action.append(actions[randint(0, len(actions) - 1)])
+            timestamp.append(datetime.now())
+        sample_data = {
+            "user_id": ids,
+            "action": action,
+            "timestamp": timestamp
+        }
+        return pd.DataFrame(sample_data)
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..f590c4c
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,65 @@
+version: '3.8'
+
+services:
+  postgres:
+    image: postgres:14
+    env_file: .env
+    environment:
+      POSTGRES_USER: ${POSTGRES_USER}
+      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+      POSTGRES_DB: ${POSTGRES_DB}
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    restart: always
+
+  neo4j:
+    image: neo4j:5
+    environment:
+      - NEO4J_AUTH=${NEO4J_USER}/${NEO4J_PASSWORD}
+    ports:
+      - "7474:7474"
+      - "7687:7687"
+    volumes:
+      - neo4j_data:/data
+    restart: always
+
+  airflow-webserver:
+    build: .
+    depends_on:
+      - postgres
+      - neo4j
+    env_file: .env
+    environment:
+      AIRFLOW__CORE__EXECUTOR: LocalExecutor
+      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres/${POSTGRES_DB}
+      AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
+    volumes:
+      - ./dags:/opt/airflow/dags
+      - ./logs:/opt/airflow/logs
+      - ./plugins:/opt/airflow/plugins
+    ports:
+      - "8080:8080"
+    command: bash -c "/usr/local/bin/init_airflow.sh"
+    restart: always
+
+  airflow-scheduler:
+    build: .
+    depends_on:
+      - postgres
+      - neo4j
+      - airflow-webserver
+    env_file: .env
+    environment:
+      AIRFLOW__CORE__EXECUTOR: LocalExecutor
+      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres/${POSTGRES_DB}
+      AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
+    volumes:
+      - ./dags:/opt/airflow/dags
+      - ./logs:/opt/airflow/logs
+      - ./plugins:/opt/airflow/plugins
+    command: scheduler
+    restart: always
+
+volumes:
+  postgres_data:
+  neo4j_data:
\ No newline at end of file
diff --git a/env.example b/env.example
new file mode 100644
index 0000000..d3efad5
--- /dev/null
+++ b/env.example
@@ -0,0 +1,20 @@
+# Airflow credentials
+AIRFLOW_ADMIN_USER=admin
+AIRFLOW_ADMIN_PASSWORD=admin
+AIRFLOW_ADMIN_EMAIL=admin@example.com
+AIRFLOW_ADMIN_FIRSTNAME=Admin
+AIRFLOW_ADMIN_LASTNAME=User
+
+# Database (Postgres)
+POSTGRES_USER=airflow
+POSTGRES_PASSWORD=airflow
+POSTGRES_DB=airflow
+
+# Neo4j connection details
+NEO4J_CONN_ID=
+NEO4J_URI=
+NEO4J_USER=
+NEO4J_PASSWORD=
+NEO4J_PORT=7687
+
+AIRFLOW_HOME=/opt/airflow
\ No newline at end of file
diff --git a/scripts/init_airflow.sh b/scripts/init_airflow.sh
new file mode 100644
index 0000000..fa836d6
--- /dev/null
+++ b/scripts/init_airflow.sh
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+set -e
+
+echo "Initializing Airflow..."
+
+# Initialize the DB (only on first run)
+airflow db migrate
+
+# Create the admin user
+if ! airflow users list | grep -q "${AIRFLOW_ADMIN_EMAIL}"; then
+    echo "🧑‍💻 Creating Airflow admin user..."
+    airflow users create \
+        --username "${AIRFLOW_ADMIN_USER}" \
+        --firstname "${AIRFLOW_ADMIN_FIRSTNAME}" \
+        --lastname "${AIRFLOW_ADMIN_LASTNAME}" \
+        --role Admin \
+        --email "${AIRFLOW_ADMIN_EMAIL}" \
+        --password "${AIRFLOW_ADMIN_PASSWORD}"
+else
+    echo "✅ Admin user already exists."
+fi
+
+# Create the Neo4j connection
+if ! airflow connections list | grep -q "${NEO4J_CONN_ID}"; then
+    echo "🔌 Creating Neo4j connection..."
+    airflow connections add "${NEO4J_CONN_ID}" \
+        --conn-type neo4j \
+        --conn-host "${NEO4J_URI}" \
+        --conn-login "${NEO4J_USER}" \
+        --conn-password "${NEO4J_PASSWORD}" \
+        --conn-port "${NEO4J_PORT}"
+else
+    echo "✅ Neo4j connection already exists."
+fi
+
+# Set Airflow Variables from .env
+echo "🧩 Setting Airflow Variables..."
+
+# Helper that sets a variable only if it does not exist yet
+function set_variable() {
+    local key=$1
+    local value=$2
+    if ! airflow variables get "$key" &>/dev/null; then
+        airflow variables set "$key" "$value"
+        echo "  ➕ $key = $value"
+    else
+        echo "  ✅ $key already exists"
+    fi
+}
+
+set_variable "CONN_ID" "${NEO4J_CONN_ID}"
+
+echo "🎉 Initialization complete! Starting webserver..."
+exec airflow webserver