DAG error referring to a Task Group I never asked for

I’m experienced with Airflow 1.10.14. I’m now trying to learn Airflow 2, and I’m having difficulty getting my DAGs to run after moving them from one version to the next. I’m hoping someone can take a look and tell me what I’m doing wrong.

Here’s my DAG:

@dag(
    "FILE_DB_PRODUCT_TABLES",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 1),
    catchup=False,
    tags=['FILES', 'PRODUCT'],
    template_searchpath="/home/airflow/airflow",
    default_args={
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_success': False
    }
)


def FILE_DB_PRODUCT_TABLES():

    check_for_file = BranchPythonOperator(
        task_id='Check_FTP_and_Download',
        provide_context=True,
        python_callable=GetFiles
    )

    PrepareFiles = BashOperator(
        task_id='Prepare_Files_For_GCS',
        bash_command=SendPRODUCT,
        dag=dag,
    )

    load_File_to_PRODUCT_RAW = GCSToBigQueryOperator(
        task_id='PRODUCT_GCS_to_GDB_Raw',
        bucket="PRODUCT_files",
        source_objects=['To_Process/*.txt'],
        destination_project_dataset_table="PRODUCT.PRODUCT_RAW",
        schema_fields=[
            {'name': 'Field1', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field2', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field3', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field4', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field5', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field6', 'type': 'STRING', 'mode': 'NULLABLE'},

        ],
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id='CLOUD_STORAGE_Staging',
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        skip_leading_rows=1,
        soft_fail=True,
        quote_character="",
        field_delimiter="\x1f")
    # [END howto_operator_gcs_to_DB]

    Set_Staging = BigQueryExecuteQueryOperator(
        task_id='Set_PRODUCT_Staging',
        DBl="Select * from `STORAGE-stg-254212.PRODUCT.VWE_PRODUCT_RAW_TO_STAGE`;",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        destination_dataset_table="STORAGE-stg-254212.PRODUCT.PRODUCT_STAGE"
    )

    Populate_Properties = BigQueryExecuteQueryOperator(
        task_id='Update_Properties_Table',
        DBl="./SQL/PRODUCT/PRODUCT.sql",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED'
    )
    #
    #
    Populate_Properties_Details = BigQueryExecuteQueryOperator(
        task_id='Update_Properties_Detail_Table',
        DBl="./SQL/PRODUCT/PRODUCT_DETAIL.sql",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED'
    )

    Populate_Commercial = BigQueryExecuteQueryOperator(
        task_id='Update_COMMERCIAL_Table',
        DBl="./SQL/PRODUCT/PRODUCT_COMMERCIAL.sql",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED'
    )

    archive_PRODUCT_files = GCSToGCSOperator(
        task_id='Archive_PRODUCT_Files',
        source_bucket="PRODUCT_files",
        source_object="To_Process/*.txt",
        destination_bucket="PRODUCT_files",
        destination_object="Archive/",
        move_object=True,
        google_cloud_storage_conn_id='CLOUD_STORAGE_Staging'
    )
    #
    finished = BashOperator(
        task_id='Cleanup_and_Finish',
        bash_command='rm -rf {}'.format(Variable.get("temp_directory") + "PRODUCT/*.*"),
        dag=dag,
    )

    check_for_file >> PrepareFiles >> load_File_to_PRODUCT_RAW >> Set_Staging >> Populate_Properties >> Populate_Properties_Details >> archive_PRODUCT_files >> finished
    Set_Staging >> Populate_Commercial >> archive_PRODUCT_files
    check_for_file >> finished

dag = FILE_DB_PRODUCT_TABLES()

This is the error it’s giving me:

Broken DAG: [/home/airflow/airflow/dags/FILE_BQ_ATTOM_TABLES.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 376, in apply_defaults
    task_group = TaskGroupContext.get_current_task_group(dag)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/task_group.py", line 490, in get_current_task_group
    return dag.task_group
AttributeError: 'function' object has no attribute 'task_group'

The problem is that I’m not asking for a task group anywhere in this DAG. Where should I be looking for the error here?
Thanks!

You are mixing the old Airflow 1.x style with the new TaskFlow API, which is what leads to errors like this.

  1. You don’t need to pass dag=dag to each of your operators; with the TaskFlow API, tasks created inside the @dag-decorated function are attached to the DAG automatically.

  2. More importantly, at the point where you write dag=dag, the name dag refers to the @dag decorator itself (a plain function), not to an instance of a DAG. That is exactly what the traceback is telling you: 'function' object has no attribute 'task_group'.
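
To see why, note that inside the decorated function the only thing named dag in scope is the decorator you imported, which is a plain function. A minimal sketch, assuming dag was imported at the top of your file in the usual way:

from airflow.decorators import dag  # the @dag decorator is just a function

print(type(dag))  # <class 'function'>

# Inside FILE_DB_PRODUCT_TABLES(), dag=dag therefore hands this function to the
# operator, and Airflow fails as soon as it tries to read dag.task_group from it.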

Try this format instead:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator

default_args = {
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': False
}

dag = DAG(
    dag_id="FILE_DB_PRODUCT_TABLES",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 1),
    catchup=False,
    tags=['FILES', 'PRODUCT'],
    template_searchpath="/home/airflow/airflow",
    default_args=default_args
)

with dag:
    check_for_file = BranchPythonOperator(
        task_id='Check_FTP_and_Download',
        python_callable=GetFiles
    )

    PrepareFiles = BashOperator(
        task_id='Prepare_Files_For_GCS',
        bash_command=SendPRODUCT
    )

    # rest of your tasks, refactored the same way (no dag=dag)
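
If you would rather keep the TaskFlow @dag decorator from your original file, the only change you strictly need is to drop the dag=dag arguments; it also helps to bind the result of the decorated function to a name other than dag so it doesn’t shadow the decorator. A trimmed sketch with just the first two tasks, using placeholder stand-ins for your GetFiles callable and SendPRODUCT command:

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator


def GetFiles(**context):
    # placeholder for your existing branching callable
    return 'Prepare_Files_For_GCS'


SendPRODUCT = "echo 'prepare files'"  # placeholder for your existing bash command


@dag(
    dag_id="FILE_DB_PRODUCT_TABLES",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 1),
    catchup=False,
)
def FILE_DB_PRODUCT_TABLES():
    check_for_file = BranchPythonOperator(
        task_id='Check_FTP_and_Download',
        python_callable=GetFiles,      # no provide_context needed in Airflow 2
    )

    PrepareFiles = BashOperator(
        task_id='Prepare_Files_For_GCS',
        bash_command=SendPRODUCT,      # note: no dag=dag inside the @dag function
    )

    check_for_file >> PrepareFiles


product_tables_dag = FILE_DB_PRODUCT_TABLES()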
