<@U06TM7M40RK> I want to create lineage from the logic inside the S3 files. These S3 files contain stored procedure logic. I have extracted the source and target tables present in one of the stored procedure code files. Below is a sample:
```python
{
    'target_table_2': ['source_table_1', 'source_table_2', ...],
    'target_table_3': ['source_table_1']
}
```
How can I show the lineage for this stored procedure using the above details?
To create lineage from the logic inside S3 files containing stored procedure logic, you can use the DataHub SDK to programmatically emit lineage information. Since you have already extracted the source and target tables, you can represent this lineage in DataHub by creating lineage edges between the datasets.
Here’s a step-by-step approach to achieve this:
1. **Initialize the DataHub emitter:** Use the DataHub SDK to initialize an emitter that will send lineage data to your DataHub instance.
2. **Create dataset URNs:** For each source and target table, create a unique dataset URN using the `make_dataset_urn` function provided by the SDK.
3. **Define lineage relationships:** For each target table, define the upstream datasets (source tables) that contribute to it by creating an `UpstreamClass` object for each source table.
4. **Emit lineage:** Use the emitter to send the lineage information to DataHub.
Here is a sample code snippet to illustrate this process:
```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    UpstreamClass,
    UpstreamLineageClass,
    DatasetLineageTypeClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Sample lineage data
lineage_data = {
    'target_table_1': ['source_table_1', 'source_table_2'],
    'target_table_2': ['source_table_1', 'source_table_2'],
    'target_table_3': ['source_table_1'],
}

# Emit lineage for each target table
for target_table, source_tables in lineage_data.items():
    target_urn = make_dataset_urn(platform="s3", name=target_table, env="PROD")
    upstreams = [
        UpstreamClass(
            dataset=make_dataset_urn(platform="s3", name=source, env="PROD"),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
        for source in source_tables
    ]
    lineage = UpstreamLineageClass(upstreams=upstreams)

    # Emit the lineage to DataHub
    emitter.emit_mce({
        "entityType": "dataset",
        "entityUrn": target_urn,
        "aspectName": "upstreamLineage",
        "aspect": lineage,
    })
```
This code will create lineage relationships in DataHub, showing how each target table is derived from its respective source tables.
Would you like more detailed steps on any specific part of this process?
It seems likely that the error you’re encountering is due to the wrong type being passed to the `emit_mce` method: it expects a `MetadataChangeEvent` object, not a dictionary.
To resolve this, construct a `MetadataChangeEvent` object properly:
1. **Create a `DatasetSnapshot`:** This represents the state of the dataset, including its lineage aspect.
2. **Wrap it in a `MetadataChangeEvent`:** This is the object that `emit_mce` expects.
Here’s the corrected code snippet:
```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    UpstreamClass,
    UpstreamLineageClass,
    DatasetLineageTypeClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Sample lineage data
lineage_data = {
    'target_table_1': ['source_table_1', 'source_table_2'],
    'target_table_2': ['source_table_1', 'source_table_2'],
    'target_table_3': ['source_table_1'],
}

# Emit lineage for each target table
for target_table, source_tables in lineage_data.items():
    target_urn = make_dataset_urn(platform="s3", name=target_table, env="PROD")
    upstreams = [
        UpstreamClass(
            dataset=make_dataset_urn(platform="s3", name=source, env="PROD"),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
        for source in source_tables
    ]
    lineage = UpstreamLineageClass(upstreams=upstreams)

    # Create a DatasetSnapshot carrying the lineage aspect
    dataset_snapshot = DatasetSnapshotClass(
        urn=target_urn,
        aspects=[lineage],
    )

    # Wrap it in a MetadataChangeEvent and emit it to DataHub
    mce = MetadataChangeEventClass(proposedSnapshot=dataset_snapshot)
    emitter.emit_mce(mce)
```
This should resolve the AttributeError by ensuring that the emit_mce method receives a MetadataChangeEvent object with the correct structure.
Would you like more detailed steps or further assistance on any specific part of this process?
<@U06TM7M40RK> I don’t see this error anymore; however, I am not able to see the lineage for the target. I can see the target dataset in the UI, but the sources are missing and no lineage is captured in the UI.
If the target dataset is visible in the DataHub UI but its lineage is not, there are a few potential causes and troubleshooting steps to consider:
1. **Check dataset existence:** Ensure that all source datasets are ingested into DataHub. As noted in a related Slack discussion, lineage will not be displayed if the source datasets do not exist in DataHub.
2. **Verify lineage data:** Double-check that the lineage data is correctly structured and emitted, and that the URNs for both source and target datasets are correctly formatted and match the datasets in DataHub.
3. **Review logs:** Check the logs of the DataHub ingestion process and the DataHub backend services (GMS, frontend) for errors or warnings related to lineage processing.
4. **Manual lineage addition:** If programmatic lineage is not working, you can add lineage manually. Per the <https://datahubproject.io/docs/0.14.0/features/feature-guides/ui-lineage|DataHub UI Lineage Guide>, you can add or edit lineage edges in the Lineage Visualization screen or the Lineage tab on entity pages.
5. **Check version compatibility:** Make sure you are using compatible versions of DataHub and the ingestion framework; version mismatches can break features like lineage.
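To make the URN check concrete: `make_dataset_urn(platform="s3", name=..., env="PROD")` produces URN strings of the form shown below, so you can print the URNs your script emits and search for them verbatim in the DataHub UI. The helper here is a hypothetical stand-in that only mirrors that format for illustration; in your script, simply print the return value of `make_dataset_urn` itself.

```python
# Hypothetical helper mirroring the URN format produced by the SDK's
# make_dataset_urn, useful for eyeballing what to search for in the UI.
def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

# Print the URNs you expect to see in DataHub
for table in ["target_table_1", "source_table_1", "source_table_2"]:
    print(dataset_urn("s3", table))
```

If a URN printed here does not match the URN shown on the dataset page in the UI (platform, name, or env differs), the lineage edge points at a dataset DataHub does not know about, and nothing will be rendered.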
Would you like more detailed steps on any of these points or further assistance with specific configurations?
<@U06TM7M40RK> I used the sample code mentioned in this thread, but unfortunately I don’t see any datasets created for my data job. I can only see a ‘data task’ under the ‘airflow’ platform, and under this data task I don’t see anything. Also, this data task has no name.
Can you please explain what should ideally be reflected by this sample code, and what could be the issue in my case?
Based on the sample code and your description, the expected outcome is the creation of datasets and lineage relationships in DataHub. Specifically, you should see:
1. **Datasets:** The source and target tables should appear as datasets in DataHub.
2. **Lineage:** The lineage relationships between these datasets should be visible, showing how each target table is derived from its source tables.
3. **Data task:** The data task (or data job) should be associated with the datasets, reflecting the transformation process.
If you are only seeing a “data task” under the “airflow” platform without any associated datasets or lineage, here are some potential issues and troubleshooting steps:
1. **Dataset URNs:** Ensure that the URNs for both source and target datasets are correctly formatted and match the datasets in DataHub. URNs should be unique and consistent with the platform and environment specified.
2. **Lineage emission:** Verify that the lineage is being emitted correctly; check the emitter logs to confirm the `MetadataChangeEvent` objects are sent without errors.
3. **DataHub ingestion:** Confirm that the DataHub backend services (GMS, frontend) show no errors or warnings related to dataset creation or lineage processing.
4. **Airflow plugin configuration:** If you are using the Airflow plugin, ensure it is configured correctly per the DataHub Airflow lineage plugin guide, so that it captures lineage information from Airflow tasks.
5. **Version compatibility:** Ensure that you are using compatible versions of DataHub and the ingestion framework.
6. **Manual verification:** Use the DataHub UI to search for the datasets and confirm they exist.
Would you like more detailed steps on any of these points or further assistance with specific configurations?
Yes, the URNs for the input datasets should already exist in DataHub before running the lineage code. As noted in a related Slack discussion, if the datasets do not exist in DataHub, the lineage will not show up: lineage information relies on the existence of the datasets to establish relationships between them.
Here are the steps you should follow:
1. **Ingest datasets:** Ensure that all datasets (both source and target) are ingested into DataHub, either via the DataHub ingestion framework or manually through the UI.
2. **Verify dataset existence:** Use the DataHub UI or API to confirm that the datasets exist with the correct URNs.
3. **Run the lineage code:** Once the datasets are confirmed to exist, run the lineage code to create the relationships between them.
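The "verify dataset existence" step above can be sketched as a small pre-flight check. The function below is decoupled from the SDK so it is easy to test: `exists` can be any callable that answers whether a table is already present in DataHub. With the real SDK you might pass something built on `DataHubGraph.exists` from `datahub.ingestion.graph.client` (an assumption; check the method's availability in your SDK version), or a stub as shown here.

```python
# Split lineage_data tables into those already present in DataHub and those
# still missing, given an injected existence check.
def partition_by_existence(lineage_data, exists):
    all_tables = set(lineage_data)           # target tables
    for sources in lineage_data.values():
        all_tables.update(sources)           # plus all source tables
    present = sorted(t for t in all_tables if exists(t))
    missing = sorted(t for t in all_tables if not exists(t))
    return present, missing

# Example with a stub: pretend only source_table_1 is already ingested
lineage_data = {'target_table_3': ['source_table_1']}
present, missing = partition_by_existence(lineage_data, lambda t: t == 'source_table_1')
print("present:", present)  # tables safe to reference in lineage
print("missing:", missing)  # tables to ingest before emitting lineage
```

Running this before the lineage emission tells you exactly which datasets still need to be ingested for the lineage edges to render.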
If you need more detailed steps on how to ingest datasets or verify their existence, please let me know.
Yes, your understanding is correct. If the source and target tables are not ingested into DataHub beforehand, the lineage will not show up in the UI when you run the code; lineage relies on the datasets existing in DataHub in order to establish and display relationships between them.
Here’s a brief summary of the process:
1. **Ingest datasets:** Before running the lineage code, ensure that all source and target datasets are ingested into DataHub, via the ingestion framework or manually through the UI.
2. **Verify dataset existence:** Confirm that the datasets exist in DataHub with the correct URNs.
3. **Run the lineage code:** Once the datasets are confirmed to exist, run the lineage code to create and visualize the relationships in the UI.
If you need further assistance on how to ingest datasets or verify their existence, feel free to ask.
Sources:
- Slack discussion on datasets needing to exist in DataHub before lineage is displayed
If the source and target tables are not created beforehand and you want to show their lineage in DataHub, you can modify the code to create placeholder datasets for these tables. This involves emitting metadata for the datasets themselves before establishing the lineage relationships. Here’s how you can do it:
1. **Create placeholder datasets:** Emit metadata for the source and target datasets to create them in DataHub, by building a `DatasetSnapshot` for each dataset.
2. **Emit lineage:** Once the datasets are created, emit the lineage information as before.
Here’s an updated version of the code to include the creation of placeholder datasets:
```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    UpstreamClass,
    UpstreamLineageClass,
    DatasetLineageTypeClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    DatasetPropertiesClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Sample lineage data
lineage_data = {
    'target_table_1': ['source_table_1', 'source_table_2'],
    'target_table_2': ['source_table_1', 'source_table_2'],
    'target_table_3': ['source_table_1'],
}

# Emit a minimal dataset snapshot so the URN exists in DataHub
def create_dataset(urn):
    dataset_snapshot = DatasetSnapshotClass(
        urn=urn,
        aspects=[
            DatasetPropertiesClass(description="Placeholder dataset"),
        ],
    )
    mce = MetadataChangeEventClass(proposedSnapshot=dataset_snapshot)
    emitter.emit_mce(mce)

# Create datasets and emit lineage
for target_table, source_tables in lineage_data.items():
    target_urn = make_dataset_urn(platform="s3", name=target_table, env="PROD")
    create_dataset(target_urn)

    upstreams = []
    for source in source_tables:
        source_urn = make_dataset_urn(platform="s3", name=source, env="PROD")
        create_dataset(source_urn)
        upstreams.append(
            UpstreamClass(dataset=source_urn, type=DatasetLineageTypeClass.TRANSFORMED)
        )
    lineage = UpstreamLineageClass(upstreams=upstreams)

    # Emit the lineage to DataHub
    lineage_mce = MetadataChangeEventClass(
        proposedSnapshot=DatasetSnapshotClass(
            urn=target_urn,
            aspects=[lineage],
        )
    )
    emitter.emit_mce(lineage_mce)
```
1. **Create placeholder datasets:** The `create_dataset` function emits metadata for each dataset, creating it in DataHub if it doesn’t already exist.
2. **Emit lineage:** After creating the datasets, the lineage is emitted as before.
This approach ensures that both the datasets and their lineage are visible in DataHub, even if the datasets were not pre-existing.
Would you like more detailed steps or further assistance with this process?
To handle the scenario where the source and target datasets may or may not already exist (for example, on re-runs), you can check for each dataset’s existence before creating it, so datasets are only created when they are not already present.
Here’s how you can modify the approach to handle both scenarios:
1. **Check dataset existence:** Before creating a dataset, check whether it already exists in DataHub; create it only if it does not.
2. **Create datasets and emit lineage:** For each target and source table, ensure the dataset exists, and then emit the lineage.
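The check-then-create step can be sketched as the idempotent helper below. Both collaborators are injected so the logic is easy to test and safe to re-run: with the real SDK you would pass something like `DataHubGraph.exists` for `exists` (an assumption; verify against your SDK version) and the `create_dataset` function from the earlier snippet for `create`.

```python
# Create a dataset only if it is not already present, so re-runs are idempotent.
# `exists` answers whether the URN is known; `create` performs the emission.
def ensure_dataset(urn, exists, create):
    if exists(urn):
        return False  # already present, nothing to do
    create(urn)
    return True       # newly created

# Example with an in-memory stub standing in for DataHub
known = {'urn:li:dataset:(urn:li:dataPlatform:s3,source_table_1,PROD)'}
created = ensure_dataset(
    'urn:li:dataset:(urn:li:dataPlatform:s3,target_table_1,PROD)',
    exists=known.__contains__,
    create=known.add,
)
print("created:", created)
```

In the lineage loop, you would call `ensure_dataset` for the target URN and each source URN before emitting the `UpstreamLineageClass` aspect; on a second run, all the `exists` checks succeed and no duplicate creation work is performed.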