<@U01GZEETMEZ> I’m starting a new thread since it’s a distinct issue - inlets are still not emitting for Airflow DataProcessInstance. Are you all able to take a look at this? In the thread are logs where you can see inlets populated for the DataJob emission, but not the DataProcessInstance emission. We are on DataHub 0.12.0, Airflow plugin 0.12.0.3, and Airflow 2.5.3. I’ve traced back through the source code, but have not been able to find anything that would allow outlets to be populated, but not inlets.
[2023-11-28, 20:46:48 UTC] {_plugin.py:165} INFO - Emitting Datahub Datajob: DataJob(id='dbt.run.tag-airflow_metadata', urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f8bc4784760>, flow_urn=<datahub.utilities.urns.data_flow_urn.DataFlowUrn object at 0x7f8bc4ad7f10>, name=None, description=None, properties={'depends_on_past': 'False', 'email': 'None', 'label': "'dbt.run.tag-airflow_metadata'", 'execution_timeout': 'None', 'sla': 'None', 'task_id': "'dbt.run.tag-airflow_metadata'", 'trigger_rule': "<TriggerRule.ALL_SUCCESS: 'all_success'>", 'wait_for_downstream': 'False', 'downstream_task_ids': "{'dbt.test.tag-airflow_metadata'}", 'inlets': "[Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.external_staging.airflow_tables_public_task_instance,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.external_staging.airflow_tables_public_job,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_hierarchy_path_detail,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.external_staging.airflow_tables_public_dag_run,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.external_staging.airflow_tables_public_serialized_dag,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_downstream_task,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.external_staging.airflow_tables_public_dag,STG)')]", 'outlets': "[Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_dependency,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_hierarchy_path_detail,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag,STG)'), 
Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_downstream_task,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_task_instance,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_dag_run,STG)'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_job,STG)')]"}, url='', tags={'daily', 'data_foundations'}, owners=set(), group_owners=set(), inlets=[], outlets=['urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_dependency,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_hierarchy_path_detail,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_downstream_task,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_task_instance,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_dag_run,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_job,STG)'], upstream_urns=[<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f8bc4aed910>])
[2023-11-28, 20:46:49 UTC] {_plugin.py:179} INFO - Emitted Start Datahub Dataprocess Instance: DataProcessInstance(id='airflow_metadata_daily_dbt.run.tag-airflow_metadata_scheduled__2023-11-27T02:00:00+00:00', urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn object at 0x7f8bc4aed580>, orchestrator='airflow', cluster='STG', type='BATCH_SCHEDULED', template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f8bc4784760>, parent_instance=None, properties={'run_id': 'scheduled__2023-11-27T02:00:00+00:00', 'duration': '259.447376', 'start_date': '2023-11-28 20:42:28.440766+00:00', 'end_date': '2023-11-28 20:46:47.888142+00:00', 'execution_date': '2023-11-27 02:00:00+00:00', 'try_number': '2', 'hostname': 'airflow-metadata-daily-dbt-run-dd95c3ac32df4c5f953740ea1f4d3f8d', 'max_tries': '4', 'external_executor_id': 'None', 'pid': '29', 'state': 'success', 'operator': 'BashOperator', 'priority_weight': '2', 'unixname': 'airflow', 'log_url': ''}, url='', inlets=[], outlets=['urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_dependency,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_hierarchy_path_detail,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_downstream_task,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_task_instance,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_dag_run,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_job,STG)'], upstream_urns=[])
[2023-11-28, 20:46:49 UTC] {_plugin.py:191} INFO - Emitted Completed Data Process Instance: DataProcessInstance(id='airflow_metadata_daily_dbt.run.tag-airflow_metadata_scheduled__2023-11-27T02:00:00+00:00', urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn object at 0x7f8bc42b7820>, orchestrator='airflow', cluster='STG', type='BATCH_SCHEDULED', template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0x7f8bc4784760>, parent_instance=None, properties={}, url=None, inlets=[], outlets=['urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_dependency,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task_hierarchy_path_detail,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag_task,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_dag,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_downstream_task,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_task_instance,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.fact_airflow_dag_run,STG)', 'urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.dim_airflow_job,STG)'], upstream_urns=[])
[2023-11-28, 20:46:49 UTC] {local_task_job.py:212} INFO - Task exited with return code 0
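For anyone scanning longer task logs for the same symptom, here is a rough sketch that flags emission lines where the top-level `inlets=[]` is empty while `outlets=[...]` is populated. The helper name `find_empty_inlets` and the regexes are assumptions based only on the log format pasted above; they are not part of the DataHub Airflow plugin.

```python
import re

# Match the top-level fields printed as `inlets=[...]` / `outlets=[...]`.
# The 'inlets': "..." entry inside the properties dict uses ': ' rather than
# '=', so it is not picked up by these patterns.
INLETS_RE = re.compile(r"\binlets=\[(.*?)\]")
OUTLETS_RE = re.compile(r"\boutlets=\[(.*?)\]")


def find_empty_inlets(log_lines):
    """Return log lines whose top-level inlets are empty while outlets are not."""
    suspicious = []
    for line in log_lines:
        inlets = INLETS_RE.search(line)
        outlets = OUTLETS_RE.search(line)
        if inlets is None or outlets is None:
            continue  # not an emission line
        if inlets.group(1).strip() == "" and outlets.group(1).strip() != "":
            suspicious.append(line)
    return suspicious


# Shortened, made-up sample lines in the same shape as the logs above.
sample = [
    "INFO - Emitted Start Datahub Dataprocess Instance: DataProcessInstance("
    "id='x', inlets=[], outlets=['urn:li:dataset:(urn:li:dataPlatform:dbt,db.public.t,STG)'], "
    "upstream_urns=[])",
    "INFO - Task exited with return code 0",
]

for line in find_empty_inlets(sample):
    print(line[:70])
```

Saving the task log to a file and passing `open(path)` as `log_lines` should work the same way, since the function only iterates over lines.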
I suspect that this PR somehow introduced the issue https://github.com/datahub-project/datahub/pull/9211
Need to dig in a bit more
Actually, the `_plugin.py` file no longer exists, which leads me to suspect that the Airflow plugin is still on an older version and not 0.12.0.3
We previously had a regression that caused inlets to disappear, so I suspect an upgrade should fix this
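One way to confirm which versions are actually installed is to run something like the sketch below inside the same environment the Airflow workers use. It assumes the packages are installed under their usual PyPI distribution names (`acryl-datahub`, `acryl-datahub-airflow-plugin`, `apache-airflow`); adjust the list if your build differs. The helper name `installed_versions` is just for illustration.

```python
from importlib.metadata import PackageNotFoundError, version

# Assumed PyPI distribution names; change these if your image installs
# the packages under different names.
PACKAGES = ["acryl-datahub", "acryl-datahub-airflow-plugin", "apache-airflow"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


for name, ver in installed_versions(PACKAGES).items():
    print(f"{name}: {ver or 'not installed'}")
```

If the plugin version printed here differs from what you expect (for example, between two separately built images), that mismatch is the first thing to fix before digging further into the emission code.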
Thank you - that pointed me to a mismatch we had between our dbt build and Airflow build which was causing this issue. We can see the inlets now.