Issues with SQL Queries Lineage in DataHub 0.14.1 and GMS Interaction

Original Slack Thread

<@U06TM7M40RK> datahub 0.14.1을 사용하고 있는데, source종류는 sql-queries를 가지고 ingest하고 있어. 그런데, upstreamLineage가 생성될 때, sql-queries.json에 쿼리가 여러개 있는데, 동일 downstream에 대해서 마지막 쿼리의 upstream만 리니지로 표현되는 이유는?

It seems likely that the issue you’re experiencing with the sql-queries source in DataHub 0.14.1, where only the last query’s upstream is represented in the lineage for the same downstream, could be related to how the lineage is being aggregated or overwritten during the ingestion process.

Based on the <https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|DataHub SQL Queries documentation>, the sql-queries source reads a newline-delimited JSON file containing SQL queries and parses them to generate lineage. Each query in the file is processed to determine its upstream and downstream tables.

Here are a few potential reasons and steps to consider:

  1. Aggregation Logic: The ingestion process might be aggregating lineage data in a way that only retains the last query’s upstream for a given downstream. This could be due to how the lineage data is being stored or updated in the metadata store.

  2. Configuration Settings: Check if there are any configuration settings in your ingestion recipe that might affect how lineage is aggregated or stored. Ensure that the settings allow for multiple upstreams to be recorded for the same downstream.

  3. Debugging and Logs: Run the ingestion process in debug mode to get detailed logs. This can help identify if there are any issues during the lineage extraction or aggregation process.

  4. Manual Verification: Verify the contents of your sql-queries.json file to ensure that the queries are correctly formatted and that the upstream and downstream tables are specified as expected.

  5. Code Review: If you have access to the ingestion code, review the logic that processes the SQL queries and constructs the lineage. Look for any logic that might overwrite previous upstreams for the same downstream.

Would you like more detailed steps or examples on any specific part of this process?

Sources:

<@U06TM7M40RK> 위 내용을 한글로 설명해줘.

DataHub 0.14.1에서 sql-queries 소스를 사용하여 데이터를 수집할 때, 동일한 downstream에 대해 마지막 쿼리의 upstream만 리니지로 표현되는 문제는 주로 수집 과정에서 리니지가 집계되거나 덮어쓰여지는 방식과 관련이 있을 수 있습니다.

<https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|DataHub SQL Queries 문서>에 따르면, sql-queries 소스는 SQL 쿼리를 포함한 줄바꿈으로 구분된 JSON 파일을 읽고 이를 파싱하여 리니지를 생성합니다. 파일의 각 쿼리는 해당 쿼리의 upstream과 downstream 테이블을 결정하기 위해 처리됩니다.

다음은 고려할 수 있는 몇 가지 잠재적인 이유와 단계입니다:

  1. 집계 로직: 수집 과정에서 리니지 데이터가 집계되는 방식이 동일한 downstream에 대해 마지막 쿼리의 upstream만 유지하도록 되어 있을 수 있습니다. 이는 메타데이터 저장소에 리니지 데이터가 저장되거나 업데이트되는 방식 때문일 수 있습니다.

  2. 구성 설정: 수집 레시피의 구성 설정을 확인하여 리니지가 집계되거나 저장되는 방식에 영향을 미칠 수 있는 설정이 있는지 확인하세요. 동일한 downstream에 대해 여러 upstream이 기록될 수 있도록 설정이 되어 있는지 확인하세요.

  3. 디버깅 및 로그: 수집 과정을 디버그 모드로 실행하여 자세한 로그를 확인하세요. 이를 통해 리니지 추출 또는 집계 과정에서 문제가 있는지 확인할 수 있습니다.

  4. 수동 검증: sql-queries.json 파일의 내용을 검증하여 쿼리가 올바르게 형식화되어 있는지, 그리고 upstream과 downstream 테이블이 예상대로 지정되어 있는지 확인하세요.

  5. 코드 검토: 수집 코드에 접근할 수 있다면, SQL 쿼리를 처리하고 리니지를 구성하는 로직을 검토하세요. 동일한 downstream에 대해 이전 upstream을 덮어쓰는 로직이 있는지 확인하세요.

이 과정의 특정 부분에 대해 더 자세한 단계나 예제가 필요하신가요?

출처:

Sources:

<@U06TM7M40RK> 리니지를 GMS에 보낼 때, downstream_urn에서 추가되는 upstream이 계속 추가되면, add되는 건지, 마지막만 갱신되는 건지?

