Troubleshooting SQL Lineage and UpstreamClass Issues in DataHub

Original Slack Thread

<@U06TM7M40RK> I'm using DataHub 0.14.0.4 with the sql-queries source type. An upstreamLineage aspect is created, but the query does not appear in the UpstreamClass. Why is that?

To address your issue with the sql-queries source type in DataHub 0.14.0.4, where the upstreamLineage is created but the query is not appearing in the upstreamClass, let’s break down the possible reasons and solutions.

Possible Reasons:

  1. Query Parsing Issues: The SQL parser might not be parsing the query correctly, leading to incomplete lineage information (see the parser check after this list).
  2. Configuration Issues: There might be a misconfiguration in your ingestion recipe that is causing the query details to be omitted.
  3. Schema Resolution: If the schema resolution is not correctly configured, it might affect the completeness of the lineage information.
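
To isolate reason 1, you can run DataHub's SQL parser directly on one of your queries and inspect the result. This is a sketch; create_lineage_sql_parsed_result lives in datahub.sql_parsing.sqlglot_lineage in recent releases, so verify the import path for your 0.14.x install:

from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result

result = create_lineage_sql_parsed_result(
    query="INSERT INTO A(col1) SELECT abc FROM B",
    default_db="your_database",
    platform="mysql",
    platform_instance=None,
    env="PROD",
)
print(result.in_tables, result.out_tables)  # parsed upstream/downstream tables
print(result.debug_info.error)  # any parse or resolution error, if present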

Steps to Troubleshoot:

  1. Check SQL Query Syntax:
    Ensure that your SQL queries are syntactically correct and adhere to the SQL dialect of the platform you are targeting.

  2. Review Ingestion Recipe:
    Verify that your ingestion recipe is correctly configured. Here is an example configuration for the sql-queries source:

    datahub_api:
      server: http://localhost:8080
      timeout_sec: 60
    source:
      type: sql-queries
      config:
        platform: "mysql"  # Change to your platform
        default_db: "your_database"
        query_file: "/path/to/your/queries.json"
    
  3. Enable Schema Resolution:
    Ensure that use_schema_resolver is enabled in your configuration. It resolves table schemas from DataHub, which is crucial for accurate column-level lineage extraction; see the snippet below.
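
    Assuming the flag behaves as documented for the sql-queries source (it defaults to true and needs a reachable DataHub server to look up schemas), the recipe from step 2 gains one line:

    source:
      type: sql-queries
      config:
        platform: "mysql"  # Change to your platform
        default_db: "your_database"
        query_file: "/path/to/your/queries.json"
        use_schema_resolver: true  # enable schema-aware SQL parsing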

  4. Check the Query File Format:
    Ensure that your query file follows the correct format: the file is newline-delimited JSON, so each line must be a single, complete JSON object with the required fields. For example (one line):

    {"query": "INSERT INTO A(col1) SELECT abc FROM B;", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": ["your_database.your_schema.A"], "upstream_tables": ["your_database.your_schema.B"]}
    
  5. Run in Debug Mode:
    Run the ingestion in debug mode to get detailed logs. This can help identify if there were any issues during the lineage extraction process.

Example Ingestion Command:

datahub --debug ingest -c /path/to/your/recipe.yaml

Example Code Snippet for Manual Lineage Emission:
If the automatic parsing is not working as expected, you can manually emit lineage using the DataHub Python SDK:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the source and target datasets
source_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.B", env="PROD")
target_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.A", env="PROD")

# Create the lineage relationship
upstream = UpstreamClass(
    dataset=source_dataset_urn,
    type=DatasetLineageTypeClass.TRANSFORMED,
)

lineage = UpstreamLineageClass(
    upstreams=[upstream],
)

# Emit the lineage to DataHub as a MetadataChangeProposal
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=target_dataset_urn,
        aspect=lineage,  # entityType and aspectName are inferred from the aspect
    )
)

Would you like more detailed steps on any specific part of this process?

<@U06TM7M40RK>

yield from self.builder.process_sql_parsing_result(
    result,
    query=entry.query,
    query_timestamp=entry.timestamp,
)

The query is passed in here, so why does it get dropped in sql_parsing_builder.py?

The issue you’re encountering seems to be related to the handling of the query parameter in the process_sql_parsing_result method within the sql_parsing_builder.py file. Let’s break down the relevant parts of the code to understand why the query might be missing in the upstreamClass.

Code Analysis

  1. Ingestion Source Code (sql_queries.py):
    The query is passed to the process_sql_parsing_result method:

    yield from self.builder.process_sql_parsing_result(
        result,
        query=entry.query,
        query_timestamp=entry.timestamp,
    )
    
  2. Processing in sql_parsing_builder.py:
    The process_sql_parsing_result method in sql_parsing_builder.py is responsible for handling the parsed SQL result and generating the necessary work units.

    def process_sql_parsing_result(
        self,
        result: SqlParsingResult,
        *,
        query: str,
        query_timestamp: Optional[datetime] = None,
        is_view_ddl: bool = False,
        user: Optional[UserUrn] = None,
        custom_operation_type: Optional[str] = None,
        include_urns: Optional[Set[DatasetUrn]] = None,
        include_column_lineage: bool = True,
    ) -> Iterable[MetadataWorkUnit]:
        # Processing logic here
    

Possible Reasons for Missing query:

  1. Query Not Stored in Aspect:
    The query might not be stored in the aspect that is being emitted. Ensure that the query is included in the aspect being created and emitted.

  2. Aspect Emission:
    Verify that the aspect containing the query is correctly emitted and associated with the dataset URN; the CLI check below shows one way to inspect what was actually written.
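
To inspect what was actually persisted, you can fetch the upstreamLineage aspect of the downstream dataset with the DataHub CLI (the URN is illustrative; confirm the flag names with datahub get --help on your version):

datahub get --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,your_database.your_schema.A,PROD)" --aspect upstreamLineage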

Steps to Troubleshoot:

  1. Verify Aspect Creation:
    Ensure that the query is included in the aspect being created. For example, if you were emitting a hypothetical SqlParsingResultClass aspect (illustrative only; SqlParsingResult is an internal parser dataclass, not a stock DataHub aspect), the query would need to be part of it:

    sql_parse_result_aspect = SqlParsingResultClass(
        query=query,
        queryType=result.query_type,
        queryTypeProps=result.query_type_props,
        queryFingerprint=result.query_fingerprint,
        inTables=result.in_tables,
        outTables=result.out_tables,
        columnLineage=result.column_lineage,
    )
    
  2. Check Aspect Emission:
    Ensure that the aspect is correctly emitted and associated with the dataset URN.

    mce = MetadataChangeEvent(
        proposedSnapshot=DatasetSnapshot(
            urn=dataset_urn,
            aspects=[sql_parse_result_aspect]
        )
    )
    yield MetadataWorkUnit(id=dataset_urn, mce=mce)
    

Example Code Snippet:

Here is an illustrative sketch, reusing the hypothetical SqlParsingResultClass from above, of how the query could be carried through and emitted:

def process_sql_parsing_result(
    self,
    result: SqlParsingResult,
    *,
    query: str,
    query_timestamp: Optional[datetime] = None,
    is_view_ddl: bool = False,
    user: Optional[UserUrn] = None,
    custom_operation_type: Optional[str] = None,
    include_urns: Optional[Set[DatasetUrn]] = None,
    include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
    # Create the SqlParsingResultClass aspect
    sql_parse_result_aspect = SqlParsingResultClass(
        query=query,
        queryType=result.query_type,
        queryTypeProps=result.query_type_props,
        queryFingerprint=result.query_fingerprint,
        inTables=result.in_tables,
        outTables=result.out_tables,
        columnLineage=result.column_lineage,
    )

    # Emit the aspect
    for downstream_urn in result.out_tables:
        mce = MetadataChangeEvent(
            proposedSnapshot=DatasetSnapshot(
                urn=downstream_urn,
                aspects=[sql_parse_result_aspect]
            )
        )
        yield MetadataWorkUnit(id=downstream_urn, mce=mce)
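
Note that snapshot-based MCEs only accept the fixed set of aspect types in the DatasetSnapshot union, so a custom aspect like the hypothetical one above would in practice need to go out as a MetadataChangeProposal instead. A minimal sketch, reusing the same hypothetical aspect:

from datahub.emitter.mcp import MetadataChangeProposalWrapper

# Emit one MCP per downstream table; entityType/aspectName are inferred from the aspect
for downstream_urn in result.out_tables:
    yield MetadataChangeProposalWrapper(
        entityUrn=downstream_urn,
        aspect=sql_parse_result_aspect,  # hypothetical custom aspect from above
    ).as_workunit()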

<@U06TM7M40RK>

class LineageEdge:
    """Stores information about a single lineage edge, from an upstream table to a downstream table."""

    downstream_urn: DatasetUrn
    upstream_urn: DatasetUrn
    audit_stamp: Optional[datetime]
    actor: Optional[UserUrn]
    type: str = DatasetLineageTypeClass.TRANSFORMED

    # Maps downstream_col -> {upstream_col}
    column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

    def gen_upstream_aspect(self) -> UpstreamClass:
        return UpstreamClass(
            auditStamp=(
                AuditStampClass(
                    time=int(self.audit_stamp.timestamp() * 1000),
                    actor=self.actor or "",
                )
                if self.audit_stamp
                else None
            ),
            dataset=self.upstream_urn,
            type=self.type,
        )

The above is sql_parsing_builder.py; it appears that gen_upstream_aspect does not carry the query into the aspect.

The issue you're encountering is that the gen_upstream_aspect method in the LineageEdge class does not include the query field when generating the UpstreamClass aspect. To resolve this, you need to modify gen_upstream_aspect to populate that field. (One caveat: the stock UpstreamClass does expose an optional query field, but in recent DataHub models it is meant to hold a Query entity URN such as urn:li:query:..., so storing raw SQL there is a local customization.)

Steps to Modify gen_upstream_aspect:

  1. Add a query Field to LineageEdge:
    Ensure that the LineageEdge class has a query field to store the query information.

  2. Modify gen_upstream_aspect Method:
    Update the gen_upstream_aspect method to include the query field when creating the UpstreamClass aspect.

Example Code:

Here is how you can modify the LineageEdge class and the gen_upstream_aspect method:

  1. Add query Field to LineageEdge:

    @dataclass
    class LineageEdge:
        """Stores information about a single lineage edge, from an upstream table to a downstream table."""
    
        downstream_urn: DatasetUrn
        upstream_urn: DatasetUrn
        audit_stamp: Optional[datetime]
        actor: Optional[UserUrn]
        type: str = DatasetLineageTypeClass.TRANSFORMED
        query: Optional[str] = None  # Add query field
    
        # Maps downstream_col -> {upstream_col}
        column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))
    
        def gen_upstream_aspect(self) -> UpstreamClass:
            return UpstreamClass(
                auditStamp=(
                    AuditStampClass(
                        time=int(self.audit_stamp.timestamp() * 1000),
                        actor=self.actor or "",
                    )
                    if self.audit_stamp
                    else None
                ),
                dataset=self.upstream_urn,
                type=self.type,
                query=self.query,  # Include query field
            )
    
  2. Update process_sql_parsing_result Method:
    Ensure that the query is passed to the LineageEdge instances.

   def process_sql_parsing_result(
       self,
       result: SqlParsingResult,
       *,
       query: str,
       query_timestamp: Optional[datetime] = None,
       is_view_ddl: bool = False,
       user: Optional[UserUrn] = None,
       custom_operation_type: Optional[str] = None,
       include_urns: Optional[Set[DatasetUrn]] = None,
       include_column_lineage: bool = True,
    ) -> Iterable[MetadataWorkUnit]:
       downstreams_to_ingest = result.out_tables
       upstreams_to_ingest = result.in_tables
       if include_urns:
           logger.debug(f"Skipping urns {set(downstreams_to_ingest) - include_urns}")
           downstreams_to_ingest = list(set(downstreams_to_ingest) & include_urns)
           upstreams_to_ingest = list(set(upstreams_to_ingest) & include_urns)

       if self.generate_lineage:
           for downstream_urn in downstreams_to_ingest:
               # Set explicitly so that FileBackedDict registers any mutations
               self._lineage_map[downstream_urn] = _merge_lineage_data(
                   downstream_urn=downstream_urn,
                   upstream_urns=result.in_tables,
                   column_lineage=(
                       result.column_lineage if include_column_lineage else None
                   ),
                   upstream_edges=self._lineage_map.get(downstream_urn, {}),
                   query_timestamp=query_timestamp,
                   is_view_ddl=is_view_ddl,
                   user=user,
                   query=query,  # Pass the query to the lineage data
               )

       if self.generate_usage_statistics and query_timestamp is not None:
           upstream_fields = compute_upstream_fields(result)
           for upstream_urn in upstreams_to_ingest:
               self._usage_aggregator.aggregate_event(
                   resource=upstream_urn,
                   start_time=query_timestamp,
                   query=query,
                   user=user,
                   fields=sorted(upstream_fields.get(upstream_urn, [])),
               )

       if self.generate_operations and query_timestamp is not None:
           for downstream_urn in downstreams_to_ingest:
               yield from _gen_operation_workunit(
                   result,
                   downstream_urn=downstream_urn,
                   query_timestamp=query_timestamp,
                   user=user,
                   custom_operation_type=custom_operation_type,
               )
  3. Update _merge_lineage_data Function:
    Ensure that the query is included when merging lineage data.

   def _merge_lineage_data(
       downstream_urn: DatasetUrn,
       *,
       upstream_urns: Collection[DatasetUrn],
       column_lineage: Optional[List[ColumnLineageInfo]],
       upstream_edges: Dict[DatasetUrn, LineageEdge],
       query_timestamp: Optional[datetime],
       is_view_ddl: bool,
       user: Optional[UserUrn],
       query: Optional[str] = None,  # Add query parameter
    ) -> Dict[str, LineageEdge]:
       for upstream_urn in upstream_urns:
           edge = upstream_edges.setdefault(
               upstream_urn,
               LineageEdge(
                   downstream_urn=downstream_urn,
                   upstream_urn=upstream_urn,
                   audit_stamp=query_timestamp,
                   actor=user,
                   type=(
                       DatasetLineageTypeClass.VIEW
                       if is_view_ddl
                       else DatasetLineageTypeClass.TRANSFORMED
                   ),
                   query=query,  # Set the query
               ),
           )
           if query_timestamp and (  # Use the most recent query
           edge.audit_stamp is None or query_timestamp > edge.audit_stamp
           ):
               edge.audit_stamp = query_timestamp
               if user:
                   edge.actor = user
               edge.query = query  # Update the query

       # Note: Inefficient as we loop through all column_lineage entries for each downstream table
       for cl in column_lineage or []:
           if cl.downstream.table == downstream_urn:
               for upstream_column_info in cl.upstreams:
                   if upstream_column_info.table not in upstream_urns:
                       continue
                   column_map = upstream_edges[upstream_column_info.table].column_map
                   column_map[cl.downstream.column].add(upstream_column_info.column)

       return upstream_edges

By following these steps, you should be able to ensure that the query is included in the UpstreamClass aspect when generating lineage information.

Would you like more detailed steps or examples on any specific part of this process?

<@U06TM7M40RK> What if I want to add additional information to UpstreamClass?

To add additional information to the UpstreamClass in DataHub, you need to extend the UpstreamClass schema to include the new fields you want to add. This involves modifying the PDL (Pegasus Data Language) schema, updating the ingestion code to populate these fields, and ensuring that the DataHub backend and frontend can handle and display the new fields.

Steps to Add Additional Information to UpstreamClass:

  1. Modify the PDL Schema:
    Update the PDL file to include the new fields in the UpstreamClass.

  2. Update the Ingestion Code:
    Modify the ingestion code to populate the new fields in the UpstreamClass.

  3. Build and Deploy the Custom Model:
    Build and deploy the updated model to DataHub.

  4. Update the Frontend (Optional):
    If you want the new fields to be displayed in the DataHub UI, you may need to update the frontend code.

Step-by-Step Guide:

Step 1: Modify the PDL Schema

  1. Locate the PDL file that defines the Upstream record (the Python UpstreamClass is generated from it; the record is referenced by the upstreamLineage aspect). It lives in the metadata-models directory, under the com.linkedin.dataset namespace.

  2. Add the new fields to the record. For example, let's add a description field and a source field (a simplified sketch of the record, not the verbatim file):

    namespace com.linkedin.dataset

    import com.linkedin.common.AuditStamp
    import com.linkedin.common.urn.Urn

    /**
     * Represents an upstream dataset in a lineage relationship.
     */
    record Upstream {
      /**
       * The URN of the upstream dataset.
       */
      dataset: Urn

      /**
       * The type of lineage relationship.
       */
      type: string

      /**
       * The audit stamp for the lineage relationship.
       */
      auditStamp: optional AuditStamp

      /**
       * The query that generated the lineage relationship.
       */
      query: optional string

      /**
       * A description of the upstream dataset.
       */
      description: optional string  // New field

      /**
       * The source of the upstream dataset.
       */
      source: optional string  // New field
    }
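
    Because the Python UpstreamClass used by the ingestion code is generated from this schema, regenerate it after editing the PDL (assuming you are working in the DataHub source tree):

    ./gradlew :metadata-ingestion:codegen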
    

Step 2: Update the Ingestion Code

  1. Modify the ingestion code to populate the new fields in the UpstreamClass. For example, update the gen_upstream_aspect method in the LineageEdge class:
   @dataclass
   class LineageEdge:
       """Stores information about a single lineage edge, from an upstream table to a downstream table."""

       downstream_urn: DatasetUrn
       upstream_urn: DatasetUrn
       audit_stamp: Optional[datetime]
       actor: Optional[UserUrn]
       type: str = DatasetLineageTypeClass.TRANSFORMED
       query: Optional[str] = None  # Add query field
       description: Optional[str] = None  # Add description field
       source: Optional[str] = None  # Add source field

       # Maps downstream_col -> {upstream_col}
       column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

       def gen_upstream_aspect(self) -> UpstreamClass:
           return UpstreamClass(
               auditStamp=(
                   AuditStampClass(
                       time=int(self.audit_stamp.timestamp() * 1000),
                       actor=self.actor or "",
                   )
                   if self.audit_stamp
                   else None
               ),
               dataset=self.upstream_urn,
               type=self.type,
               query=self.query,  # Include query field
               description=self.description,  # Include description field
               source=self.source,  # Include source field
           )
  2. Ensure that the new fields are populated when creating LineageEdge instances, as in the sketch below.
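
    A minimal sketch of threading the hypothetical new fields through wherever the edges are constructed (e.g., inside _merge_lineage_data):

    edge = LineageEdge(
        downstream_urn=downstream_urn,
        upstream_urn=upstream_urn,
        audit_stamp=query_timestamp,
        actor=user,
        type=DatasetLineageTypeClass.TRANSFORMED,
        query=query,
        description=description,  # hypothetical new field
        source=source,            # hypothetical new field
    )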

Step 3: Build and Deploy the Custom Model

  1. From the DataHub repository root, rebuild the metadata models:

    ./gradlew :metadata-models:build
    
  2. Deploy the updated model to your DataHub instance. DataHub's documented route for schema extensions is the custom-model workflow, which packages and deploys a model artifact via:

    ./gradlew :metadata-models-custom:modelDeploy
    

Step 4: Update the Frontend (Optional)

  1. If you want the new fields to be displayed in the DataHub UI, you may need to update the frontend code. This typically involves modifying the React components that render the lineage information.

  2. Locate the relevant React components in the datahub-frontend module and update them to display the new fields.

Example Code Snippet for Emitting Lineage with Additional Fields

Here is an example of how to emit lineage with the additional fields using the DataHub Python SDK (this assumes the extended model above has been built and deployed; the stock UpstreamClass has no description or source fields):

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)
from datetime import datetime

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the source and target datasets
source_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.B", env="PROD")
target_dataset_urn = make_dataset_urn(platform="mysql", name="your_database.your_schema.A", env="PROD")

# Create the lineage relationship with additional fields
upstream = UpstreamClass(
    dataset=source_dataset_urn,
    type=DatasetLineageTypeClass.TRANSFORMED,
    query="SELECT * FROM B",
    description="This is a description of the upstream dataset.",
    source="Generated from ETL job",
    auditStamp=AuditStampClass(
        time=int(datetime.now().timestamp() * 1000),
        actor="urn:li:corpuser:datahub"
    )
)

lineage = UpstreamLineageClass(
    upstreams=[upstream],
)

# Emit the lineage to DataHub as a MetadataChangeProposal
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=target_dataset_urn,
        aspect=lineage,  # entityType and aspectName are inferred from the aspect
    )
)

Would you like more detailed steps or further assistance on any specific part of this process?
