Automating Airflow DAG Column-Level Lineage Ingestion without Code Changes

Original Slack Thread

Hi everyone! Thank you for such an excellent product!
I wonder, is it possible to ingest column-level lineage from Airflow DAGs automatically without changing them?
My DAGs are perhaps written in a confusing way; an example is below. I don’t want to rewrite a bunch of similar Python scripts just to call the SDK directly (see the sketch after the code block below): https://datahubproject.io/docs/api/tutorials/lineage/#add-column-level-lineage
Hope to get some help from you, thank you!
DataHub version is 0.12.0

```
from os.path import dirname, join
from pathlib import Path

import pandas as pd
from generic_sdk.utils.logging import generic_log_function
from loguru import logger

from general_module.util.config.processing_config import GenericProcessingConfig


class DataProcessingConfig(GenericProcessingConfig):
    def run(self):
        # Resolve the configured source database and target storage(s).
        source_db = self.source.db_by_name("source_db_name").source()
        destination = self.target.storage_by_name("destination_name").target()
        alternate_destination = (
            self.target.storage_by_name("alt_destination_name").target()
            if self.target.has_storage("alt_destination_name")
            else None
        )
        query_source = source_db.data_source
        process_date = self.execution_date.date()
        script_path = dirname(__file__)

        # Load the SQL template that sits next to this script and run it
        # against the source database for the current execution date.
        data_query = open(join(script_path, "data_query.sql")).read()
        data_results = query_source.run_raw_query(data_query.format(process_date=process_date))
        logger.info(f"Loaded {len(data_results)}")

        # Light post-processing: derive a date column and fill missing keys.
        data_results["date"] = pd.to_datetime(data_results["timestamp_column"]).dt.date
        data_results["identifier"] = data_results["identifier"].fillna(-1)
        data_results["unique_id"] = data_results["unique_id"].fillna("-1")
        generic_log_function(data_results, "data_log")

        # Write the results to the primary (and optional alternate) target.
        destination.write_pandas(data_results)
        if alternate_destination:
            alternate_destination.write_pandas(data_results)


if __name__ == "__main__":
    app = DataProcessingConfig.load(Path(__file__).parent)
    app.run()
```
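
For reference, the SDK approach from the linked tutorial looks roughly like the minimal sketch below. This is not taken from the DAG above: the platform ("postgres"), table names ("source_table", "destination_table"), column names, and the GMS endpoint are all placeholder assumptions you would replace with your own.

```
# Minimal sketch of emitting column-level (fine-grained) lineage with the
# DataHub Python SDK, following the linked tutorial. All names and the
# endpoint below are hypothetical placeholders.
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)


def dataset_urn(table: str) -> str:
    # Placeholder platform; use whatever platform your datasets live on.
    return builder.make_dataset_urn(platform="postgres", name=table, env="PROD")


def field_urn(table: str, column: str) -> str:
    return builder.make_schema_field_urn(dataset_urn(table), column)


# One column-level edge: an upstream column feeding a downstream column.
column_lineage = FineGrainedLineage(
    upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
    upstreams=[field_urn("source_table", "timestamp_column")],
    downstreamType=FineGrainedLineageDownstreamType.FIELD,
    downstreams=[field_urn("destination_table", "date")],
)

# Table-level upstream plus the column-level detail, attached to the
# downstream dataset as an UpstreamLineage aspect.
lineage_aspect = UpstreamLineage(
    upstreams=[Upstream(dataset=dataset_urn("source_table"), type=DatasetLineageType.TRANSFORMED)],
    fineGrainedLineages=[column_lineage],
)

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
emitter.emit_mcp(
    MetadataChangeProposalWrapper(entityUrn=dataset_urn("destination_table"), aspect=lineage_aspect)
)
```

As the question notes, this per-DAG boilerplate is exactly what the asker is hoping to avoid.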

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

Hi Bul! We definitely understand the pain of rewriting Airflow DAGs :slightly_smiling_face: If you’re able, I suggest joining us at the Community Marathon, specifically the Ingestion sessions, so we can give you live help: https://datahubspace.slack.com/archives/CUMV92XRQ/p1705433341623299