<@U06TM7M40RK> I'm using DataHub 0.14.0.2. With the source type set to sql-queries and queries.json configured as the file to collect from, I extracted lineage through the metadata-ingestion CLI. I remember the original query used for analysis, the query type, and the table types showing up in _ColumnLineageWithDebugInfo. Isn't there anything that stores this information?
To address your question about extracting and storing lineage information, including the original query, query type, and table types, using the sql-queries source in DataHub 0.14.0.2, here are the key points:
- Query File Format: The queries.json file should contain one JSON object per line, with fields such as query, timestamp, user, downstream_tables, and upstream_tables. This format is essential for the sql-queries source to parse and generate lineage.
- Ingestion Configuration: Ensure your recipe.yaml is correctly configured to point to the queries.json file and to specify the platform (e.g., Snowflake, BigQuery).
- Lineage Extraction: The sql-queries source reads the queries from the JSON file and parses them to generate lineage, including column-level lineage if supported by the SQL dialect.
- Storing Lineage Information: The lineage information, including the original query and other details, is stored in DataHub's metadata store. This information can be accessed and visualized through the DataHub UI.
Here is a sample queries.json and recipe.yaml configuration:
queries.json:
{"query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": ["my_project.my_dataset.my_table2"], "upstream_tables": ["my_project.my_dataset.my_table1"]}
recipe.yaml:
source:
  type: sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Ingestion Command:
datahub ingest -c recipe.yaml
Verifying Lineage Information:
After running the ingestion, you can verify the lineage information in the DataHub UI. The original query, query type, and table types should be visible in the lineage details.
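If you prefer to verify programmatically rather than in the UI, you can read the lineage aspect back with the Python SDK's DataHubGraph client. This is only a minimal sketch: it assumes GMS is reachable at http://localhost:8080 and uses the example downstream table from the queries.json above; table-level lineage produced by the SQL parser lands on the downstream dataset's upstreamLineage aspect, with column-level lineage in its fineGrainedLineages field when available.
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import UpstreamLineageClass

# Connect to GMS (adjust the server URL and add a token if your deployment needs one)
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Downstream dataset from the example queries.json above
downstream_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table2,PROD)"

# upstreamLineage holds the table-level lineage; fineGrainedLineages (if present)
# holds the column-level lineage extracted from the parsed queries
lineage = graph.get_aspect(entity_urn=downstream_urn, aspect_type=UpstreamLineageClass)
print(lineage)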
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|SQL Queries Ingestion Source>
- <https://datahubproject.io/docs/0.14.0/features/dataset-usage-and-query-history|Dataset Usage & Query History>
- <About DataHub Lineage | DataHub Feature Guide>
<@U06TM7M40RK> Which aspect represents query history?
The aspect that represents query history in DataHub is the datasetUsageStatistics aspect. This aspect captures various usage statistics, including the top queries that referenced a dataset.
According to the <https://datahubproject.io/docs/0.14.0/features/dataset-usage-and-query-history|Dataset Usage & Query History documentation>, the datasetUsageStatistics aspect can include information about the top queries, top users, and overall query counts. This aspect is part of the metadata model and can be ingested using the Python SDK or other ingestion mechanisms.
Here is an example of how to emit the datasetUsageStatistics aspect using the Python SDK:
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Create a dataset URN
dataset_urn = make_dataset_urn(platform="bigquery", name="my_project.my_dataset.my_table", env="PROD")

# Create a DatasetUsageStatisticsClass instance
# (topSqlQueries is a list of raw SQL strings)
usage_statistics = DatasetUsageStatisticsClass(
    timestampMillis=1689232738000,
    topSqlQueries=["SELECT col1, col2 FROM my_project.my_dataset.my_table"],
)

# Wrap the aspect in a MetadataChangeProposal and emit it
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=usage_statistics,
)
emitter.emit(mcp)
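If you want to confirm the aspect landed, note that datasetUsageStatistics is a timeseries aspect, so it is read back through the timeseries API rather than the versioned-aspect API. The sketch below assumes the DataHubGraph client in your SDK version exposes get_latest_timeseries_value with a filter argument; treat the exact method name and signature as an assumption and check it against your installed datahub package:
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)"

# Fetch the most recent datasetUsageStatistics point for this dataset
usage = graph.get_latest_timeseries_value(
    entity_urn=dataset_urn,
    aspect_type=DatasetUsageStatisticsClass,
    filter_criteria_map={},
)
if usage:
    print(usage.topSqlQueries)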
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://datahubproject.io/docs/0.14.0/features/dataset-usage-and-query-history|Dataset Usage & Query History>
<@U06TM7M40RK> Then, when queries.json is ingested with the sql-queries source type, where is the aspect for the extracted queries recorded?
When you ingest SQL queries using the sql-queries source type in DataHub, the extracted query information is recorded in the datasetUsageStatistics aspect. This aspect captures various usage statistics, including the top queries that referenced a dataset.
Here is a detailed breakdown of how the ingestion process works and where the query information is stored:
- Ingestion Configuration: You configure the ingestion using a recipe.yaml file that specifies the sql-queries source and points to the queries.json file.
- Query File Format: The queries.json file contains one JSON object per line, with fields such as query, timestamp, user, downstream_tables, and upstream_tables.
- Ingestion Process: When you run the ingestion command, the sql-queries source reads the queries from the JSON file and parses them to generate lineage and usage statistics.
- Storing Query Information: The parsed query information, including the original query, is stored in the datasetUsageStatistics aspect. This aspect is part of the metadata model and can be accessed and visualized through the DataHub UI.
Here is an example of how the queries.json and recipe.yaml files might look:
queries.json:
{"query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": ["my_project.my_dataset.my_table2"], "upstream_tables": ["my_project.my_dataset.my_table1"]}
recipe.yaml:
source:
  type: sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Ingestion Command:
datahub ingest -c recipe.yaml
Example Code for Emitting Query Information:
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Create a dataset URN
dataset_urn = make_dataset_urn(platform="bigquery", name="my_project.my_dataset.my_table", env="PROD")

# Create a DatasetUsageStatisticsClass instance
# (topSqlQueries is a list of raw SQL strings)
usage_statistics = DatasetUsageStatisticsClass(
    timestampMillis=1689232738000,
    topSqlQueries=["SELECT col1, col2 FROM my_project.my_dataset.my_table"],
)

# Wrap the aspect in a MetadataChangeProposal and emit it
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=usage_statistics,
)
emitter.emit(mcp)
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://datahubspace.slack.com/archives/CUMUWQU66/p1722487407.888879>
- <https://datahubspace.slack.com/archives/CUMUWQU66/p1724388419.376709>
Note: I wasn't able to find highly relevant data sources, but above are a few potentially relevant links.
<@U06TM7M40RK> Then, how do I create a custom aspect that is sent from the source to the sink?
To create and send a custom aspect from the source to the sink in DataHub, you need to follow these steps:
- Define the Custom Aspect: Create a PDL (Pegasus Data Language) file to define your custom aspect.
- Register the Custom Aspect: Add the custom aspect to the entity-registry.yaml file.
- Build and Deploy the Custom Model: Build and deploy the custom model to DataHub.
- Ingest Data with the Custom Aspect: Use the DataHub ingestion framework to ingest data with the custom aspect.
Here is a detailed breakdown of each step:
Step 1: Define the Custom Aspect
Create a PDL file (e.g., MyCustomAspect.pdl) to define your custom aspect. For example:
namespace com.mycompany.datahub.schema

/**
 * Properties for my custom aspect
 */
@Aspect = {
  "name": "myCustomAspect"
}
record MyCustomAspect {
  /**
   * Unique Identifier
   */
  packageId: string
}
Step 2: Register the Custom Aspect
Add your custom aspect to the entity-registry.yaml file. For example:
entities:
  - name: dataset
    aspects:
      - myCustomAspect
Step 3: Build and Deploy the Custom Model
Navigate to the metadata-models-custom directory, build the custom model, and deploy it to your running DataHub instance:
cd metadata-models-custom
../gradlew -PprojVersion=0.0.1 build
../gradlew -PprojVersion=0.0.1 install
../gradlew :metadata-models-custom:modelDeploy
Step 4: Ingest Data with the Custom Aspect
Use the DataHub ingestion framework to ingest data with the custom aspect. Here is an example using the Python SDK:
import json
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

# Define the custom aspect
custom_aspect = {
    "packageId": "12345"
}

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the dataset URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"

# Create the MetadataChangeProposalClass for the custom aspect
mcp_raw = MetadataChangeProposalClass(
    entityType="dataset",
    entityUrn=dataset_urn,
    changeType=ChangeTypeClass.UPSERT,
    aspectName="myCustomAspect",
    aspect=GenericAspectClass(
        contentType="application/json",
        value=json.dumps(custom_aspect).encode("utf-8"),
    ),
)

# Emit the MCP and handle exceptions
try:
    emitter.emit(mcp_raw)
    print("Successfully wrote to DataHub")
except Exception as e:
    print("Failed to write to DataHub")
    raise e
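To confirm the custom aspect was written, you can read it back through the GMS aspects endpoint (the same endpoint the custom metadata model README queries with curl). A small sketch using the requests library, assuming GMS at http://localhost:8080; adjust the URN and aspect name to match what you emitted:
import urllib.parse
import requests

gms_server = "http://localhost:8080"
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"

# GET /aspects/{url-encoded urn}?aspect=<aspectName>&version=0
response = requests.get(
    f"{gms_server}/aspects/{urllib.parse.quote(dataset_urn, safe='')}",
    params={"aspect": "myCustomAspect", "version": 0},
)
response.raise_for_status()
print(response.json())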
Example Configuration for Ingestion
Here is an example recipe.yaml configuration for ingestion:
source:
  type: sql-queries
  config:
    platform: "hive"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Running the Ingestion
Use the DataHub CLI to run the ingestion job with the configuration file:
datahub ingest -c recipe.yaml
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/metadata-models-custom/README.md|Custom Metadata Model>
- <https://github.com/datahub-project/datahub/blob/master/docs/how/add-new-aspect.md|Adding a New Metadata Aspect>
<@U06TM7M40RK> When the source type is sql-queries, ingestion extracts column-level lineage. My more specific goal is to define a new URN for each query read from the query_file, use sqlParsingResult as the aspect, and store the SqlParsingResult as its metadata.
To achieve your goal of defining a new URN for each query read from the query_file and storing the sqlParsingResult as an aspect, you can follow these steps:
- Define the Custom Aspect: Create a PDL file to define the sqlParsingResult aspect.
- Register the Custom Aspect: Add the custom aspect to the entity-registry.yaml file.
- Build and Deploy the Custom Model: Build and deploy the custom model to DataHub.
- Ingest Data with the Custom Aspect: Use the DataHub ingestion framework to ingest data with the custom aspect.
Step 1: Define the Custom Aspect
Create a PDL file (e.g., SqlParsingResult.pdl) to define your custom aspect. For example:
namespace com.mycompany.datahub.schema

/**
 * Properties for SQL Parsing Result
 */
@Aspect = {
  "name": "sqlParsingResult"
}
record SqlParsingResult {
  /**
   * The original SQL query
   */
  query: string

  /**
   * The type of the SQL query
   */
  queryType: string

  /**
   * The tables involved in the query
   */
  tables: array[string]
}
Step 2: Register the Custom Aspect
Add your custom aspect to the entity-registry.yaml file, registering it on the entity type you will emit it against (the query entity in this case). For example:
entities:
  - name: query
    aspects:
      - sqlParsingResult
Step 3: Build and Deploy the Custom Model
Navigate to the metadata-models-custom directory, build the custom model, and deploy it to your running DataHub instance:
cd metadata-models-custom
../gradlew -PprojVersion=0.0.1 build
../gradlew -PprojVersion=0.0.1 install
../gradlew :metadata-models-custom:modelDeploy
Step 4: Ingest Data with the Custom Aspect
Use the DataHub ingestion framework to ingest data with the custom aspect. Here is an example using the Python SDK:
import json
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Example query data
queries = [
    {
        "query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "SELECT",
        "tables": ["my_project.my_dataset.my_table1"]
    },
    {
        "query": "INSERT INTO my_project.my_dataset.my_table2 (col1, col2) SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "INSERT",
        "tables": ["my_project.my_dataset.my_table1", "my_project.my_dataset.my_table2"]
    }
]

for i, query_data in enumerate(queries):
    # Define the custom aspect
    custom_aspect = {
        "query": query_data["query"],
        "queryType": query_data["queryType"],
        "tables": query_data["tables"]
    }

    # Create a unique URN for each query
    query_urn = f"urn:li:query:{i}"

    # Create the MetadataChangeProposalClass for the custom aspect
    mcp_raw = MetadataChangeProposalClass(
        entityType="query",
        entityUrn=query_urn,
        changeType=ChangeTypeClass.UPSERT,
        aspectName="sqlParsingResult",
        aspect=GenericAspectClass(
            contentType="application/json",
            value=json.dumps(custom_aspect).encode("utf-8"),
        ),
    )

    # Emit the MCP and handle exceptions
    try:
        emitter.emit(mcp_raw)
        print(f"Successfully wrote query {i} to DataHub")
    except Exception as e:
        print(f"Failed to write query {i} to DataHub")
        raise e
Example Configuration for Ingestion
Here is an example recipe.yaml configuration for ingestion:
source:
  type: sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Running the Ingestion
Use the DataHub CLI to run the ingestion job with the configuration file:
datahub ingest -c recipe.yaml
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|SQL Queries Ingestion Source>
<@U06TM7M40RK> Additional information needs to go into the aspect created above, so the query_file format has to carry more fields. The query_file format needs to be extended…
To include additional information in the query_file format and ensure that this information is captured and stored in your custom aspect, you need to:
- Extend the query_file Format: Add the additional fields to your queries.json file.
- Update the Custom Aspect Definition: Modify the PDL file to include the new fields.
- Update the Ingestion Logic: Ensure the ingestion logic reads the new fields and includes them in the custom aspect.
Step 1: Extend the query_file Format
Add the additional fields to your queries.json file. For example, let's add queryId and executionTime:
queries.json:
[
  {
    "queryId": "query_1",
    "query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1",
    "queryType": "SELECT",
    "tables": ["my_project.my_dataset.my_table1"],
    "executionTime": 1234
  },
  {
    "queryId": "query_2",
    "query": "INSERT INTO my_project.my_dataset.my_table2 (col1, col2) SELECT col1, col2 FROM my_project.my_dataset.my_table1",
    "queryType": "INSERT",
    "tables": ["my_project.my_dataset.my_table1", "my_project.my_dataset.my_table2"],
    "executionTime": 5678
  }
]
Step 2: Update the Custom Aspect Definition
Modify the PDL file to include the new fields:
SqlParsingResult.pdl:
namespace com.mycompany.datahub.schema

/**
 * Properties for SQL Parsing Result
 */
@Aspect = {
  "name": "sqlParsingResult"
}
record SqlParsingResult {
  /**
   * Unique Identifier for the query
   */
  queryId: string

  /**
   * The original SQL query
   */
  query: string

  /**
   * The type of the SQL query
   */
  queryType: string

  /**
   * The tables involved in the query
   */
  tables: array[string]

  /**
   * Execution time of the query in milliseconds
   */
  executionTime: int
}
Step 3: Update the Ingestion Logic
Ensure the ingestion logic reads the new fields and includes them in the custom aspect:
Ingestion Script:
import json
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter("http://localhost:8080")

# Example query data
queries = [
    {
        "queryId": "query_1",
        "query": "SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "SELECT",
        "tables": ["my_project.my_dataset.my_table1"],
        "executionTime": 1234
    },
    {
        "queryId": "query_2",
        "query": "INSERT INTO my_project.my_dataset.my_table2 (col1, col2) SELECT col1, col2 FROM my_project.my_dataset.my_table1",
        "queryType": "INSERT",
        "tables": ["my_project.my_dataset.my_table1", "my_project.my_dataset.my_table2"],
        "executionTime": 5678
    }
]

for query_data in queries:
    # Define the custom aspect
    custom_aspect = {
        "queryId": query_data["queryId"],
        "query": query_data["query"],
        "queryType": query_data["queryType"],
        "tables": query_data["tables"],
        "executionTime": query_data["executionTime"]
    }

    # Create a unique URN for each query
    query_urn = f"urn:li:query:{query_data['queryId']}"

    # Create the MetadataChangeProposalClass for the custom aspect
    mcp_raw = MetadataChangeProposalClass(
        entityType="query",
        entityUrn=query_urn,
        changeType=ChangeTypeClass.UPSERT,
        aspectName="sqlParsingResult",
        aspect=GenericAspectClass(
            contentType="application/json",
            value=json.dumps(custom_aspect).encode("utf-8"),
        ),
    )

    # Emit the MCP and handle exceptions
    try:
        emitter.emit(mcp_raw)
        print(f"Successfully wrote query {query_data['queryId']} to DataHub")
    except Exception as e:
        print(f"Failed to write query {query_data['queryId']} to DataHub")
        raise e
Example Configuration for Ingestion
Here is an example recipe.yaml configuration for ingestion:
source:
  type: sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Running the Ingestion
Use the DataHub CLI to run the ingestion job with the configuration file:
datahub ingest -c recipe.yaml
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://datahubproject.io/docs/0.14.0/generated/ingestion/sources/sql-queries|SQL Queries Ingestion Source>
<@U06TM7M40RK> Above there is this part:
Update the Ingestion Logic: Ensure the ingestion logic reads the new fields and includes them in the custom aspect.
But I don't know which DataHub source I need to modify, or whether I need to write a new one.
To update the ingestion logic to read the new fields and include them in the custom aspect, you will need to modify or extend the sql-queries source in DataHub. Here are the steps to achieve this:
- Clone the DataHub Repository: Clone the DataHub repository to your local machine.
- Modify the sql-queries Source: Update the sql_queries.py file to handle the new fields.
- Build and Deploy the Custom Ingestion Source: Build and deploy the modified ingestion source.
- Run the Ingestion Job: Use the DataHub CLI to run the ingestion job with the updated source.
Step 1: Clone the DataHub Repository
Clone the DataHub repository to your local machine:
git clone https://github.com/datahub-project/datahub.git
cd datahub
Step 2: Modify the sql-queries Source
Locate the sql_queries.py file in the metadata-ingestion module and modify it to handle the new fields. You can find this file at metadata-ingestion/src/datahub/ingestion/source/sql_queries.py.
Here is an example of how you might modify the sql_queries.py file to handle the new fields:
sql_queries.py:
import json
from typing import Any, Dict, Iterable

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    GenericAspectClass,
    MetadataChangeProposalClass,
)


class SqlQueriesSource(Source):
    def __init__(self, config_dict: Dict[str, Any], ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config_dict
        self.query_file = config_dict.get("query_file")
        self.platform = config_dict.get("platform")
        self.default_db = config_dict.get("default_db")
        self.env = config_dict.get("env")
        self.report = SourceReport()

    @classmethod
    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "SqlQueriesSource":
        return cls(config_dict, ctx)

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        with open(self.query_file, "r") as f:
            queries = json.load(f)
        for query_data in queries:
            yield from self._process_query(query_data)

    def _process_query(self, query_data: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
        # Build the custom aspect payload from the extended query_file fields
        custom_aspect = {
            "queryId": query_data["queryId"],
            "query": query_data["query"],
            "queryType": query_data["queryType"],
            "tables": query_data["tables"],
            "executionTime": query_data["executionTime"],
        }

        # Create a unique URN for each query
        query_urn = f"urn:li:query:{query_data['queryId']}"

        # Use a raw MetadataChangeProposal with a generic (JSON-serialized) aspect,
        # since sqlParsingResult is a custom aspect without a generated class
        mcp_raw = MetadataChangeProposalClass(
            entityType="query",
            entityUrn=query_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName="sqlParsingResult",
            aspect=GenericAspectClass(
                contentType="application/json",
                value=json.dumps(custom_aspect).encode("utf-8"),
            ),
        )
        yield MetadataWorkUnit(id=query_urn, mcp_raw=mcp_raw)

    def get_report(self) -> SourceReport:
        return self.report

    def close(self) -> None:
        pass

# To make this source usable from a recipe, either register it via an entry point in
# your package's setup.py (see the sketch in Step 3 below) or reference it by its fully
# qualified class path in the recipe, e.g. type: my_package.custom_sql_queries.SqlQueriesSource
Step 3: Build and Deploy the Custom Ingestion Source
After modifying the source code, build and install the custom ingestion source:
cd metadata-ingestion
pip install -e .
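If you package the modified source as your own module instead of editing DataHub's tree directly, you also need a way for the CLI to find it. One option is an entry point in your package's setup.py; the sketch below assumes a hypothetical package my_package with the source class in my_package/custom_sql_queries.py, and uses the datahub.ingestion.source.plugins entry-point group that the DataHub CLI scans for source plugins:
# setup.py for a hypothetical package that ships the custom source.
# The name on the left of "=" becomes the source type you can use in recipe.yaml.
from setuptools import find_packages, setup

setup(
    name="my-datahub-custom-sources",
    version="0.0.1",
    packages=find_packages(),
    install_requires=["acryl-datahub"],
    entry_points={
        "datahub.ingestion.source.plugins": [
            "custom-sql-queries = my_package.custom_sql_queries:SqlQueriesSource",
        ],
    },
)
Alternatively, you can skip registration entirely and set the recipe's type to the fully qualified class path (for example type: my_package.custom_sql_queries.SqlQueriesSource), which the ingestion framework also accepts for custom sources.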
Step 4: Run the Ingestion Job
Use the DataHub CLI to run the ingestion job with the updated source. Here is an example recipe.yaml configuration:
recipe.yaml:
source:
  type: custom-sql-queries
  config:
    platform: "bigquery"
    query_file: "./queries.json"
    default_db: "your_database"
    env: "PROD"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Run the ingestion job:
datahub ingest -c recipe.yaml
This setup will ensure that the new fields in the query_file are read and included in the custom aspect (sqlParsingResult) when ingested into DataHub.
Would you like more detailed guidance on any specific part of this process?
Sources:
- <https://datahubspace.slack.com/archives/C029A3M079U/p1722492356.309409>
- <https://datahubspace.slack.com/archives/CUMUWQU66/p1688138287.205839>