diff --git a/Dockerfile b/Dockerfile index f727a38..3114a43 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,6 +6,8 @@ ENV USERNAME=airflow # Устанавливаем переменные окружения для Airflow ENV AIRFLOW_HOME=/opt/airflow +COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/ + # Копируем файл зависимостей COPY dags ${AIRFLOW_HOME}/dags/ @@ -14,11 +16,4 @@ WORKDIR ${AIRFLOW_HOME} # Устанавливаем дополнительные Python-зависимости RUN pip install --no-cache-dir -r dags/requirements.txt - -COPY --chown=airflow:airflow scripts/init_airflow.sh /usr/local/bin/ RUN chmod +x /usr/local/bin/init_airflow.sh - -# (опционально) можно указать пользователя airflow для безопасности -USER airflow - -FROM build as final \ No newline at end of file diff --git a/README.md b/README.md index ca242eb..eaf98fa 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,43 @@ # Пример использования airflow с подключением к БД neo4j +## Как развернуть проект + +1. **Клонируйте репозиторий:** + +```bash +git clone +cd airflow_neo4j +``` + +2. **Скопируйте и настройте файл переменных окружения:** + +Переименуйте файл `env.example` в `.env` и обязательно заполните все переменные в файле (например, пароли и данные для подключения к Neo4j и Postgres). + +```bash +cp env.example .env +# Откройте .env и укажите свои значения +``` + +3. **Запустите сервисы через Docker Compose:** + +```bash +docker-compose up --build +``` + +4. **Дождитесь сообщения в логе**: + +В процессе инициализации дождитесь строки: + + Initialization complete! Starting webserver... + +После этого интерфейс Airflow будет доступен по адресу: http://ip:8080 + +5. **Первый запуск DAG** + +При первом запуске дага откройте его в Airflow UI и включите тумблер (On). Не нажимайте кнопку запуска (Play) вручную — дождитесь автоматического срабатывания по расписанию или другим триггерам. + +--- + +- Для подробностей по настройке Neo4j и других переменных обратитесь к комментариям внутри `.env` и документации вашего проекта. +- Документация актуальна на момент сборки контейнера и версии Airflow в Dockerfile. + diff --git a/dags/dag_01.py b/dags/dag_01.py index 4ebe70f..6161250 100644 --- a/dags/dag_01.py +++ b/dags/dag_01.py @@ -21,13 +21,12 @@ default_args = { # Определение DAG with DAG( - 'neo4j_health_check_dag', + 'neo4j_healthcheck_and_sample_loader', default_args=default_args, schedule_interval='0 9 * * *', catchup=False, tags=['neo4j', 'healthcheck', 'data_loading', 'vers.01'], - description= - 'DAG для проверки здоровья Neo4j и загрузки тестовых данных', + description='DAG для проверки здоровья Neo4j и загрузки тестовых данных', ) as dag: conn_id = Variable.get("CONN_ID") driver = get_driver(conn_id) @@ -47,6 +46,7 @@ with DAG( dag=dag, retries=3, retry_delay=timedelta(minutes=1), + doc_md="Проверяет соединение с Neo4j и возвращает сообщение о состоянии соединения", ) # Задача генерации тестовых данных @@ -56,6 +56,7 @@ with DAG( dag=dag, retries=3, retry_delay=timedelta(minutes=1), + doc_md="Генерирует тестовые данные, сохраняет их в CSV файл ", ) # Задача загрузки данных в Neo4j @@ -65,6 +66,7 @@ with DAG( dag=dag, retries=3, retry_delay=timedelta(minutes=1), + doc_md="Загружает данные из CSV файла в БД", ) # Финальная задача diff --git a/dags/task_01/target_db/repositories.py b/dags/task_01/target_db/repositories.py index 7f59305..2811d68 100644 --- a/dags/task_01/target_db/repositories.py +++ b/dags/task_01/target_db/repositories.py @@ -27,7 +27,7 @@ class TargetDBRepo: batch = users.iloc[i:i + BATCH_SIZE] records = batch.to_dict(orient="records") session.run(query, {"rows": records}) - self.log.info("rows saved %s", i + BATCH_SIZE) + self.log.info("Rows %s–%s saved (%s)", i, i+len(batch), len(batch)) def get_number_of_users(self) -> int: with self.driver.session() as session: diff --git a/dags/task_01/tasks.py b/dags/task_01/tasks.py index 05acfc5..5f379a2 100644 --- a/dags/task_01/tasks.py +++ b/dags/task_01/tasks.py @@ -13,6 +13,9 @@ log = logging.getLogger(__name__) class UserActionTransfer: + """ + Класс для работы с переносом пользовательских действий в Neo4j. + """ def __init__( self, @@ -22,7 +25,9 @@ class UserActionTransfer: self.log = logging.getLogger(__name__) def generate_sample_data(self): - """Генерация тестовых данных и сохранение в CSV файл""" + """ + Генерирует тестовые данные действий пользователей и сохраняет их в CSV-файл. + """ actions = self.get_fake_user_action_data() csv_file_path = self.get_csv_file_path() actions.to_csv(csv_file_path, index=False) @@ -34,6 +39,9 @@ class UserActionTransfer: logging.info(f"Data preview: %s", actions.head()) def load_data_to_neo4j(self) -> None: + """ + Загружает данные из CSV-файла во временной директории в базу данных Neo4j. + """ csv_file_path = Variable.get("user_action_data_path") if not csv_file_path or not os.path.exists(csv_file_path): @@ -52,7 +60,9 @@ class UserActionTransfer: logging.info("Temporary CSV file cleaned up") def check_neo4j_connection(self): - """Проверка соединения с БД""" + """ + Проверяет соединение с базой данных Neo4j. + """ try: result = self.target_db.check_connection() log.info(f"Neo4j message: {result}") @@ -64,13 +74,19 @@ class UserActionTransfer: @staticmethod def get_csv_file_path() -> str: + """ + Возвращает путь к CSV-файлу во временной директории для пользовательских действий. + """ temp_dir = tempfile.gettempdir() return os.path.join(temp_dir, "user_action_data.csv") @staticmethod def get_fake_user_action_data() -> pd.DataFrame: + """ + Генерирует случайные тестовые данные действий пользователей. + """ actions = ["login", "purchase", "view", "logout", "search"] - ids = list(range(1, 10001)) + ids = list(range(1, 12491)) action = [] timestamp = [] for _ in ids: diff --git a/scripts/init_airflow.sh b/scripts/init_airflow.sh index fa836d6..780b958 100644 --- a/scripts/init_airflow.sh +++ b/scripts/init_airflow.sh @@ -8,7 +8,7 @@ airflow db migrate # Создание пользователя-админа if ! airflow users list | grep -q "${AIRFLOW_ADMIN_EMAIL}"; then - echo "🧑‍💻 Creating Airflow admin user..." + echo "Creating Airflow admin user..." airflow users create \ --username "${AIRFLOW_ADMIN_USER}" \ --firstname "${AIRFLOW_ADMIN_FIRSTNAME}" \