<@U03MF8MU5P0> I am currently running this code to see the column-level lineage from the database to the Elasticsearch index, but when the code completes, I don't see the lineage. What am I doing wrong?
import re

import urllib3
from elasticsearch import Elasticsearch

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.run.pipeline import Pipeline
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
# Suppress InsecureRequestWarning noise — TLS verification is deliberately
# disabled for the Elasticsearch connection below (verify_certs=False).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ===============================
# MySQL Ingestion Pipeline
# ===============================
# Recipe for pulling table metadata (plus table-level profiles) out of MySQL.
mysql_recipe = {
    "pipeline_name": "alarmdb_mysql_ingest_pipeline",
    "source": {
        "type": "mysql",
        "config": {
            "host_port": "host",
            "database": "xxx",
            "username": "xxx",
            "password": "xxx",
            "platform_instance": "c1customer_info",
            "include_tables": True,
            "include_views": False,
            # Table-level profiling only; column profiles are skipped.
            "profiling": {"enabled": True, "profile_table_level_only": True},
            "stateful_ingestion": {"enabled": True},
            "env": "PROD",
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "<http://xxxx:8080>"}},
}

# Run the MySQL ingestion and fail fast if anything went wrong.
mysql_ingest = Pipeline.create(mysql_recipe)
mysql_ingest.run()
mysql_ingest.raise_from_status()
print("MySQL ingestion pipeline completed successfully.")
# ===============================
# Elasticsearch Ingestion Pipeline
# ===============================
# Recipe for ingesting Elasticsearch index metadata (plus index templates).
es_recipe = {
    "source": {
        "type": "elasticsearch",
        "config": {
            "host": "<https://xxxx>",
            "username": "xxx",
            "password": "xxx",
            "platform_instance": "c1one_customer_data",
            "env": "PROD",
            "use_ssl": True,
            # Self-signed certs in this environment — skip verification.
            "verify_certs": False,
            "index_pattern": {"allow": ["watch-syslog-2024-*"], "deny": ["partial-*", r"^\..*"]},
            "ingest_index_templates": True,
            "index_template_pattern": {"allow": ["customer-syslog-template"]},
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "<http://xxxx:8080>"}},
}

# Run the Elasticsearch ingestion and fail fast if anything went wrong.
es_ingest = Pipeline.create(es_recipe)
es_ingest.run()
es_ingest.raise_from_status()
print("Elasticsearch ingestion pipeline completed successfully.")
# ===============================
# Elasticsearch Connection Setup
# ===============================
es_host = "xxx"
es_port = xxxx
es_user = "xxx"
es_password = "xxx"

# Connect to Elasticsearch directly to enumerate the indices that need lineage.
es = Elasticsearch(
    [f"https://{es_host}:{es_port}"], http_auth=(es_user, es_password), verify_certs=False
)

# Patterns identifying the indices of interest, compiled once up front.
# NOTE(review): re.match anchors at the start only, so e.g.
# "watch-syslog-2024-01-restored" would also match — confirm that is intended.
compiled_patterns = [re.compile(p) for p in [r"watch-syslog-2024-\d{2}"]]

# Keep only indices whose names match at least one pattern.
relevant_indices = [
    name
    for name in es.indices.get_alias().keys()
    if any(pattern.match(name) for pattern in compiled_patterns)
]
print(f"Relevant indices for lineage: {relevant_indices}")
# ===============================
# MySQL Upstream Details
# ===============================
# Upstream (source) side of the lineage: one specific MySQL table and column.
# `mysql_dataset_urn` and `mysql_field` are consumed by the emission loop below.
mysql_db = 'xxx'
mysql_table = 'CUSTOMER'
mysql_field = 'OGCC_CODE'

# The dataset name embeds the platform instance, mirroring how the mysql
# ingestion source names datasets when platform_instance is configured.
# NOTE(review): this must match the URN the ingestion pipeline produced —
# verify against the entity visible in DataHub.
mysql_dataset_urn = builder.make_dataset_urn(
    platform='mysql', name=f"c1customer_info.{mysql_db}.{mysql_table}"
)
print(f"MySQL Dataset URN: {mysql_dataset_urn}")
# ===============================
# DataHub Emitter Setup
# ===============================
emitter = DatahubRestEmitter("<http://xxxx:8080>")

# ===============================
# Lineage Emission Loop
# ===============================
# The MySQL side of the lineage is identical for every index, so build the
# dataset-level Upstream and the schema-field URN once, outside the loop.
upstream = Upstream(dataset=mysql_dataset_urn, type=DatasetLineageType.TRANSFORMED)
mysql_field_urn = builder.make_schema_field_urn(mysql_dataset_urn, mysql_field)

for es_index in relevant_indices:
    # Construct the Elasticsearch dataset URN.
    # NOTE(review): for lineage to appear in the UI this URN must match
    # byte-for-byte the URN produced by the elasticsearch ingestion source
    # (same platform_instance/env encoding) — verify against the ingested
    # entity's URN in DataHub; a mismatch silently attaches lineage to a
    # dataset that does not exist.
    es_dataset_urn = builder.make_dataset_urn(
        platform="elasticsearch", name=f"c1one_customer_data.{es_index}", env="PROD"
    )
    print(f"Processing Elasticsearch Index: {es_index}")
    print(f"Elasticsearch Dataset URN: {es_dataset_urn}")

    # Column-level lineage: MySQL field -> Elasticsearch field.
    fine_grained = FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[mysql_field_urn],
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[builder.make_schema_field_urn(es_dataset_urn, 'og_customercode')],
    )

    # Dataset-level lineage with the fine-grained lineage attached at
    # construction time (previously it was assigned after the object was
    # built; passing it to the constructor guarantees the field is part of
    # the serialized aspect).
    upstream_lineage = UpstreamLineage(
        upstreams=[upstream],
        fineGrainedLineages=[fine_grained],
    )

    # NOTE(review): emitting DatasetPropertiesClass replaces the entire
    # datasetProperties aspect and may overwrite properties written by the
    # ingestion pipeline — confirm this overwrite is intended.
    dataset_properties = DatasetPropertiesClass(description="Generated Elasticsearch dataset")

    lineage_mcp = MetadataChangeProposalWrapper(entityUrn=es_dataset_urn, aspect=upstream_lineage)
    properties_mcp = MetadataChangeProposalWrapper(entityUrn=es_dataset_urn, aspect=dataset_properties)

    # Emit both MCPs; keep going on failure so one bad index doesn't stop the rest.
    try:
        emitter.emit_mcp(lineage_mcp)
        emitter.emit_mcp(properties_mcp)
        print(f"Successfully emitted lineage and properties for index: {es_index}")
    except Exception as e:
        print(f"Error emitting lineage for index {es_index}: {e}")

print("Lineage and properties emission completed for all relevant indices.")