When sending lineage to the GMS (Graph Metadata Service) in DataHub, whether upstreams are continuously added or only the last one is updated depends on how the lineage data is being ingested and processed.

According to the <https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|DataHub documentation>, the behavior of lineage updates can vary based on the method used to emit lineage data:

  1. Overwrite Behavior: If you are using the emit_mce method to send Metadata Change Events (MCEs) to the GMS, it typically overwrites the existing lineage for the specified downstream entity. This means that if you send a new lineage event with a different set of upstreams, it will replace the previous upstreams with the new ones.

  2. GraphQL API: If you are using the GraphQL API to update lineage, it might handle updates differently, potentially allowing for more granular control over how lineage is added or updated.

  3. Configuration and Implementation: The specific behavior can also depend on how your ingestion pipeline is configured and implemented. For example, if your ingestion logic is designed to merge new upstreams with existing ones, it might add to the existing lineage rather than replacing it.

  4. Known Issues: There have been discussions in the community about similar issues where only the last upstream is retained. This could be due to how the ingestion logic is implemented or a limitation in the current version of DataHub.

If you are experiencing issues with lineage updates, consider the following steps:

  • Review Ingestion Logic: Check your ingestion code to see if it is designed to overwrite or merge lineage data.
  • Use GraphQL for Fine Control: Consider using the GraphQL API if you need more control over how lineage is updated.
  • Community Support: Engage with the DataHub community to see if others have encountered similar issues and if there are recommended solutions or workarounds.

Would you like more detailed steps or examples on how to manage lineage updates?

Sources:

  • <Slack Discussion on Lineage Updates> :+1: 0 button :-1: 0 button :bulb: Hint: Mention @runllm in the thread for followups.

<@U06TM7M40RK> 위 내용 한글로 설명해줘.

DataHub의 GMS(Graph Metadata Service)에 리니지를 보낼 때, upstream이 계속 추가되는지 아니면 마지막 것만 갱신되는지는 리니지 데이터가 수집되고 처리되는 방식에 따라 다릅니다.

<https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|DataHub 문서>에 따르면, 리니지 업데이트의 동작은 리니지 데이터를 전송하는 방법에 따라 달라질 수 있습니다:

  1. 덮어쓰기 동작: emit_mce 메서드를 사용하여 Metadata Change Events(MCEs)를 GMS에 전송하는 경우, 일반적으로 지정된 downstream 엔티티에 대한 기존 리니지를 덮어씁니다. 즉, 새로운 리니지 이벤트를 다른 upstream 세트와 함께 전송하면 이전 upstream이 새로운 것으로 대체됩니다.

  2. GraphQL API: GraphQL API를 사용하여 리니지를 업데이트하는 경우, 업데이트를 더 세밀하게 제어할 수 있어 추가적인 제어가 가능할 수 있습니다.

  3. 구성 및 구현: 특정 동작은 수집 파이프라인이 구성되고 구현된 방식에 따라 달라질 수 있습니다. 예를 들어, 수집 로직이 새로운 upstream을 기존 것과 병합하도록 설계된 경우, 기존 리니지에 추가될 수 있습니다.

  4. 알려진 문제: 커뮤니티에서는 마지막 upstream만 유지되는 문제에 대한 논의가 있었습니다. 이는 수집 로직이 구현된 방식이나 현재 DataHub 버전의 제한 때문일 수 있습니다.

리니지 업데이트에 문제가 있는 경우, 다음 단계를 고려해 보세요:

  • 수집 로직 검토: 수집 코드가 리니지 데이터를 덮어쓰거나 병합하도록 설계되어 있는지 확인하세요.
  • GraphQL을 통한 세밀한 제어: 리니지 업데이트를 더 세밀하게 제어해야 하는 경우 GraphQL API 사용을 고려하세요.
  • 커뮤니티 지원: DataHub 커뮤니티에 참여하여 다른 사람들이 유사한 문제를 겪었는지, 추천 솔루션이나 해결 방법이 있는지 확인하세요.

이 과정의 특정 부분에 대해 더 자세한 단계나 예제가 필요하신가요?

출처:

  • <Slack 리니지 업데이트 논의>

Sources:

