Почему задача зависает и не выполняется в воздушном потоке?

Я пытаюсь обдувать BigQueryOperator. Я думал, что позже воспользуюсь google composer, но сначала хочу, чтобы он работал локально. У меня есть воздушный поток, и BashOperator работает нормально, я также могу запустить airflow test <dag> <task>, где task - это большая задача запроса, которую я хочу запустить, но когда я запускаю DAG из пользовательского интерфейса, задача bigquery никогда не ставится в очередь. Вместо этого у них есть состояние REMOVED, и ничего не происходит.

Мое определение DAG выглядит так:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'tutorial', default_args=default_args) as dag:

operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date')

    template_sql = '''
            SELECT 'TOMAS' name, '{{ params.my_value }}' value, "{{ params.my_value2 }}" tables
        '''

    sampleBigQuery1 = BigQueryOperator(
        task_id='bq_simple_sql1',
        bql=template_sql,
        use_legacy_sql=False,
        destination_dataset_table='temp_tomas.airflow_1',
        allow_large_results=True,
        params={'my_value': (datetime.now()).strftime("%D %T"),
                'my_value2': "yolo"},  # getTables()},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE'
    )

    t1 >> sampleBigQuery1

Итак, как мне отладить случай, когда он работает airflow test ..., но не запускается планировщиком или пользовательским интерфейсом? Кажется, что-то не так с тем, что у меня здесь?

Локально я использую стандартную установку воздушного потока с sqllite, но я думаю, это не должно иметь никакого влияния. Я запускаю все в одном окружении python, поэтому он должен быть довольно сдержанным.


person Tomas Jansson    schedule 04.05.2018    source источник


Ответы (1)


Если это ваша первая настройка Airflow, вы можете сначала проверить эти вещи: Airflow 1.9.0 стоит в очереди, но не запускает задачи

Кроме того, здесь я бы порекомендовал особенно последний шаг:

  • Если больше ничего не работает, вы можете использовать веб-интерфейс, чтобы щелкнуть значок, а затем в виде графика. Теперь выберите первую задачу и нажмите «Экземпляр задачи». В абзаце «Сведения об экземпляре задачи» вы увидите, почему группа DAG ожидает или не запускается.

Это может дать вам больше информации о том, почему задача не запланирована.

person tobi6    schedule 04.05.2018
comment
Я проверил детали экземпляра, и его состояние: Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run. Но я понятия не имею, почему он находится в состоянии removed. Как я сказал выше, если я попытаюсь запустить airflow test для конкретной задачи, это сработает. Также в целом все остальное работает. Я могу без проблем запускать группы DAG, состоящие из нескольких BashOperator. - person Tomas Jansson; 04.05.2018
comment
@TomasJansson Вы тоже пробовали airflow clear для этой задачи? - person tobi6; 04.05.2018
comment
Это любопытно, поскольку состояние REMOVED обычно предназначено именно для тех удаленных задач, которые были там раньше, но которые были удалены из DAG. - person tobi6; 04.05.2018
comment
Я переименовал файл, а также имя DAG и снова скопировал файл в папку dags, и, похоже, он работает. - person Tomas Jansson; 05.05.2018
comment
Спасибо, @TomasJansson. Вы должны оставить свой комментарий в качестве ответа на вопрос. Переименование дага исправило это для меня, по крайней мере, на данный момент. - person Jack Davidson; 13.11.2019