From 745f1ef2de4a579980ed64e06eab31b632611147 Mon Sep 17 00:00:00 2001 From: iradik Date: Tue, 4 Nov 2025 00:17:47 +0300 Subject: [PATCH 1/3] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D1=8F?= =?UTF-8?q?=D0=B5=D1=82=20dag?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/pipeline.yml | 0 .gitignore | 5 +- Dockerfile | 24 ++++++++ README.md | 3 +- __init__.py | 0 dags/dag_01.py | 78 ++++++++++++++++++++++++ dags/requirements.txt | 4 ++ dags/task_01/__init__.py | 0 dags/task_01/composites.py | 22 +++++++ dags/task_01/dto.py | 8 +++ dags/task_01/target_db/__init__.py | 0 dags/task_01/target_db/repositories.py | 41 +++++++++++++ dags/task_01/target_db/utills.py | 21 +++++++ dags/task_01/tasks.py | 84 ++++++++++++++++++++++++++ docker-compose.yml | 65 ++++++++++++++++++++ env.example | 20 ++++++ scripts/init_airflow.sh | 54 +++++++++++++++++ 17 files changed, 425 insertions(+), 4 deletions(-) create mode 100644 .gitea/workflows/pipeline.yml create mode 100644 Dockerfile create mode 100644 __init__.py create mode 100644 dags/dag_01.py create mode 100644 dags/requirements.txt create mode 100644 dags/task_01/__init__.py create mode 100644 dags/task_01/composites.py create mode 100644 dags/task_01/dto.py create mode 100644 dags/task_01/target_db/__init__.py create mode 100644 dags/task_01/target_db/repositories.py create mode 100644 dags/task_01/target_db/utills.py create mode 100644 dags/task_01/tasks.py create mode 100644 docker-compose.yml create mode 100644 env.example create mode 100644 scripts/init_airflow.sh diff --git a/.gitea/workflows/pipeline.yml b/.gitea/workflows/pipeline.yml new file mode 100644 index 0000000..e69de29 diff --git a/.gitignore b/.gitignore index 19119b0..49a01f3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,9 +3,11 @@ .DS_Store .AppleDouble .LSOverride +.env # Icon must end with two \r -Icon +Icon + # Thumbnails ._* @@ -227,4 +229,3 @@ cython_debug/ # PyPI configuration file .pypirc - diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..f727a38 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ +# Используем официальный образ Apache Airflow +FROM apache/airflow:2.7.1-python3.9 as build + +ENV USERNAME=airflow + +# Устанавливаем переменные окружения для Airflow +ENV AIRFLOW_HOME=/opt/airflow + +# Копируем файл зависимостей +COPY dags ${AIRFLOW_HOME}/dags/ + +WORKDIR ${AIRFLOW_HOME} + +# Устанавливаем дополнительные Python-зависимости +RUN pip install --no-cache-dir -r dags/requirements.txt + + +COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/ +RUN chmod +x /usr/local/bin/init_airflow.sh + +# (опционально) можно указать пользователя airflow для безопасности +USER airflow + +FROM build as final \ No newline at end of file diff --git a/README.md b/README.md index ac829a0..ca242eb 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,2 @@ -# airflow_neo4j +# Пример использования airflow с подключением к БД neo4j -Airflow с подключением к БД Neo4j \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dags/dag_01.py b/dags/dag_01.py new file mode 100644 index 0000000..4ebe70f --- /dev/null +++ b/dags/dag_01.py @@ -0,0 +1,78 @@ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.models import Variable +from airflow.operators.empty import EmptyOperator +from airflow.operators.python import PythonOperator +from task_01.composites import UserActionTransferTask +from task_01.target_db.utills import get_driver + +# Аргументы DAG по умолчанию +default_args = { + 'owner': 'data_engineer', + 'depends_on_past': False, + 'start_date': datetime(2025, 11, 1), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + 'execution_timeout': timedelta(minutes=30), +} + +# Определение DAG +with DAG( + 'neo4j_health_check_dag', + default_args=default_args, + schedule_interval='0 9 * * *', + catchup=False, + tags=['neo4j', 'healthcheck', 'data_loading', 'vers.01'], + description= + 'DAG для проверки здоровья Neo4j и загрузки тестовых данных', +) as dag: + conn_id = Variable.get("CONN_ID") + driver = get_driver(conn_id) + + task = UserActionTransferTask(driver) + + # Начальная задача + start_task = EmptyOperator( + task_id='start_task', + dag=dag, + ) + + # Задача проверки соединения с Neo4j + check_neo4j_connection_task = PythonOperator( + task_id='check_neo4j_connection', + python_callable=task.run.check_neo4j_connection, + dag=dag, + retries=3, + retry_delay=timedelta(minutes=1), + ) + + # Задача генерации тестовых данных + generate_sample_data_task = PythonOperator( + task_id='generate_sample_data', + python_callable=task.run.generate_sample_data, + dag=dag, + retries=3, + retry_delay=timedelta(minutes=1), + ) + + # Задача загрузки данных в Neo4j + load_data_to_neo4j_task = PythonOperator( + task_id='load_data_to_neo4j', + python_callable=task.run.load_data_to_neo4j, + dag=dag, + retries=3, + retry_delay=timedelta(minutes=1), + ) + + # Финальная задача + end_task = EmptyOperator( + task_id='end_task', + dag=dag, + ) + + # Определение зависимостей задач + (start_task >> check_neo4j_connection_task >> generate_sample_data_task >> + load_data_to_neo4j_task >> end_task) diff --git a/dags/requirements.txt b/dags/requirements.txt new file mode 100644 index 0000000..9f8ffc3 --- /dev/null +++ b/dags/requirements.txt @@ -0,0 +1,4 @@ +apache-airflow~=2.7.1 +apache-airflow-providers-neo4j +pandas +Flask-Session<0.6 \ No newline at end of file diff --git a/dags/task_01/__init__.py b/dags/task_01/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dags/task_01/composites.py b/dags/task_01/composites.py new file mode 100644 index 0000000..df24d62 --- /dev/null +++ b/dags/task_01/composites.py @@ -0,0 +1,22 @@ +from task_01.target_db.repositories import TargetDBRepo +from task_01.tasks import UserActionTransfer + + +class Adapters: + + def __init__(self, driver): + self.driver = driver + + @property + def target_repo(self): + return TargetDBRepo(self.driver) + + +class UserActionTransferTask: + + def __init__(self, driver): + self.adapters = Adapters(driver) + + @property + def run(self): + return UserActionTransfer(target_db=self.adapters.target_repo, ) diff --git a/dags/task_01/dto.py b/dags/task_01/dto.py new file mode 100644 index 0000000..c5d7946 --- /dev/null +++ b/dags/task_01/dto.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + + +@dataclass +class ConnectionInfo: + uri: str + username: str + password: str diff --git a/dags/task_01/target_db/__init__.py b/dags/task_01/target_db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dags/task_01/target_db/repositories.py b/dags/task_01/target_db/repositories.py new file mode 100644 index 0000000..7f59305 --- /dev/null +++ b/dags/task_01/target_db/repositories.py @@ -0,0 +1,41 @@ +import logging + +from neo4j import Driver +from pandas import DataFrame + +BATCH_SIZE = 1000 + + +class TargetDBRepo: + + def __init__(self, driver: Driver): + self.driver = driver + self.log = logging.getLogger(__name__) + + def save_users(self, users: DataFrame) -> None: + query = """ + UNWIND $rows AS row + CREATE (u:User { + user_id: row.user_id, + action: row.action, + timestamp: row.timestamp + }) + """ + + with self.driver.session() as session: + for i in range(0, len(users), BATCH_SIZE): + batch = users.iloc[i:i + BATCH_SIZE] + records = batch.to_dict(orient="records") + session.run(query, {"rows": records}) + self.log.info("rows saved %s", i + BATCH_SIZE) + + def get_number_of_users(self) -> int: + with self.driver.session() as session: + result = session.run( + "MATCH (u:User) RETURN count(u) as user_count") + return result.single()["user_count"] + + def check_connection(self): + with self.driver.session() as session: + result = session.run('RETURN "Connection successful" AS message') + return result.single()["message"] diff --git a/dags/task_01/target_db/utills.py b/dags/task_01/target_db/utills.py new file mode 100644 index 0000000..8c42462 --- /dev/null +++ b/dags/task_01/target_db/utills.py @@ -0,0 +1,21 @@ +from airflow.hooks.base import BaseHook +from neo4j import Driver, GraphDatabase +from task_01.dto import ConnectionInfo + + +def get_neo4j_connection(conn_id: str) -> ConnectionInfo: + conn = BaseHook.get_connection(conn_id) + uri = f'bolt://{conn.host}:{conn.port}' + return ConnectionInfo( + uri, + username=conn.login, + password=conn.password, + ) + + +def get_driver(conn_id: str) -> Driver: + connection_info = get_neo4j_connection(conn_id) + return GraphDatabase.driver( + uri=connection_info.uri, + auth=(connection_info.username, connection_info.password), + ) diff --git a/dags/task_01/tasks.py b/dags/task_01/tasks.py new file mode 100644 index 0000000..05acfc5 --- /dev/null +++ b/dags/task_01/tasks.py @@ -0,0 +1,84 @@ +import logging +import os +import tempfile +from datetime import datetime +from random import randint + +import pandas as pd +from airflow import AirflowException +from airflow.models import Variable +from task_01.target_db.repositories import TargetDBRepo + +log = logging.getLogger(__name__) + + +class UserActionTransfer: + + def __init__( + self, + target_db: TargetDBRepo, + ): + self.target_db = target_db + self.log = logging.getLogger(__name__) + + def generate_sample_data(self): + """Генерация тестовых данных и сохранение в CSV файл""" + actions = self.get_fake_user_action_data() + csv_file_path = self.get_csv_file_path() + actions.to_csv(csv_file_path, index=False) + + # Сохраняем путь к файлу в переменную DAG + Variable.set("user_action_data_path", csv_file_path) + + logging.info("Sample data generated and saved to: %s", csv_file_path) + logging.info(f"Data preview: %s", actions.head()) + + def load_data_to_neo4j(self) -> None: + csv_file_path = Variable.get("user_action_data_path") + + if not csv_file_path or not os.path.exists(csv_file_path): + raise AirflowException("CSV file not found: %s", csv_file_path) + + # Чтение CSV файла + user_actions = pd.read_csv(csv_file_path) + logging.info("Loaded CSV data with %s rows", len(user_actions)) + + self.target_db.save_users(user_actions) + total_rows = self.target_db.get_number_of_users() + logging.info("Total rows: %s", total_rows) + + # Очистка временного файла + os.remove(csv_file_path) + logging.info("Temporary CSV file cleaned up") + + def check_neo4j_connection(self): + """Проверка соединения с БД""" + try: + result = self.target_db.check_connection() + log.info(f"Neo4j message: {result}") + log.info("Neo4j connection is healthy") + except Exception as e: + log.error(f"Neo4j connection failed: {e}") + raise + return result + + @staticmethod + def get_csv_file_path() -> str: + temp_dir = tempfile.gettempdir() + return os.path.join(temp_dir, "user_action_data.csv") + + @staticmethod + def get_fake_user_action_data() -> pd.DataFrame: + actions = ["login", "purchase", "view", "logout", "search"] + ids = list(range(1, 10001)) + action = [] + timestamp = [] + for _ in ids: + action.append(actions[randint(0, len(actions) - 1)]) + timestamp.append(datetime.now()) + sample_data = { + "user_id": ids, + "action": action, + "timestamp": timestamp + } + return pd.DataFrame(sample_data) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f590c4c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,65 @@ +version: '3.8' + +services: + postgres: + image: postgres:14 + env_file: .env + environment: + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} + volumes: + - postgres_data:/var/lib/postgresql/data + restart: always + + neo4j: + image: neo4j:5 + environment: + - NEO4J_AUTH=${NEO4J_USER}/${NEO4J_PASSWORD} + ports: + - "7474:7474" + - "7687:7687" + volumes: + - neo4j_data:/data + restart: always + + airflow-webserver: + build: . + depends_on: + - postgres + - neo4j + env_file: .env + environment: + AIRFLOW__CORE__EXECUTOR: LocalExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres/${POSTGRES_DB} + AIRFLOW__CORE__LOAD_EXAMPLES: 'False' + volumes: + - ./dags:/opt/airflow/dags + - ./logs:/opt/airflow/logs + - ./plugins:/opt/airflow/plugins + ports: + - "8080:8080" + command: bash -c "/usr/local/bin/init_airflow.sh" + restart: always + + airflow-scheduler: + build: . + depends_on: + - postgres + - neo4j + - airflow-webserver + env_file: .env + environment: + AIRFLOW__CORE__EXECUTOR: LocalExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres/${POSTGRES_DB} + AIRFLOW__CORE__LOAD_EXAMPLES: 'False' + volumes: + - ./dags:/opt/airflow/dags + - ./logs:/opt/airflow/logs + - ./plugins:/opt/airflow/plugins + command: scheduler + restart: always + +volumes: + postgres_data: + neo4j_data: \ No newline at end of file diff --git a/env.example b/env.example new file mode 100644 index 0000000..d3efad5 --- /dev/null +++ b/env.example @@ -0,0 +1,20 @@ +# Airflow credentials +AIRFLOW_ADMIN_USER=admin +AIRFLOW_ADMIN_PASSWORD=admin +AIRFLOW_ADMIN_EMAIL=admin@example.com +AIRFLOW_ADMIN_FIRSTNAME=Admin +AIRFLOW_ADMIN_LASTNAME=User + +# Database (Postgres) +POSTGRES_USER=airflow +POSTGRES_PASSWORD=airflow +POSTGRES_DB=airflow + +# Neo4j connection details +NEO4J_CONN_ID= +NEO4J_URI= +NEO4J_USER= +NEO4J_PASSWORD= +NEO4J_PORT=7687 + +AIRFLOW_HOME=/opt/airflow \ No newline at end of file diff --git a/scripts/init_airflow.sh b/scripts/init_airflow.sh new file mode 100644 index 0000000..fa836d6 --- /dev/null +++ b/scripts/init_airflow.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +set -e + +echo "Initializing Airflow..." + +# Инициализация БД (только если первый запуск) +airflow db migrate + +# Создание пользователя-админа +if ! airflow users list | grep -q "${AIRFLOW_ADMIN_EMAIL}"; then + echo "🧑‍💻 Creating Airflow admin user..." + airflow users create \ + --username "${AIRFLOW_ADMIN_USER}" \ + --firstname "${AIRFLOW_ADMIN_FIRSTNAME}" \ + --lastname "${AIRFLOW_ADMIN_LASTNAME}" \ + --role Admin \ + --email "${AIRFLOW_ADMIN_EMAIL}" \ + --password "${AIRFLOW_ADMIN_PASSWORD}" +else + echo "✅ Admin user already exists." +fi + +# Создание подключения к Neo4j +if ! airflow connections list | grep -q "${NEO4J_CONN_ID}"; then + echo "🔌 Creating Neo4j connection..." + airflow connections add "${NEO4J_CONN_ID}" \ + --conn-type neo4j \ + --conn-host "${NEO4J_URI}" \ + --conn-login "${NEO4J_USER}" \ + --conn-password "${NEO4J_PASSWORD}" \ + --conn-port "${NEO4J_PORT}" +else + echo "✅ Neo4j connection already exists." +fi + +# Установка Airflow Variables из .env +echo "🧩 Setting Airflow Variables..." + +# Helper для установки переменной, если её ещё нет +function set_variable() { + local key=$1 + local value=$2 + if ! airflow variables get "$key" &>/dev/null; then + airflow variables set "$key" "$value" + echo " ➕ $key = $value" + else + echo " ✅ $key already exists" + fi +} + +set_variable "CONN_ID" "${NEO4J_CONN_ID}" + +echo "🎉 Initialization complete! Starting webserver..." +exec airflow webserver From 5c2c37952a5354141323f1d607580fdb7c865ca5 Mon Sep 17 00:00:00 2001 From: iradik Date: Tue, 4 Nov 2025 00:54:47 +0300 Subject: [PATCH 2/3] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=BB=D1=8F?= =?UTF-8?q?=D0=B5=D1=82=20Dockerfile,=20=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2?= =?UTF-8?q?=D0=BB=D1=8F=D0=B5=D1=82=20=D1=81=D0=BA=D1=80=D0=B8=D0=BF=D1=82?= =?UTF-8?q?=20=D0=B8=D0=BD=D0=B8=D1=86=D0=B8=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0?= =?UTF-8?q?=D1=86=D0=B8=D0=B8=20Airflow,=20=D1=83=D0=BB=D1=83=D1=87=D1=88?= =?UTF-8?q?=D0=B0=D0=B5=D1=82=20=D0=B4=D0=BE=D0=BA=D1=83=D0=BC=D0=B5=D0=BD?= =?UTF-8?q?=D1=82=D0=B0=D1=86=D0=B8=D1=8E=20=D0=B2=20README.md,=20=D0=B8?= =?UTF-8?q?=D0=B7=D0=BC=D0=B5=D0=BD=D1=8F=D0=B5=D1=82=20=D0=BD=D0=B0=D0=B7?= =?UTF-8?q?=D0=B2=D0=B0=D0=BD=D0=B8=D0=B5=20DAG=20=D0=B8=20=D0=B4=D0=BE?= =?UTF-8?q?=D0=B1=D0=B0=D0=B2=D0=BB=D1=8F=D0=B5=D1=82=20=D0=BE=D0=BF=D0=B8?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B8=D0=B5=20=D0=B7=D0=B0=D0=B4=D0=B0=D1=87?= =?UTF-8?q?,=20=D0=B0=20=D1=82=D0=B0=D0=BA=D0=B6=D0=B5=20=D1=83=D0=BB?= =?UTF-8?q?=D1=83=D1=87=D1=88=D0=B0=D0=B5=D1=82=20=D0=BB=D0=BE=D0=B3=D0=B8?= =?UTF-8?q?=20=D0=B2=20=D0=BA=D0=BB=D0=B0=D1=81=D1=81=D0=B5=20UserActionTr?= =?UTF-8?q?ansfer.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 9 ++---- README.md | 41 ++++++++++++++++++++++++++ dags/dag_01.py | 8 +++-- dags/task_01/target_db/repositories.py | 2 +- dags/task_01/tasks.py | 22 ++++++++++++-- scripts/init_airflow.sh | 2 +- 6 files changed, 69 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index f727a38..3114a43 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,6 +6,8 @@ ENV USERNAME=airflow # Устанавливаем переменные окружения для Airflow ENV AIRFLOW_HOME=/opt/airflow +COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/ + # Копируем файл зависимостей COPY dags ${AIRFLOW_HOME}/dags/ @@ -14,11 +16,4 @@ WORKDIR ${AIRFLOW_HOME} # Устанавливаем дополнительные Python-зависимости RUN pip install --no-cache-dir -r dags/requirements.txt - -COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/ RUN chmod +x /usr/local/bin/init_airflow.sh - -# (опционально) можно указать пользователя airflow для безопасности -USER airflow - -FROM build as final \ No newline at end of file diff --git a/README.md b/README.md index ca242eb..eaf98fa 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,43 @@ # Пример использования airflow с подключением к БД neo4j +## Как развернуть проект + +1. **Клонируйте репозиторий:** + +```bash +git clone +cd airflow_neo4j +``` + +2. **Скопируйте и настройте файл переменных окружения:** + +Переименуйте файл `env.example` в `.env` и обязательно заполните все переменные в файле (например, пароли и данные для подключения к Neo4j и Postgres). + +```bash +cp env.example .env +# Откройте .env и укажите свои значения +``` + +3. **Запустите сервисы через Docker Compose:** + +```bash +docker-compose up --build +``` + +4. **Дождитесь сообщения в логе**: + +В процессе инициализации дождитесь строки: + + Initialization complete! Starting webserver... + +После этого интерфейс Airflow будет доступен по адресу: http://ip:8080 + +5. **Первый запуск DAG** + +При первом запуске дага откройте его в Airflow UI и включите тумблер (On). Не нажимайте кнопку запуска (Play) вручную — дождитесь автоматического срабатывания по расписанию или другим триггерам. + +--- + +- Для подробностей по настройке Neo4j и других переменных обратитесь к комментариям внутри `.env` и документации вашего проекта. +- Документация актуальна на момент сборки контейнера и версии Airflow в Dockerfile. + diff --git a/dags/dag_01.py b/dags/dag_01.py index 4ebe70f..6161250 100644 --- a/dags/dag_01.py +++ b/dags/dag_01.py @@ -21,13 +21,12 @@ default_args = { # Определение DAG with DAG( - 'neo4j_health_check_dag', + 'neo4j_healthcheck_and_sample_loader', default_args=default_args, schedule_interval='0 9 * * *', catchup=False, tags=['neo4j', 'healthcheck', 'data_loading', 'vers.01'], - description= - 'DAG для проверки здоровья Neo4j и загрузки тестовых данных', + description='DAG для проверки здоровья Neo4j и загрузки тестовых данных', ) as dag: conn_id = Variable.get("CONN_ID") driver = get_driver(conn_id) @@ -47,6 +46,7 @@ with DAG( dag=dag, retries=3, retry_delay=timedelta(minutes=1), + doc_md="Проверяет соединение с Neo4j и возвращает сообщение о состоянии соединения", ) # Задача генерации тестовых данных @@ -56,6 +56,7 @@ with DAG( dag=dag, retries=3, retry_delay=timedelta(minutes=1), + doc_md="Генерирует тестовые данные, сохраняет их в CSV файл ", ) # Задача загрузки данных в Neo4j @@ -65,6 +66,7 @@ with DAG( dag=dag, retries=3, retry_delay=timedelta(minutes=1), + doc_md="Загружает данные из CSV файла в БД", ) # Финальная задача diff --git a/dags/task_01/target_db/repositories.py b/dags/task_01/target_db/repositories.py index 7f59305..2811d68 100644 --- a/dags/task_01/target_db/repositories.py +++ b/dags/task_01/target_db/repositories.py @@ -27,7 +27,7 @@ class TargetDBRepo: batch = users.iloc[i:i + BATCH_SIZE] records = batch.to_dict(orient="records") session.run(query, {"rows": records}) - self.log.info("rows saved %s", i + BATCH_SIZE) + self.log.info("Rows %s–%s saved (%s)", i, i+len(batch), len(batch)) def get_number_of_users(self) -> int: with self.driver.session() as session: diff --git a/dags/task_01/tasks.py b/dags/task_01/tasks.py index 05acfc5..5f379a2 100644 --- a/dags/task_01/tasks.py +++ b/dags/task_01/tasks.py @@ -13,6 +13,9 @@ log = logging.getLogger(__name__) class UserActionTransfer: + """ + Класс для работы с переносом пользовательских действий в Neo4j. + """ def __init__( self, @@ -22,7 +25,9 @@ class UserActionTransfer: self.log = logging.getLogger(__name__) def generate_sample_data(self): - """Генерация тестовых данных и сохранение в CSV файл""" + """ + Генерирует тестовые данные действий пользователей и сохраняет их в CSV-файл. + """ actions = self.get_fake_user_action_data() csv_file_path = self.get_csv_file_path() actions.to_csv(csv_file_path, index=False) @@ -34,6 +39,9 @@ class UserActionTransfer: logging.info(f"Data preview: %s", actions.head()) def load_data_to_neo4j(self) -> None: + """ + Загружает данные из CSV-файла во временной директории в базу данных Neo4j. + """ csv_file_path = Variable.get("user_action_data_path") if not csv_file_path or not os.path.exists(csv_file_path): @@ -52,7 +60,9 @@ class UserActionTransfer: logging.info("Temporary CSV file cleaned up") def check_neo4j_connection(self): - """Проверка соединения с БД""" + """ + Проверяет соединение с базой данных Neo4j. + """ try: result = self.target_db.check_connection() log.info(f"Neo4j message: {result}") @@ -64,13 +74,19 @@ class UserActionTransfer: @staticmethod def get_csv_file_path() -> str: + """ + Возвращает путь к CSV-файлу во временной директории для пользовательских действий. + """ temp_dir = tempfile.gettempdir() return os.path.join(temp_dir, "user_action_data.csv") @staticmethod def get_fake_user_action_data() -> pd.DataFrame: + """ + Генерирует случайные тестовые данные действий пользователей. + """ actions = ["login", "purchase", "view", "logout", "search"] - ids = list(range(1, 10001)) + ids = list(range(1, 12491)) action = [] timestamp = [] for _ in ids: diff --git a/scripts/init_airflow.sh b/scripts/init_airflow.sh index fa836d6..780b958 100644 --- a/scripts/init_airflow.sh +++ b/scripts/init_airflow.sh @@ -8,7 +8,7 @@ airflow db migrate # Создание пользователя-админа if ! airflow users list | grep -q "${AIRFLOW_ADMIN_EMAIL}"; then - echo "🧑‍💻 Creating Airflow admin user..." + echo "Creating Airflow admin user..." airflow users create \ --username "${AIRFLOW_ADMIN_USER}" \ --firstname "${AIRFLOW_ADMIN_FIRSTNAME}" \ From b1595497d1d4ff63fc89689b00a9dd8e16fc7f8a Mon Sep 17 00:00:00 2001 From: iradik Date: Tue, 4 Nov 2025 00:57:50 +0300 Subject: [PATCH 3/3] read.me --- .dockerignore | 5 +++++ README.md | 12 +++--------- 2 files changed, 8 insertions(+), 9 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..b3a6ab3 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ + __pycache__ + *.pyc + *.pyo + .git + .DS_Store \ No newline at end of file diff --git a/README.md b/README.md index eaf98fa..52dfacc 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ -# Пример использования airflow с подключением к БД neo4j +### Пример использования airflow с подключением к БД neo4j -## Как развернуть проект +### Как развернуть проект 1. **Клонируйте репозиторий:** ```bash -git clone +git clone https://git.ooru.ru/radik/airflow_neo4j.git cd airflow_neo4j ``` @@ -35,9 +35,3 @@ docker-compose up --build 5. **Первый запуск DAG** При первом запуске дага откройте его в Airflow UI и включите тумблер (On). Не нажимайте кнопку запуска (Play) вручную — дождитесь автоматического срабатывания по расписанию или другим триггерам. - ---- - -- Для подробностей по настройке Neo4j и других переменных обратитесь к комментариям внутри `.env` и документации вашего проекта. -- Документация актуальна на момент сборки контейнера и версии Airflow в Dockerfile. -