<@U06TM7M40RK> sql-queries는 sql_parsing_builder.py를 사용하는데,

    result: SqlParsingResult,
    *,
    downstream_urn: DatasetUrn,
    query_timestamp: datetime,
    user: Optional[UserUrn],
    custom_operation_type: Optional[str],
) -&gt; Iterable[MetadataWorkUnit]:
    operation_type = result.query_type.to_operation_type()
    # Filter out SELECT and other undesired statements
    if operation_type is None:
        return
    elif operation_type == OperationTypeClass.UNKNOWN:
        if custom_operation_type is None:
            return
        else:
            operation_type = OperationTypeClass.CUSTOM

    aspect = OperationClass(
        timestampMillis=int(time.time() * 1000),
        operationType=operation_type,
        lastUpdatedTimestamp=int(query_timestamp.timestamp() * 1000),
        actor=user,
        customOperationType=custom_operation_type,
    )
    yield MetadataChangeProposalWrapper(
        entityUrn=downstream_urn, aspect=aspect
    ).as_workunit()```
위 함수를 이용해서, GMS로 보내고 있어.

The function _gen_operation_workunit in the sql_parsing_builder.py file is responsible for generating metadata work units that describe operations performed on datasets. This function is part of the process that sends metadata to the Graph Metadata Service (GMS) in DataHub.

Here’s a breakdown of how this function works:

  1. Operation Type Determination: The function starts by determining the type of operation from the SqlParsingResult. It uses the result.query_type.to_operation_type() method to convert the query type into an operation type.

  2. Filtering: The function filters out SELECT and other undesired statements. If the operation type is None, the function returns without yielding any work unit. If the operation type is UNKNOWN and no custom operation type is provided, it also returns.

  3. Operation Aspect Creation: If a valid operation type is determined, the function creates an OperationClass aspect. This aspect includes details such as the timestamp, operation type, last updated timestamp, actor (user), and any custom operation type.

  4. Yielding Work Unit: The function then yields a MetadataChangeProposalWrapper containing the OperationClass aspect. This wrapper is converted into a MetadataWorkUnit and sent to the GMS.

This function is crucial for capturing and sending operation metadata to DataHub, allowing for the tracking of various operations performed on datasets.

If you have specific questions about how to modify or extend this function, or if you need more detailed guidance on any part of this process, please let me know!

Sources:

<@U06TM7M40RK> 위 내용 한글로 알려줘.

sql_parsing_builder.py 파일의 _gen_operation_workunit 함수는 데이터셋에 수행된 작업을 설명하는 메타데이터 작업 단위를 생성하는 역할을 합니다. 이 함수는 DataHub의 Graph Metadata Service(GMS)로 메타데이터를 전송하는 과정의 일부입니다.

이 함수의 작동 방식을 단계별로 설명하겠습니다:

  1. 작업 유형 결정: 함수는 SqlParsingResult에서 작업 유형을 결정합니다. result.query_type.to_operation_type() 메서드를 사용하여 쿼리 유형을 작업 유형으로 변환합니다.

  2. 필터링: 함수는 SELECT 및 기타 원하지 않는 문을 필터링합니다. 작업 유형이 None인 경우, 함수는 작업 단위를 생성하지 않고 반환합니다. 작업 유형이 UNKNOWN이고 사용자 정의 작업 유형이 제공되지 않은 경우에도 반환합니다.

  3. 작업 측면 생성: 유효한 작업 유형이 결정되면, 함수는 OperationClass 측면을 생성합니다. 이 측면에는 타임스탬프, 작업 유형, 마지막 업데이트 타임스탬프, 사용자(행위자), 사용자 정의 작업 유형 등의 세부 정보가 포함됩니다.

  4. 작업 단위 생성 및 전송: 함수는 OperationClass 측면을 포함하는 MetadataChangeProposalWrapper를 생성하고 이를 MetadataWorkUnit으로 변환하여 GMS로 전송합니다.

이 함수는 데이터셋에 수행된 다양한 작업을 추적하고 DataHub에 전송하는 데 중요한 역할을 합니다.

이 과정의 특정 부분에 대해 수정하거나 확장하는 방법에 대한 질문이 있거나, 더 자세한 안내가 필요하시면 말씀해 주세요!

출처:

Sources:

Hi :slight_smile: ! Did this interaction resolve your issue or do you still require assistance ?

Please tag @ RunLLM if you still need further assistance !

Or tag @Datahub Community Support if you require a person to take a look :slight_smile:

Hope this helps ! ><