I’m experienced with Airflow 1.10.14. I’m now trying to learn Airflow 2 and am having difficulty moving my DAGs from one version to the next. I’m hoping someone can take a look and tell me what I’m doing wrong.
Here’s my DAG:
@dag(
    "FILE_DB_PRODUCT_TABLES",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 1),
    catchup=False,
    tags=['FILES', 'PRODUCT'],
    template_searchpath="/home/airflow/airflow",
    default_args={
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_success': False
    }
)
def FILE_DB_PRODUCT_TABLES():
    check_for_file = BranchPythonOperator(
        task_id='Check_FTP_and_Download',
        provide_context=True,
        python_callable=GetFiles
    )
    PrepareFiles = BashOperator(
        task_id='Prepare_Files_For_GCS',
        bash_command=SendPRODUCT,
        dag=dag,
    )
    load_File_to_PRODUCT_RAW = GCSToBigQueryOperator(
        task_id='PRODUCT_GCS_to_GDB_Raw',
        bucket="PRODUCT_files",
        source_objects=['To_Process/*.txt'],
        destination_project_dataset_table="PRODUCT.PRODUCT_RAW",
        schema_fields=[
            {'name': 'Field1', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field2', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field3', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field4', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field5', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Field6', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id='CLOUD_STORAGE_Staging',
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        skip_leading_rows=1,
        soft_fail=True,
        quote_character="",
        field_delimiter="\x1f")
    # [END howto_operator_gcs_to_DB]
    Set_Staging = BigQueryExecuteQueryOperator(
        task_id='Set_PRODUCT_Staging',
        sql="Select * from `STORAGE-stg-254212.PRODUCT.VWE_PRODUCT_RAW_TO_STAGE`;",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        destination_dataset_table="STORAGE-stg-254212.PRODUCT.PRODUCT_STAGE"
    )
    Populate_Properties = BigQueryExecuteQueryOperator(
        task_id='Update_Properties_Table',
        sql="./SQL/PRODUCT/PRODUCT.sql",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED'
    )
    #
    #
    Populate_Properties_Details = BigQueryExecuteQueryOperator(
        task_id='Update_Properties_Detail_Table',
        sql="./SQL/PRODUCT/PRODUCT_DETAIL.sql",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED'
    )
    Populate_Commercial = BigQueryExecuteQueryOperator(
        task_id='Update_COMMERCIAL_Table',
        sql="./SQL/PRODUCT/PRODUCT_COMMERCIAL.sql",
        bigquery_conn_id='CLOUD_STORAGE_Staging',
        use_legacy_sql=False,
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED'
    )
    archive_PRODUCT_files = GCSToGCSOperator(
        task_id='Archive_PRODUCT_Files',
        source_bucket="PRODUCT_files",
        source_object="To_Process/*.txt",
        destination_bucket="PRODUCT_files",
        destination_object="Archive/",
        move_object=True,
        google_cloud_storage_conn_id='CLOUD_STORAGE_Staging'
    )
    #
    finished = BashOperator(
        task_id='Cleanup_and_Finish',
        bash_command='rm -rf {}'.format(Variable.get("temp_directory") + "PRODUCT/*.*"),
        dag=dag,
    )
    check_for_file >> PrepareFiles >> load_File_to_PRODUCT_RAW >> Set_Staging >> Populate_Properties >> Populate_Properties_Details >> archive_PRODUCT_files >> finished
    Set_Staging >> Populate_Commercial >> archive_PRODUCT_files
    check_for_file >> finished

dag = FILE_DB_PRODUCT_TABLES()
This is the error it’s giving me:
Broken DAG: [/home/airflow/airflow/dags/FILE_BQ_ATTOM_TABLES.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 376, in apply_defaults
    task_group = TaskGroupContext.get_current_task_group(dag)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/task_group.py", line 490, in get_current_task_group
    return dag.task_group
AttributeError: 'function' object has no attribute 'task_group'
The problem is that I’m not asking for a task group anywhere in this DAG. Where should I be looking for the error here?
Thanks!
You are still using the old Airflow 1 style in places, and that is what introduces errors like this.
- You don’t need to pass dag=dag to each of your operators; with the TaskFlow API, every operator created inside the @dag-decorated function is attached to the DAG for you.
- More importantly, at the point where you write dag=dag, the name dag refers to the @dag decorator itself, a plain function object rather than a DAG instance, and that is exactly what causes your error (see the small illustration after this list).
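For illustration only (don’t put this in your DAG file), here is a minimal sketch of what the interpreter sees, assuming the standard Airflow 2 import of the decorator:

from airflow.decorators import dag  # `dag` here is the decorator, i.e. a plain function

# A real DAG instance has a task_group attribute; the decorator function does not.
print(callable(dag))               # True - it is just a callable
print(hasattr(dag, "task_group"))  # False

# So dag=dag inside your decorated function hands this function to the operator,
# and BaseOperator's apply_defaults fails with:
#   AttributeError: 'function' object has no attribute 'task_group'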
Try the updated format, creating the DAG explicitly and using it as a context manager (imports and the default_args dict shown for completeness):
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator

default_args = {
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': False
}

dag = DAG(
    dag_id="FILE_DB_PRODUCT_TABLES",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 1),
    catchup=False,
    tags=['FILES', 'PRODUCT'],
    template_searchpath="/home/airflow/airflow",
    default_args=default_args
)

with dag:
    # No dag=dag needed here: the context manager attaches every operator
    # created in this block to `dag`.
    check_for_file = BranchPythonOperator(
        task_id='Check_FTP_and_Download',
        python_callable=GetFiles
    )
    PrepareFiles = BashOperator(
        task_id='Prepare_Files_For_GCS',
        bash_command=SendPRODUCT
    )
    # rest of your code refactored
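If you would rather keep the TaskFlow @dag decorator you started with, the same fix applies there: drop dag=dag from the operators and call the decorated function once at module level. Below is a minimal sketch of that variant, reusing your GetFiles callable and SendPRODUCT command and showing only the first two tasks; the variable name product_tables_dag is just an example, chosen so it doesn’t shadow the imported dag decorator.

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator


@dag(
    dag_id="FILE_DB_PRODUCT_TABLES",
    schedule_interval="@daily",
    start_date=datetime(2022, 11, 1),
    catchup=False,
)
def FILE_DB_PRODUCT_TABLES():
    # Inside the decorated function there is no DAG variable to pass around;
    # the decorator collects every operator instantiated here automatically.
    check_for_file = BranchPythonOperator(
        task_id='Check_FTP_and_Download',
        python_callable=GetFiles
    )
    prepare_files = BashOperator(
        task_id='Prepare_Files_For_GCS',
        bash_command=SendPRODUCT
    )
    check_for_file >> prepare_files


# Calling the factory builds the DAG; keeping the result in a module-level
# variable is what lets Airflow discover it when the file is parsed.
product_tables_dag = FILE_DB_PRODUCT_TABLES()

Either way, the important change is that no operator is handed dag=dag, so the decorator function never reaches BaseOperator.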