Guide on Extracting and Storing Lineage Information with Custom Aspects in DataHub

Original Slack Thread

<@U06TM7M40RK> I'm using DataHub 0.14.0.2. I set the source type to sql-queries, configured queries.json as the input to collect, and extracted lineage through the metadata-ingestion CLI. I remember that the original query used for analysis, the query type, and the table types came out of _ColumnLineageWithDebugInfo. Is there no way to store this information?

To address your question about extracting and storing lineage information, including the original query, query type, and table types using the sql-queries source in DataHub 0.14.0.2, here are the key points:

  1. Query File Format: The queries.json file should contain one JSON object per line, with fields such as query, timestamp, user, downstream_tables, and upstream_tables. This format is essential for the sql-queries source to parse and generate lineage.

  2. Ingestion Configuration: Ensure your recipe.yaml is correctly configured to point to the queries.json file and specify the platform (e.g., Snowflake, BigQuery).

  3. Lineage Extraction: The sql-queries source reads the queries from the JSON file and parses them to generate lineage, including column-level lineage where the SQL dialect is supported (see the parser sketch after this list).

  4. Storing Lineage Information: The lineage information, including the original query and other details, is stored in DataHub’s metadata store. This information can be accessed and visualized through the DataHub UI.
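
If you want to inspect what the parser extracts for a single query before running a full ingestion, you can call DataHub's SQL parsing helper directly. This is a minimal sketch, assuming the sqlglot-based helper create_lineage_sql_parsed_result is importable from datahub.sql_parsing.sqlglot_lineage in your CLI version (the module path and exact signature have shifted between releases):

from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result

# Parse a single query offline; no running DataHub server is required for
# schema-unaware parsing (pass a DataHubGraph via graph= for schema-aware results).
result = create_lineage_sql_parsed_result(
    query="INSERT INTO my_project.my_dataset.my_table2 SELECT col1, col2 FROM my_project.my_dataset.my_table1",
    default_db="my_project",
    platform="bigquery",
    platform_instance=None,
    env="PROD",
)

print(result.query_type)      # e.g. SELECT / INSERT
print(result.in_tables)       # upstream dataset URNs
print(result.out_tables)      # downstream dataset URNs
print(result.column_lineage)  # column-level lineage, when it can be resolved
print(result.debug_info)      # parsing errors and confidence information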

Here is a sample queries.json and recipe.yaml configuration:

queries.json (one JSON object per line, i.e. newline-delimited JSON rather than a JSON array):

{"query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": ["my_project.my_dataset.my_table2"], "upstream_tables": ["my_project.my_dataset.my_table1"]}

recipe.yaml:

source:
    type: sql-queries
    config:
        platform: "bigquery"
        query_file: "./queries.json"
sink:
    type: "datahub-rest"
    config:
        server: "http://localhost:8000"

Ingestion Command:

datahub ingest -c recipe.yaml

Verifying Lineage Information:
After running the ingestion, you can verify the lineage information in the DataHub UI. The original query, query type, and table types should be visible in the lineage details.
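
If you prefer to check programmatically instead of in the UI, you can read back the upstreamLineage aspect of the downstream table with the Python graph client. A minimal sketch, assuming a local GMS at port 8080 and the example table names used above:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import UpstreamLineageClass

# Connect to the GMS REST endpoint (adjust the server URL to your deployment)
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# URN of the downstream table from the queries.json example above
downstream_urn = make_dataset_urn(
    platform="bigquery", name="my_project.my_dataset.my_table2", env="PROD"
)

# Fetch the upstreamLineage aspect written by the ingestion run
lineage = graph.get_aspect(entity_urn=downstream_urn, aspect_type=UpstreamLineageClass)
if lineage:
    for upstream in lineage.upstreams:
        print(upstream.dataset, upstream.type)
else:
    print("No upstreamLineage aspect found yet")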

Would you like more detailed guidance on any specific part of this process?


<@U06TM7M40RK> Which aspect represents query history?

The aspect that represents query history in DataHub is the datasetUsageStatistics aspect. This aspect captures various usage statistics, including the top queries that referenced a dataset.

According to the <https://datahubproject.io/docs/0.14.0/features/dataset-usage-and-query-history|Dataset Usage & Query History documentation>, the datasetUsageStatistics aspect can include information about the top queries, top users, and overall query counts. This aspect is part of the metadata model and can be ingested using the Python SDK or other ingestion mechanisms.

Here is an example of how to emit the datasetUsageStatistics aspect using the Python SDK:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    CalendarIntervalClass,
    DatasetUsageStatisticsClass,
    TimeWindowSizeClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Create a dataset URN
dataset_urn = make_dataset_urn(platform="bigquery", name="my_project.my_dataset.my_table", env="PROD")

# Create a DatasetUsageStatisticsClass instance.
# datasetUsageStatistics is a timeseries aspect: topSqlQueries is a list of raw
# SQL strings, and eventGranularity defines the time bucket the statistics cover.
usage_statistics = DatasetUsageStatisticsClass(
    timestampMillis=1689232738000,
    eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY, multiple=1),
    totalSqlQueries=10,
    topSqlQueries=["SELECT col1, col2 FROM my_project.my_dataset.my_table"],
)

# Emit the usage statistics aspect
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=usage_statistics,
    )
)

Would you like more detailed guidance on any specific part of this process?


<@U06TM7M40RK> Then, when I ingest queries.json with the sql-queries source type, where is the aspect for the extracted queries recorded?

When you ingest SQL queries using the sql-queries source type in DataHub, the extracted query information is recorded in the datasetUsageStatistics aspect. This aspect captures various usage statistics, including the top queries that referenced a dataset.

Here is a detailed breakdown of how the ingestion process works and where the query information is stored:

  1. Ingestion Configuration: You configure the ingestion using a recipe.yaml file that specifies the sql-queries source and points to the queries.json file.

  2. Query File Format: The queries.json file contains one JSON object per line, with fields such as query, timestamp, user, downstream_tables, and upstream_tables.

  3. Ingestion Process: When you run the ingestion command, the sql-queries source reads the queries from the JSON file and parses them to generate lineage and usage statistics.

  4. Storing Query Information: The parsed query information, including the original query, is stored in the datasetUsageStatistics aspect. This aspect is part of the metadata model and can be accessed and visualized through the DataHub UI.

The queries.json file, recipe.yaml, ingestion command, and example code for emitting the datasetUsageStatistics aspect are the same as in the previous answer.

Would you like more detailed guidance on any specific part of this process?


<@U06TM7M40RK> Then, how do I create a custom aspect that gets sent from the source to the sink?

To create and send a custom aspect from the source to the sink in DataHub, you need to follow these steps:

  1. Define the Custom Aspect: Create a PDL (Pegasus Data Language) file to define your custom aspect.
  2. Register the Custom Aspect: Add the custom aspect to the entity-registry.yaml file.
  3. Build and Deploy the Custom Model: Build and deploy the custom model to DataHub.
  4. Ingest Data with the Custom Aspect: Use the DataHub ingestion framework to ingest data with the custom aspect.

Here is a detailed breakdown of each step:

Step 1: Define the Custom Aspect

Create a PDL file (e.g., MyCustomAspect.pdl) to define your custom aspect. For example:

namespace com.mycompany.datahub.schema

import com.linkedin.common.urn.Urn

/**
 * Properties for my custom aspect
 */
@Aspect = {
  "name": "myCustomAspect"
}
record MyCustomAspect {
  /**
   * Unique Identifier
   */
  packageId: string
}

Step 2: Register the Custom Aspect

Add your custom aspect to the entity-registry.yaml file. For example:

entities:
  - name: dataset
    aspects:
      - myCustomAspect

Step 3: Build and Deploy the Custom Model

Navigate to the metadata-models-custom directory and build the custom model:

cd metadata-models-custom
../gradlew build
../gradlew :metadata-models-custom:modelDeploy

# Alternatively, build and install a versioned artifact of the custom model:
../gradlew -PprojVersion=0.0.1 build
../gradlew -PprojVersion=0.0.1 install

Step 4: Ingest Data with the Custom Aspect

Use the DataHub ingestion framework to ingest data with the custom aspect. Here is an example using the Python SDK:

import json
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

# Define the custom aspect
custom_aspect = {
    "packageId": "12345"
}

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the dataset URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"

# Create the MetadataChangeProposalClass for the custom aspect
mcp_raw = MetadataChangeProposalClass(
    entityType="dataset",
    entityUrn=dataset_urn,
    changeType=ChangeTypeClass.UPSERT,
    aspectName="myCustomAspect",
    aspect=GenericAspectClass(
        contentType="application/json",
        value=json.dumps(custom_aspect).encode("utf-8"),
    ),
)

# Emit the MCP and handle exceptions
try:
    emitter.emit(mcp_raw)
    print("Successfully wrote to DataHub")
except Exception as e:
    print("Failed to write to DataHub")
    raise e

Example Configuration for Ingestion

Here is an example recipe.yaml configuration for ingestion:

source:
  type: sql-queries
  config:
    platform: "hive"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Running the Ingestion

Use the DataHub CLI to run the ingestion job with the configuration file:

datahub ingest -c recipe.yaml

Would you like more detailed guidance on any specific part of this process?


<@U06TM7M40RK> When the source type is sql-queries, ingestion extracts column-level lineage. More specifically, what I want is to define a new URN for each query read from the query_file, use sqlParsingResult as the aspect, and store the SQL parsing result as the metadata.

To achieve your goal of defining a new URN for each query read from the query_file and storing the sqlParsingResult as an aspect, you can follow these steps:

  1. Define the Custom Aspect: Create a PDL file to define the sqlParsingResult aspect.
  2. Register the Custom Aspect: Add the custom aspect to the entity-registry.yaml file.
  3. Build and Deploy the Custom Model: Build and deploy the custom model to DataHub.
  4. Ingest Data with the Custom Aspect: Use the DataHub ingestion framework to ingest data with the custom aspect.

Step 1: Define the Custom Aspect

Create a PDL file (e.g., SqlParsingResult.pdl) to define your custom aspect. For example:

namespace com.mycompany.datahub.schema

import com.linkedin.common.urn.Urn

/**
 * Properties for SQL Parsing Result
 */
@Aspect = {
  "name": "sqlParsingResult"
}
record SqlParsingResult {
  /**
   * The original SQL query
   */
  query: string

  /**
   * The type of the SQL query
   */
  queryType: string

  /**
   * The tables involved in the query
   */
  tables: array[string]
}

Step 2: Register the Custom Aspect

Since the aspect will be attached to query entities (the code below emits against entityType "query"), register it under the query entity in the entity-registry.yaml file. For example:

entities:
  - name: query
    aspects:
      - sqlParsingResult

Step 3: Build and Deploy the Custom Model

Navigate to the metadata-models-custom directory and build the custom model:

cd metadata-models-custom
../gradlew build
../gradlew :metadata-models-custom:modelDeploy

# Alternatively, build and install a versioned artifact of the custom model:
../gradlew -PprojVersion=0.0.1 build
../gradlew -PprojVersion=0.0.1 install

Step 4: Ingest Data with the Custom Aspect

Use the DataHub ingestion framework to ingest data with the custom aspect. Here is an example using the Python SDK:

import json
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("<http://localhost:8080>")

# Example query data
queries = [
    {
        "query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "SELECT",
        "tables": ["my_project.my_dataset.my_table1"]
    },
    {
        "query": "INSERT INTO my_project.my_dataset.my_table2 (col1, col2) SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "INSERT",
        "tables": ["my_project.my_dataset.my_table1", "my_project.my_dataset.my_table2"]
    }
]

for i, query_data in enumerate(queries):
    # Define the custom aspect
    custom_aspect = {
        "query": query_data["query"],
        "queryType": query_data["queryType"],
        "tables": query_data["tables"]
    }

    # Create a unique URN for each query
    query_urn = f"urn:li:query:{i}"

    # Create the MetadataChangeProposalClass for the custom aspect
    mcp_raw = MetadataChangeProposalClass(
        entityType="query",
        entityUrn=query_urn,
        changeType=ChangeTypeClass.UPSERT,
        aspectName="sqlParsingResult",
        aspect=GenericAspectClass(
            contentType="application/json",
            value=json.dumps(custom_aspect).encode("utf-8"),
        ),
    )

    # Emit the MCP and handle exceptions
    try:
        emitter.emit(mcp_raw)
        print(f"Successfully wrote query {i} to DataHub")
    except Exception as e:
        print(f"Failed to write query {i} to DataHub")
        raise e
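
One refinement worth considering: using the loop index in the URN means the URN changes whenever the order of queries in the file changes. A small sketch (not part of the sql-queries source itself) that derives a deterministic id from the query text instead:

import hashlib

def make_query_urn(query_text: str) -> str:
    # Derive a stable id from the normalized query text so that re-ingesting
    # the same query updates the existing entity instead of creating a new one
    fingerprint = hashlib.sha256(query_text.strip().lower().encode("utf-8")).hexdigest()[:16]
    return f"urn:li:query:{fingerprint}"

print(make_query_urn("SELECT col1, col2 FROM my_project.my_dataset.my_table1"))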

Example Configuration for Ingestion

Here is an example recipe.yaml configuration for ingestion:

source:
  type: sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "<http://localhost:8080>"

Running the Ingestion

Use the DataHub CLI to run the ingestion job with the configuration file:

datahub ingest -c recipe.yaml

Would you like more detailed guidance on any specific part of this process?


<@U06TM7M40RK> More information needs to go into the aspect created above, so the query_file format needs additional fields. The query_file format has to be extended…

To include additional information in the query_file format and ensure that this information is captured and stored in your custom aspect, you need to:

  1. Extend the query_file Format: Add the additional fields to your queries.json file.
  2. Update the Custom Aspect Definition: Modify the PDL file to include the new fields.
  3. Update the Ingestion Logic: Ensure the ingestion logic reads the new fields and includes them in the custom aspect.

Step 1: Extend the query_file Format

Add the additional fields to your queries.json file. For example, let’s add queryId and executionTime:

queries.json:

[
    {
        "queryId": "query_1",
        "query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "SELECT",
        "tables": ["my_project.my_dataset.my_table1"],
        "executionTime": 1234
    },
    {
        "queryId": "query_2",
        "query": "INSERT INTO my_project.my_dataset.my_table2 (col1, col2) SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "INSERT",
        "tables": ["my_project.my_dataset.my_table1", "my_project.my_dataset.my_table2"],
        "executionTime": 5678
    }
]

Step 2: Update the Custom Aspect Definition

Modify the PDL file to include the new fields:

SqlParsingResult.pdl:

namespace com.mycompany.datahub.schema

import com.linkedin.common.urn.Urn

/**
 * Properties for SQL Parsing Result
 */
@Aspect = {
  "name": "sqlParsingResult"
}
record SqlParsingResult {
  /**
   * Unique Identifier for the query
   */
  queryId: string

  /**
   * The original SQL query
   */
  query: string

  /**
   * The type of the SQL query
   */
  queryType: string

  /**
   * The tables involved in the query
   */
  tables: array[string]

  /**
   * Execution time of the query in milliseconds
   */
  executionTime: int
}

Step 3: Update the Ingestion Logic

Ensure the ingestion logic reads the new fields and includes them in the custom aspect:

Ingestion Script:

import json
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("<http://localhost:8080>")

# Example query data
queries = [
    {
        "queryId": "query_1",
        "query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "SELECT",
        "tables": ["my_project.my_dataset.my_table1"],
        "executionTime": 1234
    },
    {
        "queryId": "query_2",
        "query": "INSERT INTO my_project.my_dataset.my_table2 (col1, col2) SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "INSERT",
        "tables": ["my_project.my_dataset.my_table1", "my_project.my_dataset.my_table2"],
        "executionTime": 5678
    }
]

for query_data in queries:
    # Define the custom aspect
    custom_aspect = {
        "queryId": query_data["queryId"],
        "query": query_data["query"],
        "queryType": query_data["queryType"],
        "tables": query_data["tables"],
        "executionTime": query_data["executionTime"]
    }

    # Create a unique URN for each query
    query_urn = f"urn:li:query:{query_data['queryId']}"

    # Create the MetadataChangeProposalClass for the custom aspect
    mcp_raw = MetadataChangeProposalClass(
        entityType="query",
        entityUrn=query_urn,
        changeType=ChangeTypeClass.UPSERT,
        aspectName="sqlParsingResult",
        aspect=GenericAspectClass(
            contentType="application/json",
            value=json.dumps(custom_aspect).encode("utf-8"),
        ),
    )

    # Emit the MCP and handle exceptions
    try:
        emitter.emit(mcp_raw)
        print(f"Successfully wrote query {query_data['queryId']} to DataHub")
    except Exception as e:
        print(f"Failed to write query {query_data['queryId']} to DataHub")
        raise e

The recipe.yaml configuration and ingestion command are the same as in the previous answer.

Would you like more detailed guidance on any specific part of this process?


<@U06TM7M40RK> Regarding the step above,
Update the Ingestion Logic: Ensure the ingestion logic reads the new fields and includes them in the custom aspect.
I'm not sure which DataHub source I need to modify, or whether I need to create a new one, for this part.

To update the ingestion logic to read the new fields and include them in the custom aspect, you will need to modify or extend the sql-queries source in DataHub. Here are the steps to achieve this:

  1. Clone the DataHub Repository: Clone the DataHub repository to your local machine.
  2. Modify the sql-queries Source: Update the sql_queries.py file to handle the new fields.
  3. Build and Deploy the Custom Ingestion Source: Build and deploy the modified ingestion source.
  4. Run the Ingestion Job: Use the DataHub CLI to run the ingestion job with the updated source.

Step 1: Clone the DataHub Repository

Clone the DataHub repository to your local machine:

git clone https://github.com/datahub-project/datahub.git
cd datahub

Step 2: Modify the sql-queries Source

Locate the sql_queries.py file in the metadata-ingestion module and modify it to handle the new fields. You can find this file in metadata-ingestion/src/datahub/ingestion/source/sql_queries.py.

Here is an example of how you might modify the sql_queries.py file to handle the new fields:

sql_queries.py:

import json
from typing import Any, Dict, Iterable

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

class SqlQueriesSource(Source):
    def __init__(self, config_dict: Dict[str, Any], ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config_dict
        self.query_file = config_dict.get("query_file")
        self.platform = config_dict.get("platform")
        self.default_db = config_dict.get("default_db")
        self.env = config_dict.get("env", "PROD")
        self.report = SourceReport()

    @classmethod
    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "SqlQueriesSource":
        return cls(config_dict, ctx)

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # The extended query_file in this example is a JSON array of query objects
        with open(self.query_file, "r") as f:
            queries = json.load(f)
        for query_data in queries:
            yield from self._process_query(query_data)

    def _process_query(self, query_data: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
        # Build the custom aspect payload from the extended query_file fields
        custom_aspect = {
            "queryId": query_data["queryId"],
            "query": query_data["query"],
            "queryType": query_data["queryType"],
            "tables": query_data["tables"],
            "executionTime": query_data["executionTime"],
        }

        # Create a unique URN for each query
        query_urn = f"urn:li:query:{query_data['queryId']}"

        # Emit the custom aspect as a raw MetadataChangeProposal, since the aspect
        # is not part of the generated Python model classes
        mcp = MetadataChangeProposalClass(
            entityType="query",
            entityUrn=query_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName="sqlParsingResult",
            aspect=GenericAspectClass(
                contentType="application/json",
                value=json.dumps(custom_aspect).encode("utf-8"),
            ),
        )

        # Wrap the raw MCP in a work unit and yield it
        yield MetadataWorkUnit(id=query_urn, mcp_raw=mcp)

    def get_report(self) -> SourceReport:
        return self.report

# Register the custom source under the name used in the recipe below. Note that this
# module must be imported before the pipeline resolves the source name; the more
# reliable documented route is to reference the class in the recipe by its fully
# qualified path, e.g. type: datahub.ingestion.source.sql_queries.SqlQueriesSource
from datahub.ingestion.source.source_registry import source_registry

source_registry.register("custom-sql-queries", SqlQueriesSource)

Step 3: Build and Deploy the Custom Ingestion Source

After modifying the source code, build and install the custom ingestion source:

cd metadata-ingestion
pip install -e .
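
To smoke-test the modified source end to end without a YAML file, you can also drive the ingestion programmatically with the Pipeline API. A minimal sketch, assuming the custom-sql-queries name registered above resolves in your environment and GMS runs locally on port 8080:

from datahub.ingestion.run.pipeline import Pipeline

# Build the same recipe as a dict and run it in-process
pipeline = Pipeline.create(
    {
        "source": {
            "type": "custom-sql-queries",
            "config": {
                "platform": "bigquery",
                "query_file": "./queries.json",
                "default_db": "your_database",
                "env": "PROD",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()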

Step 4: Run the Ingestion Job

Use the DataHub CLI to run the ingestion job with the updated source. Here is an example recipe.yaml configuration:

recipe.yaml:

source:
  type: custom-sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
    default_db: "your_database"
    env: "PROD"
sink:
  type: "datahub-rest"
  config:
    server: "<http://localhost:8080>"

Run the ingestion job:

datahub ingest -c recipe.yaml

This setup will ensure that the new fields in the query_file are read and included in the custom aspect (sqlParsingResult) when ingested into DataHub.

Would you like more detailed guidance on any specific part of this process?
