Воздушный поток: Создайте DAG из отдельного файла

0

Вопрос

В airflow я пытаюсь создать функцию, предназначенную для создания DAG в файле:

dynamic_dags.py:

def generate_dag(name):
    with DAG(
        dag_id=f'dag_{name}',
        default_args=args,
        start_date=days_ago(2),
        schedule_interval='5 5 * * *',
        tags=['Test'],
        catchup=False
    ) as dag:
        dummy_task=DummyOperator(
            task_id="dynamic_dummy_task",
            dag=dag
        )
    return dag

Затем в другом файле я пытаюсь импортировать dag из отдельного файла:

load_dags.py:

from dynamic_dag import generate_dag
globals()["Dynamic_DAG_A"] = generate_dag('A')

Тем не менее, dag не отображаются в веб-интерфейсе. Но если я сделаю их в одном файле, как показано ниже, это будет работать:

def generate_dag(name):
    with DAG(
        dag_id=f'dag_{name}',
        default_args=args,
        start_date=days_ago(2),
        schedule_interval='5 5 * * *',
        tags=['Test'],
        catchup=False
    ) as dag:
        dummy_task=DummyOperator(
            task_id="dynamic_dummy_task",
            dag=dag
        )
    return dag

globals()["Dynamic_DAG_A"] = generate_dag('A')

Мне интересно, почему это не работает в двух отдельных файлах.

airflow airflow-scheduler
2021-11-21 00:44:01
1

Лучший ответ

1

Я думаю, что если вы используете Airflow 1.10, то файлы DAG должны содержать DAG и воздушный поток:

https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html?выделите=airflowignore#dag

При поиске DAG Airflow рассматривает только файлы python, которые по умолчанию содержат строки “airflow” и “DAG". Чтобы вместо этого рассмотреть все файлы python, отключите флаг конфигурации DAG_DISCOVERY_SAFE_MODE.

В Airflow 2 он был изменен (немного - dag не учитывает регистр):

https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html

При поиске DAG внутри папки DAG_FOLDER Airflow рассматривает только файлы Python, содержащие строки airflow и dag (без учета регистра), в качестве оптимизации. Чтобы вместо этого рассмотреть все файлы Python, отключите флаг конфигурации DAG_DISCOVERY_SAFE_MODE.

Я думаю, что вам просто не хватает "воздушного потока" в вашем load_dags.py. Вы можете добавить его куда угодно, включая комментарии.

2021-11-21 19:02:47

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................