<@U06TM7M40RK> I'm using DataHub 0.14.0.4 with the sql-queries source type. An upstreamLineage gets created, but the query does not show up in the UpstreamClass. Why is that?
To address your issue with the `sql-queries` source type in DataHub 0.14.0.4, where the `upstreamLineage` is created but the `query` is not appearing in the `UpstreamClass`, let's break down the possible reasons and solutions.
Possible Reasons:
- Query Parsing Issues: The SQL parser might not be correctly parsing the query, leading to incomplete lineage information.
- Configuration Issues: There might be a misconfiguration in your ingestion recipe that is causing the query details to be omitted.
- Schema Resolution: If the schema resolution is not correctly configured, it might affect the completeness of the lineage information.
Steps to Troubleshoot:
- Check SQL Query Syntax:
  Ensure that your SQL queries are syntactically correct and adhere to the SQL dialect of the platform you are targeting.
- Review Ingestion Recipe:
  Verify that your ingestion recipe is correctly configured. Here is an example configuration for the `sql-queries` source:
```yaml
datahub_api:
  server: http://localhost:8080
  timeout_sec: 60
source:
  type: sql-queries
  config:
    platform: "mysql"  # Change to your platform
    default_db: "your_database"
    query_file: "/path/to/your/queries.json"
```
- Enable Schema Resolution:
  Ensure that `use_schema_resolver` is enabled in your configuration (e.g. `use_schema_resolver: true` under `config`). This helps resolve schema information, which is crucial for accurate lineage extraction.
- Check the Query File Format:
  Ensure that your query file follows the correct format. Each line should be a separate JSON object with the required fields (a small validation sketch follows this list). Here is an example:
```json
{"query": "INSERT INTO A(col1) SELECT abc FROM B;", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": ["your_database.your_schema.A"], "upstream_tables": ["your_database.your_schema.B"]}
```
- Run in Debug Mode:
  Run the ingestion in debug mode to get detailed logs. This can help identify whether any issues occurred during the lineage extraction process.
Example Ingestion Command:
```bash
datahub --debug ingest -c /path/to/your/recipe.yaml
```
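Before re-running, it can also help to sanity-check the query file referenced in the recipe. The sketch below is my own illustration rather than part of the original answer; the file path is a placeholder, and it assumes only `query` is strictly required:
```python
import json

# Hypothetical path -- point this at the query_file from your recipe
QUERY_FILE = "/path/to/your/queries.json"
REQUIRED_KEYS = {"query"}  # fields like timestamp/user/upstream_tables are optional

with open(QUERY_FILE) as f:
    for lineno, raw in enumerate(f, start=1):
        raw = raw.strip()
        if not raw:
            continue
        try:
            entry = json.loads(raw)  # each line must be a standalone JSON object
        except json.JSONDecodeError as exc:
            print(f"line {lineno}: invalid JSON ({exc})")
            continue
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            print(f"line {lineno}: missing required keys {missing}")
```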
Example Code Snippet for Manual Lineage Emission:
If the automatic parsing is not working as expected, you can manually emit lineage using the DataHub Python SDK:
```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the source and target datasets
source_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.B", env="PROD")
target_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.A", env="PROD")

# Create the lineage relationship
upstream = UpstreamClass(
    dataset=source_dataset_urn,
    type=DatasetLineageTypeClass.TRANSFORMED,
)
lineage = UpstreamLineageClass(
    upstreams=[upstream],
)

# Emit the lineage to DataHub; the entity type and aspect name are
# inferred from the aspect class by MetadataChangeProposalWrapper
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=target_dataset_urn,
        aspect=lineage,
    )
)
```
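One caveat worth noting: emitting the `upstreamLineage` aspect this way upserts the entire aspect, so it replaces whatever upstream list was previously stored for the target dataset rather than appending to it.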
Would you like more detailed steps on any specific part of this process?
Sources:
- https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql_queries.py (DataHub SQL Parsing and Lineage)
<@U06TM7M40RK>
```
result,
query=entry.query,
query_timestamp=entry.timestamp,
```
The query is added here, so why does it go missing in sql_parsing_builder.py?
The issue you're encountering seems to be related to the handling of the `query` parameter in the `process_sql_parsing_result` method within the `sql_parsing_builder.py` file. Let's break down the relevant parts of the code to understand why the `query` might be missing in the `UpstreamClass`.
Code Analysis
- Ingestion Source Code (sql_queries.py):
  The `query` is passed to the `process_sql_parsing_result` method:
```python
yield from self.builder.process_sql_parsing_result(
    result,
    query=entry.query,
    query_timestamp=entry.timestamp,
)
```
- Processing in sql_parsing_builder.py:
  The `process_sql_parsing_result` method in `sql_parsing_builder.py` is responsible for handling the parsed SQL result and generating the necessary work units:
```python
def process_sql_parsing_result(
    self,
    result: SqlParsingResult,
    *,
    query: str,
    query_timestamp: Optional[datetime] = None,
    is_view_ddl: bool = False,
    user: Optional[UserUrn] = None,
    custom_operation_type: Optional[str] = None,
    include_urns: Optional[Set[DatasetUrn]] = None,
    include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
    # Processing logic here
    ...
```
Possible Reasons for Missing query:
- Query Not Stored in Aspect:
  The `query` might not be stored in the aspect that is being emitted. Ensure that the `query` is included in the aspect being created and emitted.
- Aspect Emission:
  Verify that the aspect containing the `query` is correctly emitted and associated with the dataset URN (the sketch after this list shows how to check what was actually stored).
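To see which of the two it is, you can fetch the aspect that was actually persisted. A minimal sketch, assuming a local GMS at http://localhost:8080, a placeholder dataset URN, and that your version's Upstream aspect carries the `query` field:
```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import UpstreamLineageClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Placeholder URN -- substitute the downstream dataset from your ingestion
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,your_database.your_schema.A,PROD)"

lineage = graph.get_aspect(entity_urn=dataset_urn, aspect_type=UpstreamLineageClass)
if lineage:
    for upstream in lineage.upstreams:
        # If `query` prints as None, the builder never set it on the aspect
        print(upstream.dataset, upstream.type, upstream.query)
```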
Steps to Troubleshoot:
- Verify Aspect Creation:
  Ensure that the `query` is included in the aspect being created. For example, if you are creating a `SqlParsingResultClass` aspect (an illustrative class name), make sure the `query` is part of it:
```python
sql_parse_result_aspect = SqlParsingResultClass(
    query=query,
    queryType=result.query_type,
    queryTypeProps=result.query_type_props,
    queryFingerprint=result.query_fingerprint,
    inTables=result.in_tables,
    outTables=result.out_tables,
    columnLineage=result.column_lineage,
)
```
- Check Aspect Emission:
  Ensure that the aspect is correctly emitted and associated with the dataset URN:
```python
mce = MetadataChangeEvent(
    proposedSnapshot=DatasetSnapshot(
        urn=dataset_urn,
        aspects=[sql_parse_result_aspect]
    )
)
yield MetadataWorkUnit(id=dataset_urn, mce=mce)
```
Example Code Snippet:
Here is an example of how to ensure the `query` is included in the aspect and emitted correctly:
```python
def process_sql_parsing_result(
    self,
    result: SqlParsingResult,
    *,
    query: str,
    query_timestamp: Optional[datetime] = None,
    is_view_ddl: bool = False,
    user: Optional[UserUrn] = None,
    custom_operation_type: Optional[str] = None,
    include_urns: Optional[Set[DatasetUrn]] = None,
    include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
    # Create the SqlParsingResultClass aspect
    sql_parse_result_aspect = SqlParsingResultClass(
        query=query,
        queryType=result.query_type,
        queryTypeProps=result.query_type_props,
        queryFingerprint=result.query_fingerprint,
        inTables=result.in_tables,
        outTables=result.out_tables,
        columnLineage=result.column_lineage,
    )

    # Emit the aspect for each downstream table
    for downstream_urn in result.out_tables:
        mce = MetadataChangeEvent(
            proposedSnapshot=DatasetSnapshot(
                urn=downstream_urn,
                aspects=[sql_parse_result_aspect]
            )
        )
        yield MetadataWorkUnit(id=downstream_urn, mce=mce)
```
Sources:
- https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py (sql_parsing_builder.py)
- https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql_queries.py (sql_queries.py)
<@U06TM7M40RK>
```
class LineageEdge:
    """Stores information about a single lineage edge, from an upstream table to a downstream table."""

    downstream_urn: DatasetUrn
    upstream_urn: DatasetUrn
    audit_stamp: Optional[datetime]
    actor: Optional[UserUrn]
    type: str = DatasetLineageTypeClass.TRANSFORMED
    # Maps downstream_col -> {upstream_col}
    column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

    def gen_upstream_aspect(self) -> UpstreamClass:
        return UpstreamClass(
            auditStamp=(
                AuditStampClass(
                    time=int(self.audit_stamp.timestamp() * 1000),
                    actor=self.actor or "",
                )
                if self.audit_stamp
                else None
            ),
            dataset=self.upstream_urn,
            type=self.type,
        )
```
The above is from sql_parsing_builder.py; it looks like gen_upstream_aspect here does not carry the query over.
The issue you're encountering is that the `gen_upstream_aspect` method in the `LineageEdge` class does not include the `query` field when generating the `UpstreamClass` aspect. To resolve this, you need to modify the `gen_upstream_aspect` method to include the `query` field.
Steps to Modify gen_upstream_aspect:
- Add a query Field to LineageEdge:
  Ensure that the `LineageEdge` class has a `query` field to store the query information.
- Modify the gen_upstream_aspect Method:
  Update the `gen_upstream_aspect` method to include the `query` field when creating the `UpstreamClass` aspect.
Example Code:
Here is how you can modify the `LineageEdge` class and the `gen_upstream_aspect` method:
- Add query Field to LineageEdge:
```python
@dataclass
class LineageEdge:
    """Stores information about a single lineage edge, from an upstream table to a downstream table."""

    downstream_urn: DatasetUrn
    upstream_urn: DatasetUrn
    audit_stamp: Optional[datetime]
    actor: Optional[UserUrn]
    type: str = DatasetLineageTypeClass.TRANSFORMED
    query: Optional[str] = None  # Add query field

    # Maps downstream_col -> {upstream_col}
    column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

    def gen_upstream_aspect(self) -> UpstreamClass:
        return UpstreamClass(
            auditStamp=(
                AuditStampClass(
                    time=int(self.audit_stamp.timestamp() * 1000),
                    actor=self.actor or "",
                )
                if self.audit_stamp
                else None
            ),
            dataset=self.upstream_urn,
            type=self.type,
            query=self.query,  # Include query field
        )
```
- Update process_sql_parsing_result Method:
  Ensure that the `query` is passed to the `LineageEdge` instances:
```python
def process_sql_parsing_result(
    self,
    result: SqlParsingResult,
    *,
    query: str,
    query_timestamp: Optional[datetime] = None,
    is_view_ddl: bool = False,
    user: Optional[UserUrn] = None,
    custom_operation_type: Optional[str] = None,
    include_urns: Optional[Set[DatasetUrn]] = None,
    include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
    downstreams_to_ingest = result.out_tables
    upstreams_to_ingest = result.in_tables
    if include_urns:
        logger.debug(f"Skipping urns {set(downstreams_to_ingest) - include_urns}")
        downstreams_to_ingest = list(set(downstreams_to_ingest) & include_urns)
        upstreams_to_ingest = list(set(upstreams_to_ingest) & include_urns)

    if self.generate_lineage:
        for downstream_urn in downstreams_to_ingest:
            # Set explicitly so that FileBackedDict registers any mutations
            self._lineage_map[downstream_urn] = _merge_lineage_data(
                downstream_urn=downstream_urn,
                upstream_urns=result.in_tables,
                column_lineage=(
                    result.column_lineage if include_column_lineage else None
                ),
                upstream_edges=self._lineage_map.get(downstream_urn, {}),
                query_timestamp=query_timestamp,
                is_view_ddl=is_view_ddl,
                user=user,
                query=query,  # Pass the query to the lineage data
            )

    if self.generate_usage_statistics and query_timestamp is not None:
        upstream_fields = compute_upstream_fields(result)
        for upstream_urn in upstreams_to_ingest:
            self._usage_aggregator.aggregate_event(
                resource=upstream_urn,
                start_time=query_timestamp,
                query=query,
                user=user,
                fields=sorted(upstream_fields.get(upstream_urn, [])),
            )

    if self.generate_operations and query_timestamp is not None:
        for downstream_urn in downstreams_to_ingest:
            yield from _gen_operation_workunit(
                result,
                downstream_urn=downstream_urn,
                query_timestamp=query_timestamp,
                user=user,
                custom_operation_type=custom_operation_type,
            )
```
- Update _merge_lineage_data Function:
  Ensure that the `query` is included when merging lineage data:
```python
def _merge_lineage_data(
    downstream_urn: DatasetUrn,
    *,
    upstream_urns: Collection[DatasetUrn],
    column_lineage: Optional[List[ColumnLineageInfo]],
    upstream_edges: Dict[DatasetUrn, LineageEdge],
    query_timestamp: Optional[datetime],
    is_view_ddl: bool,
    user: Optional[UserUrn],
    query: Optional[str] = None,  # Add query parameter
) -> Dict[str, LineageEdge]:
    for upstream_urn in upstream_urns:
        edge = upstream_edges.setdefault(
            upstream_urn,
            LineageEdge(
                downstream_urn=downstream_urn,
                upstream_urn=upstream_urn,
                audit_stamp=query_timestamp,
                actor=user,
                type=(
                    DatasetLineageTypeClass.VIEW
                    if is_view_ddl
                    else DatasetLineageTypeClass.TRANSFORMED
                ),
                query=query,  # Set the query
            ),
        )
        if query_timestamp and (  # Use the most recent query
            edge.audit_stamp is None or query_timestamp > edge.audit_stamp
        ):
            edge.audit_stamp = query_timestamp
            if user:
                edge.actor = user
            edge.query = query  # Update the query

    # Note: Inefficient as we loop through all column_lineage entries for each downstream table
    for cl in column_lineage or []:
        if cl.downstream.table == downstream_urn:
            for upstream_column_info in cl.upstreams:
                if upstream_column_info.table not in upstream_urns:
                    continue
                column_map = upstream_edges[upstream_column_info.table].column_map
                column_map[cl.downstream.column].add(upstream_column_info.column)

    return upstream_edges
```
By following these steps, you should be able to ensure that the `query` is included in the `UpstreamClass` aspect when generating lineage information.
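A caveat on the stock model, which is my reading rather than part of the original answer and is worth verifying against your version: in Upstream.pdl the `query` field is typed as an optional URN referencing a query entity (`urn:li:query:<id>`) rather than holding the raw SQL text, so a validating server may expect a query URN here. A sketch of that variant, where the fingerprint-style ID is an assumption:
```python
from datahub.metadata.schema_classes import DatasetLineageTypeClass, UpstreamClass

# Assumption: an ID you derive for the statement (e.g. the parser's fingerprint);
# the SQL text itself would then live on the query entity, not on this aspect.
query_urn = "urn:li:query:composite_1a2b3c"

upstream = UpstreamClass(
    dataset="urn:li:dataset:(urn:li:dataPlatform:mysql,your_database.your_schema.B,PROD)",
    type=DatasetLineageTypeClass.TRANSFORMED,
    query=query_urn,
)
```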
Would you like more detailed steps or examples on any specific part of this process?
Sources:
- https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py (sql_parsing_builder.py)
<@U06TM7M40RK> What should I do if I want to add extra information to the UpstreamClass?
To add additional information to the `UpstreamClass` in DataHub, you need to extend the `UpstreamClass` schema to include the new fields you want to add. This involves modifying the PDL (Pegasus Data Language) schema, updating the ingestion code to populate these fields, and ensuring that the DataHub backend and frontend can handle and display the new fields.
Steps to Add Additional Information to UpstreamClass:
- Modify the PDL Schema:
  Update the PDL file to include the new fields in the `UpstreamClass`.
- Update the Ingestion Code:
  Modify the ingestion code to populate the new fields in the `UpstreamClass`.
- Build and Deploy the Custom Model:
  Build and deploy the updated model to DataHub.
- Update the Frontend (Optional):
  If you want the new fields to be displayed in the DataHub UI, you may need to update the frontend code.
Step-by-Step Guide:
Step 1: Modify the PDL Schema
- Locate the PDL file that defines the `UpstreamClass`. This is typically found in the metadata-models directory.
- Add the new fields to the `UpstreamClass`. For example, let's add a `description` field and a `source` field:
```
namespace com.linkedin.metadata.aspect

import com.linkedin.common.urn.Urn

/**
 * Represents an upstream dataset in a lineage relationship.
 */
@Aspect = {
  "name": "upstream"
}
record UpstreamClass {
  /**
   * The URN of the upstream dataset.
   */
  dataset: Urn

  /**
   * The type of lineage relationship.
   */
  type: string

  /**
   * The audit stamp for the lineage relationship.
   */
  auditStamp: optional AuditStamp

  /**
   * The query that generated the lineage relationship.
   */
  query: optional string

  /**
   * A description of the upstream dataset.
   */
  description: optional string // New field

  /**
   * The source of the upstream dataset.
   */
  source: optional string // New field
}
```
Step 2: Update the Ingestion Code
- Modify the ingestion code to populate the new fields in the `UpstreamClass`. For example, update the `gen_upstream_aspect` method in the `LineageEdge` class:
```python
@dataclass
class LineageEdge:
    """Stores information about a single lineage edge, from an upstream table to a downstream table."""

    downstream_urn: DatasetUrn
    upstream_urn: DatasetUrn
    audit_stamp: Optional[datetime]
    actor: Optional[UserUrn]
    type: str = DatasetLineageTypeClass.TRANSFORMED
    query: Optional[str] = None  # Add query field
    description: Optional[str] = None  # Add description field
    source: Optional[str] = None  # Add source field

    # Maps downstream_col -> {upstream_col}
    column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

    def gen_upstream_aspect(self) -> UpstreamClass:
        return UpstreamClass(
            auditStamp=(
                AuditStampClass(
                    time=int(self.audit_stamp.timestamp() * 1000),
                    actor=self.actor or "",
                )
                if self.audit_stamp
                else None
            ),
            dataset=self.upstream_urn,
            type=self.type,
            query=self.query,  # Include query field
            description=self.description,  # Include description field
            source=self.source,  # Include source field
        )
```
- Ensure that the new fields are populated when creating `LineageEdge` instances, as sketched below.
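For illustration, here is how such an edge might be constructed with the new fields populated. All values are placeholders of my own, and `gen_upstream_aspect` only works once the extended `UpstreamClass` from Step 1 has been built and regenerated:
```python
from datetime import datetime, timezone

# All values are placeholders -- substitute what your ingestion context provides
edge = LineageEdge(
    downstream_urn="urn:li:dataset:(urn:li:dataPlatform:mysql,your_database.your_schema.A,PROD)",
    upstream_urn="urn:li:dataset:(urn:li:dataPlatform:mysql,your_database.your_schema.B,PROD)",
    audit_stamp=datetime.now(timezone.utc),
    actor="urn:li:corpuser:user_a",
    query="INSERT INTO A(col1) SELECT abc FROM B;",
    description="Populated by the nightly ETL job",  # new field
    source="airflow",  # new field
)
# Requires the regenerated UpstreamClass with description/source fields
upstream_aspect = edge.gen_upstream_aspect()
```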
Step 3: Build and Deploy the Custom Model
- Build the custom model from the repository root (the Gradle wrapper lives at the repo root, not inside metadata-models):
```bash
./gradlew :metadata-models:build
```
- Deploy the updated model to your DataHub instance:
```bash
./gradlew :metadata-models:modelDeploy
```
Step 4: Update the Frontend (Optional)
- If you want the new fields to be displayed in the DataHub UI, you may need to update the frontend code. This typically involves modifying the React components that render the lineage information.
- Locate the relevant React components (in the `datahub-web-react` module) and update them to display the new fields.
Example Code Snippet for Emitting Lineage with Additional Fields
Here is an example of how to emit lineage with the additional fields using the DataHub Python SDK:
```python
from datetime import datetime

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the source and target datasets
source_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.B", env="PROD")
target_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.A", env="PROD")

# Create the lineage relationship with additional fields.
# Note: description/source only exist if the Python schema classes were
# regenerated from the extended PDL model above.
upstream = UpstreamClass(
    dataset=source_dataset_urn,
    type=DatasetLineageTypeClass.TRANSFORMED,
    query="SELECT * FROM B",
    description="This is a description of the upstream dataset.",
    source="Generated from ETL job",
    auditStamp=AuditStampClass(
        time=int(datetime.now().timestamp() * 1000),
        actor="urn:li:corpuser:datahub"
    )
)
lineage = UpstreamLineageClass(
    upstreams=[upstream],
)

# Emit the lineage to DataHub
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=target_dataset_urn,
        aspect=lineage,
    )
)
```
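A final caution from my side: the stock `UpstreamClass` will reject the `description` and `source` keyword arguments, so this snippet only runs once Step 3 has deployed the extended model and the Python schema classes have been regenerated against it.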
Would you like more detailed steps or further assistance on any specific part of this process?
Sources:
- https://datahubspace.slack.com/archives/CUMUWQU66/p1720501030.807459
- https://datahubspace.slack.com/archives/CUMUWQU66/p1710322456.152259