Troubleshooting code for column-level lineage from database to Elasticsearch index

Original Slack Thread

<@U03MF8MU5P0> I am currently running this code to see the column-level lineage from a database to an Elasticsearch index, but when the code completes, I don't see the lineage. What am I doing wrong?

```python
from elasticsearch import Elasticsearch
import re
import urllib3  # required for disabling the InsecureRequestWarning below
from datahub.ingestion.run.pipeline import Pipeline
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    Upstream,
    UpstreamLineage,
    FineGrainedLineage,
    FineGrainedLineageUpstreamType,
    FineGrainedLineageDownstreamType,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Disable the InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ===============================
# MySQL Ingestion Pipeline
# ===============================
mysql_config = {
    "pipeline_name": "alarmdb_mysql_ingest_pipeline",
    "source": {
        "type": "mysql",
        "config": {
            "host_port": "host",
            "database": "xxx",
            "username": "xxx",
            "password": "xxx",
            "platform_instance": "c1customer_info",
            "include_tables": True,
            "include_views": False,
            "profiling": {"enabled": True, "profile_table_level_only": True},
            "stateful_ingestion": {"enabled": True},
            "env": "PROD",
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "<http://xxxx:8080>"}},
}

# Create and run MySQL ingestion pipeline
mysql_pipeline = Pipeline.create(mysql_config)
mysql_pipeline.run()
mysql_pipeline.raise_from_status()
print("MySQL ingestion pipeline completed successfully.")

# ===============================
# Elasticsearch Ingestion Pipeline
# ===============================
elasticsearch_config = {
    "source": {
        "type": "elasticsearch",
        "config": {
            "host": "<https://xxxx>",
            "username": "xxx",
            "password": "xxx",
            "platform_instance": "c1one_customer_data",
            "env": "PROD",
            "use_ssl": True,
            "verify_certs": False,
            "index_pattern": {"allow": ["watch-syslog-2024-*"], "deny": ["partial-*", r"^\..*"]},
            "ingest_index_templates": True,
            "index_template_pattern": {"allow": ["customer-syslog-template"]},
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "<http://xxxx:8080>"}},
}

# Create and run Elasticsearch ingestion pipeline
elasticsearch_pipeline = Pipeline.create(elasticsearch_config)
elasticsearch_pipeline.run()
elasticsearch_pipeline.raise_from_status()
print("Elasticsearch ingestion pipeline completed successfully.")

# ===============================
# Elasticsearch Connection Setup
# ===============================
es_host = "xxx"
es_port = "xxxx"  # placeholder; e.g. 9200
es_user = "xxx"
es_password = "xxx"

# Connect to Elasticsearch
es = Elasticsearch(
    [f"https://{es_host}:{es_port}"], http_auth=(es_user, es_password), verify_certs=False
)

# Retrieve all indices
all_indices = es.indices.get_alias().keys()

# Define index patterns of interest
index_patterns = [r"watch-syslog-2024-\d{2}"]

# Filter relevant indices based on patterns
relevant_indices = [
    index for index in all_indices if any(re.match(pattern, index) for pattern in index_patterns)
]

print(f"Relevant indices for lineage: {relevant_indices}")

# ===============================
# MySQL Upstream Details
# ===============================
mysql_platform = 'mysql'
mysql_platform_instance = 'c1customer_info'
mysql_database = 'xxx'
mysql_table = 'CUSTOMER'
mysql_field = 'OGCC_CODE'

# Construct MySQL dataset URN
mysql_dataset_urn = builder.make_dataset_urn(
    platform=mysql_platform, name=f"{mysql_platform_instance}.{mysql_database}.{mysql_table}"
)

print(f"MySQL Dataset URN: {mysql_dataset_urn}")

# ===============================
# DataHub Emitter Setup
# ===============================
emitter = DatahubRestEmitter("http://xxxx:8080")

# ===============================
# Lineage Emission Loop
# ===============================
for es_index in relevant_indices:
    # Construct Elasticsearch dataset URN
    es_dataset_urn = builder.make_dataset_urn(
        platform="elasticsearch", name=f"c1one_customer_data.{es_index}", env="PROD"
    )
    print(f"Processing Elasticsearch Index: {es_index}")
    print(f"Elasticsearch Dataset URN: {es_dataset_urn}")

    # Define Upstream for dataset-level lineage
    upstream = Upstream(dataset=mysql_dataset_urn, type=DatasetLineageType.TRANSFORMED)

    # Create UpstreamLineage object
    upstreamLineage = UpstreamLineage(upstreams=[upstream])

    # Create FineGrainedLineage object for column-level lineage
    fineGrainedLineage = FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[builder.make_schema_field_urn(mysql_dataset_urn, mysql_field)],
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[builder.make_schema_field_urn(es_dataset_urn, 'og_customercode')],
    )
    
    # Add fine-grained lineage to upstream lineage object
    upstreamLineage.fineGrainedLineages = [fineGrainedLineage]

    # Define default dataset properties with a meaningful description 
    dataset_properties = DatasetPropertiesClass(description="Generated Elasticsearch dataset")

    # Create MetadataChangeProposals for lineage and properties
    lineageMcp = MetadataChangeProposalWrapper(entityUrn=es_dataset_urn, aspect=upstreamLineage)
    propertiesMcp = MetadataChangeProposalWrapper(entityUrn=es_dataset_urn, aspect=dataset_properties)

    # Emit the lineage and properties MCPs with error handling and logging 
    try:
        emitter.emit_mcp(lineageMcp)
        emitter.emit_mcp(propertiesMcp)
        print(f"Successfully emitted lineage and properties for index: {es_index}")
    except Exception as e:
        print(f"Error emitting lineage for index {es_index}: {e}")

print("Lineage and properties emission completed for all relevant indices.")```


To address the issue of not seeing column-level lineage in DataHub after running your code, work through the following checks to make sure everything is set up correctly:

  1. Configuration and Setup:

    • Ensure that the DataHub server URL passed to the DatahubRestEmitter is correct and reachable, and that it points at the same instance the ingestion pipelines wrote their metadata to.
    • Verify that the MySQL and Elasticsearch configurations are correct and that both ingestion pipelines complete without errors (the script's raise_from_status() calls will surface pipeline failures).
  2. Lineage Emission:

    • Check that the FineGrainedLineage object is correctly constructed. The upstreams and downstreams must be schema-field URNs whose dataset portion exactly matches the datasets the ingestion pipelines created, and whose field paths exactly match the fieldPath values in each dataset's schemaMetadata (matching is case-sensitive, so OGCC_CODE and ogcc_code are different fields).
    • Make sure that emit_mcp actually delivers the metadata change proposals to the DataHub server; check for exceptions in the logs. The verification sketches after this list show how to confirm the URNs exist and that the aspect landed.
  3. DataHub UI:

    • After emitting the lineage, verify in the DataHub UI that the datasets and their respective lineages are visible. Sometimes, it might take a moment for the changes to reflect in the UI.
  4. Debugging:

    • Add additional logging to ensure that each step of the process is executing as expected. This can help identify where the process might be failing.
    • Check the DataHub server logs for any errors or warnings that might indicate issues with receiving or processing the lineage data.
  5. Documentation and Examples:

    • Compare your script against the fine-grained lineage examples that ship with the DataHub Python SDK to confirm the aspect is shaped the way the server expects.
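
If the checks above pass and the lineage still does not appear, a common culprit is a URN mismatch: the URNs built by hand in the script must exactly match the ones the ingestion pipelines created, including the platform-instance prefix, database name, and env. The sketch below (assumptions: the same placeholder server URL as in the script, and a hypothetical index name watch-syslog-2024-01 standing in for a real one) verifies that both dataset URNs actually exist in DataHub before any lineage is emitted against them:

```python
# URN verification sketch. The server URL is a placeholder and
# "watch-syslog-2024-01" is a hypothetical index name; substitute your own.
import datahub.emitter.mce_builder as builder
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://xxxx:8080"))

mysql_dataset_urn = builder.make_dataset_urn(
    platform="mysql", name="c1customer_info.xxx.CUSTOMER", env="PROD"
)
es_dataset_urn = builder.make_dataset_urn(
    platform="elasticsearch", name="c1one_customer_data.watch-syslog-2024-01", env="PROD"
)

# If either of these prints False, the lineage is being attached to a
# dataset entity that the ingestion pipelines never created, so it will
# not show up on the datasets visible in the UI.
for urn in (mysql_dataset_urn, es_dataset_urn):
    print(f"{urn} exists: {graph.exists(urn)}")
```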

If you have verified these steps and still do not see the expected lineage, it might be helpful to provide more specific error messages or logs for further troubleshooting.

Would you like more detailed steps or further assistance with specific configurations?
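
As a final check, you can read the emitted aspect back from the server. The sketch below makes the same assumptions as the previous one (placeholder server URL, hypothetical index name). If it prints that no aspect was found, the MCP never reached the dataset; if fineGrainedLineages comes back empty, the dataset-level lineage landed but the column-level part did not:

```python
# Read back the upstreamLineage aspect the script emitted. Assumptions:
# placeholder server URL and a hypothetical index name, as above.
import datahub.emitter.mce_builder as builder
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import UpstreamLineageClass

graph = DataHubGraph(DatahubClientConfig(server="http://xxxx:8080"))

es_dataset_urn = builder.make_dataset_urn(
    platform="elasticsearch", name="c1one_customer_data.watch-syslog-2024-01", env="PROD"
)

lineage = graph.get_aspect(entity_urn=es_dataset_urn, aspect_type=UpstreamLineageClass)
if lineage is None:
    print("No upstreamLineage aspect found - the emit never reached this dataset.")
else:
    print("Upstream datasets:", [u.dataset for u in lineage.upstreams])
    print("Fine-grained lineages:", lineage.fineGrainedLineages)
```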

