82 lines
2.8 KiB
Python
82 lines
2.8 KiB
Python
from datetime import datetime, timedelta
|
||
|
||
from airflow import DAG
|
||
from airflow.decorators import task_group
|
||
from airflow.models import Variable
|
||
from airflow.operators.empty import EmptyOperator
|
||
from airflow.operators.python import PythonOperator
|
||
from task_01.composites import UserActionTransferTask
|
||
from task_01.target_db.utills import get_driver
|
||
|
||
# Аргументы DAG по умолчанию
|
||
default_args = {
|
||
'owner': 'data_engineer',
|
||
'depends_on_past': False,
|
||
'start_date': datetime(2025, 11, 1),
|
||
'email_on_failure': False,
|
||
'email_on_retry': False,
|
||
'retries': 1,
|
||
'retry_delay': timedelta(minutes=5),
|
||
'execution_timeout': timedelta(minutes=30),
|
||
}
|
||
|
||
# Определение DAG
|
||
with DAG(
|
||
'neo4j_healthcheck_and_sample_loader',
|
||
default_args=default_args,
|
||
schedule_interval='0 9 * * *',
|
||
catchup=False,
|
||
tags=['neo4j', 'healthcheck', 'data_loading', 'vers.01'],
|
||
description='DAG для проверки здоровья Neo4j и загрузки тестовых данных',
|
||
) as dag:
|
||
conn_id = Variable.get("CONN_ID")
|
||
driver = get_driver(conn_id)
|
||
|
||
task = UserActionTransferTask(driver)
|
||
|
||
# Начальная задача
|
||
start_task = EmptyOperator(
|
||
task_id='start_task',
|
||
dag=dag,
|
||
)
|
||
|
||
# Задача проверки соединения с Neo4j
|
||
check_neo4j_connection_task = PythonOperator(
|
||
task_id='check_neo4j_connection',
|
||
python_callable=task.run.check_neo4j_connection,
|
||
dag=dag,
|
||
retries=3,
|
||
retry_delay=timedelta(minutes=1),
|
||
doc_md="Проверяет соединение с Neo4j и возвращает сообщение о состоянии соединения",
|
||
)
|
||
|
||
# Задача генерации тестовых данных
|
||
generate_sample_data_task = PythonOperator(
|
||
task_id='generate_sample_data',
|
||
python_callable=task.run.generate_sample_data,
|
||
dag=dag,
|
||
retries=3,
|
||
retry_delay=timedelta(minutes=1),
|
||
doc_md="Генерирует тестовые данные, сохраняет их в CSV файл ",
|
||
)
|
||
|
||
# Задача загрузки данных в Neo4j
|
||
load_data_to_neo4j_task = PythonOperator(
|
||
task_id='load_data_to_neo4j',
|
||
python_callable=task.run.load_data_to_neo4j,
|
||
dag=dag,
|
||
retries=3,
|
||
retry_delay=timedelta(minutes=1),
|
||
doc_md="Загружает данные из CSV файла в БД",
|
||
)
|
||
|
||
# Финальная задача
|
||
end_task = EmptyOperator(
|
||
task_id='end_task',
|
||
dag=dag,
|
||
)
|
||
|
||
# Определение зависимостей задач
|
||
(start_task >> check_neo4j_connection_task >> generate_sample_data_task >>
|
||
load_data_to_neo4j_task >> end_task)
|