Hi everyone! Thank you for such an excellent product!
I wonder, is it possible to ingest column-level lineage from Airflow DAGs automatically without changing them?
My DAGs are written in a confusing way, maybe. Here is an example below. I don’t want to rewrite a bunch of similar Python scripts in order to call the SDK directly: https://datahubproject.io/docs/api/tutorials/lineage/#add-column-level-lineage
Hope to get some help from you, thank you!
DataHub version is 0.12.0
from pathlib import Path
import pandas as pd
from generic_sdk.utils.logging import generic_log_function
from loguru import logger
from general_module.util.config.processing_config import GenericProcessingConfig
class DataProcessingConfig(GenericProcessingConfig):
    """Processing job: query a source DB with a templated SQL file, clean
    the result frame, log it, and write it to one or two storage targets."""

    def run(self):
        """Execute the pipeline for the configured execution date.

        Reads ``data_query.sql`` from the script's directory, formats it with
        the execution date, runs it against the configured source, normalizes
        a few columns, then writes the frame to the destination(s).
        """
        source_db = self.source.db_by_name("source_db_name").source()
        destination = self.target.storage_by_name("destination_name").target()
        # Secondary sink is optional — only some deployments configure it.
        alternate_destination = (
            self.target.storage_by_name("alt_destination_name").target()
            if self.target.has_storage("alt_destination_name")
            else None
        )
        query_source = source_db.data_source
        process_date = self.execution_date.date()

        # Resolve the SQL template next to this script. Use the already-imported
        # pathlib.Path instead of the undefined dirname/join (they were never
        # imported), and read_text() so the file handle is closed promptly.
        script_dir = Path(__file__).parent
        data_query = (script_dir / "data_query.sql").read_text()

        data_results = query_source.run_raw_query(
            data_query.format(process_date=process_date)
        )
        # Restored from the Slack-mangled `<http://logger.info|...>` wrapper.
        logger.info(f"Loaded {len(data_results)}")

        # Normalize columns: derive a pure date column and backfill missing ids
        # with sentinel values (-1 numeric, "-1" string).
        data_results["date"] = pd.to_datetime(data_results["timestamp_column"]).dt.date
        data_results["identifier"] = data_results["identifier"].fillna(-1)
        data_results["unique_id"] = data_results["unique_id"].fillna("-1")

        generic_log_function(data_results, "data_log")
        destination.write_pandas(data_results)
        if alternate_destination:
            alternate_destination.write_pandas(data_results)
# Script entry point: load the config rooted at this script's directory and run.
# (Removed the stray Markdown code fence that was fused onto the last line by
# the chat paste — it was a syntax error.)
if __name__ == "__main__":
    app = DataProcessingConfig.load(Path(__file__).parent)
    app.run()