Hi, I’m looking for a way to detect when a new dataset is created using events emitted to the MetadataChangeLog_Versioned_v1 topic. I tried looking at the aspect_diff field. Any advice?
Hey there! Make sure your message includes the following information if relevant, so we can help more effectively!
- Are you using UI or CLI for ingestion?
- Which DataHub version are you using? (e.g. 0.12.0)
- What data source(s) are you integrating with DataHub? (e.g. BigQuery)
<@U06TM7M40RK> Can you help with the questions in this thread?
We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions, and don’t forget to give its responses a thumbs-up or thumbs-down so other folks in the Community know if you’re still looking for help on the topic!
To detect when a new dataset is created using events emitted to the MetadataChangeLog_Versioned_v1 topic, you can look for events with `changeType` set to `CREATE` and `entityType` set to `dataset`. The MetadataChangeLogEvent captures these changes, and you can subscribe to this topic to monitor such events.
Here is a summary of what you need to do:
- Subscribe to the MetadataChangeLog_Versioned_v1 topic: ensure your application is listening to this Kafka topic.
- Filter events: look for events where `changeType` is `CREATE` and `entityType` is `dataset`.
Here is an example of what such an event might look like:
```json
{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "CREATE",
  "aspectName": "datasetProperties",
  "aspect": {
    "value": "{\"name\": \"Sample Dataset\"}",
    "contentType": "application/json"
  },
  "created": {
    "time": 1649953100653,
    "actor": "urn:li:corpuser:jdoe"
  }
}
```
Example Code to Filter Events:
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    if event['changeType'] == 'CREATE' and event['entityType'] == 'dataset':
        print(f"New dataset created: {event['entityUrn']}")
```
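The loop above prints only the URN; the dataset's properties arrive as a JSON-encoded string inside the aspect's `value` field, so they need a second decode after the Kafka message itself is deserialized. A minimal sketch of that step, assuming a `datasetProperties` payload shaped like the example event above (the helper name is mine, not a DataHub API):

```python
import json

def dataset_name_from_event(event):
    """Extract the dataset name from a datasetProperties MCL event.

    The aspect payload is a JSON-encoded string, so it must be decoded
    a second time after the outer message is deserialized.
    """
    aspect = event.get("aspect") or {}
    raw = aspect.get("value")
    if raw is None:
        return None
    properties = json.loads(raw)
    return properties.get("name")

event = {
    "entityUrn": "urn:li:dataset:abc",
    "aspect": {
        "value": "{\"name\": \"Sample Dataset\"}",
        "contentType": "application/json",
    },
}
print(dataset_name_from_event(event))  # Sample Dataset
```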
Would you like more detailed guidance on setting up the Kafka consumer or handling these events?
Sources:
- <https://datahubproject.io/docs/advanced/mcp-mcl/|Metadata Change Log Event documentation>
- <Entity Change Event V1 | DataHub Change Event>
That doesn’t always work because when we ingest new datasets the changeType used is UPSERT.
<@U06TM7M40RK>
You are correct that in many cases, especially during ingestion, the `changeType` used can be `UPSERT` rather than `CREATE`. To detect the creation of new datasets, you will need to look for the combination of `changeType` being `UPSERT` and the absence of a previous aspect value, which indicates that the dataset did not exist before.
Here is how you can adjust your approach:
- Subscribe to the MetadataChangeLog_Versioned_v1 topic: ensure your application is listening to this Kafka topic.
- Filter events: look for events where `changeType` is `UPSERT`, `entityType` is `dataset`, and `previousAspectValue` is `null` (indicating it’s a new dataset).
Here is an example of what such an event might look like:
```json
{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "UPSERT",
  "aspectName": "datasetProperties",
  "aspect": {
    "value": "{\"name\": \"Sample Dataset\"}",
    "contentType": "application/json"
  },
  "previousAspectValue": null,
  "created": {
    "time": 1649953100653,
    "actor": "urn:li:corpuser:jdoe"
  }
}
```
Example Code to Filter Events:
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    # Use .get() so events that omit previousAspectValue entirely don't raise a KeyError
    if (event['changeType'] == 'UPSERT'
            and event['entityType'] == 'dataset'
            and event.get('previousAspectValue') is None):
        print(f"New dataset created: {event['entityUrn']}")
```
This approach ensures that you capture the creation of new datasets even when the `changeType` is `UPSERT`.
Would you like more detailed information on setting up the Kafka consumer or handling these events?
Sources:
- <https://datahubproject.io/docs/advanced/mcp-mcl/|Metadata Change Log Event documentation>
- <Entity Change Event V1 | DataHub Change Event>
How about when an owner of a dataset is added or deleted - how can i detect that through the Metadata Change Log Event?
To detect when an owner of a dataset is added or deleted using the MetadataChangeLog_Versioned_v1 topic, you need to monitor changes to the `ownership` aspect of the dataset. Specifically, you can look for `UPSERT` or `DELETE` events where the `aspectName` is `ownership`.
Here’s how you can adjust your approach:
- Subscribe to the MetadataChangeLog_Versioned_v1 topic: ensure your application is listening to this Kafka topic.
- Filter events: look for events where `changeType` is `UPSERT` or `DELETE`, `entityType` is `dataset`, and `aspectName` is `ownership`.
Here are examples of what such events might look like:
Owner Added Event Example:
```json
{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "UPSERT",
  "aspectName": "ownership",
  "aspect": {
    "value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}",
    "contentType": "application/json"
  },
  "previousAspectValue": {
    "value": "{\"owners\":[],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}",
    "contentType": "application/json"
  },
  "created": {
    "time": 1651516640490,
    "actor": "urn:li:corpuser:datahub"
  }
}
```
Owner Deleted Event Example:
```json
{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "DELETE",
  "aspectName": "ownership",
  "aspect": {
    "value": null,
    "contentType": "application/json"
  },
  "previousAspectValue": {
    "value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}",
    "contentType": "application/json"
  },
  "created": {
    "time": 1651516640490,
    "actor": "urn:li:corpuser:datahub"
  }
}
```
Example Code to Filter Events:
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    if event['entityType'] == 'dataset' and event['aspectName'] == 'ownership':
        if event['changeType'] == 'UPSERT':
            print(f"Owner added or updated for dataset: {event['entityUrn']}")
        elif event['changeType'] == 'DELETE':
            print(f"Owner deleted for dataset: {event['entityUrn']}")
```
This approach ensures that you capture both the addition and deletion of dataset owners.
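If you also want to report *which* owner was added or removed, you can diff the owner URNs in the current and previous aspect values. A minimal sketch, assuming both values are JSON-encoded strings with an `owners` list as in the example events above (the helper is hypothetical, not part of DataHub):

```python
import json

def owner_diff(event):
    """Return (added, removed) sets of owner URNs for an ownership MCL event.

    Handles missing or null aspect values (e.g. a DELETE event's aspect)
    by treating them as an empty owner set.
    """
    def owners(container):
        if not container or container.get("value") is None:
            return set()
        payload = json.loads(container["value"])
        return {o["owner"] for o in payload.get("owners", [])}

    current = owners(event.get("aspect"))
    previous = owners(event.get("previousAspectValue"))
    return current - previous, previous - current

# Mirrors the "Owner Added" example: previous owners list was empty
event = {
    "aspect": {"value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}]}"},
    "previousAspectValue": {"value": "{\"owners\":[]}"},
}
added, removed = owner_diff(event)
print(added, removed)
```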
Would you like more detailed information on setting up the Kafka consumer or handling these events?
Sources:
- <https://datahubproject.io/docs/0.13.1/actions/events/metadata-change-log-event|Metadata Change Log Event documentation>
- <Metadata Events | DataHub Change Log (MCL)>
How about when a new glossary term is created? <@U06TM7M40RK>
To detect when a new glossary term is created using the MetadataChangeLog_Versioned_v1 topic, you need to monitor changes where the `entityType` is `glossaryTerm` and the `changeType` is `CREATE` or `UPSERT`. Additionally, you can check whether the `previousAspectValue` is `null` to confirm that it is a new creation.
Here’s how you can adjust your approach:
- Subscribe to the MetadataChangeLog_Versioned_v1 topic: ensure your application is listening to this Kafka topic.
- Filter events: look for events where `entityType` is `glossaryTerm` and `changeType` is `CREATE`, or `UPSERT` with `previousAspectValue` equal to `null`.
Here is an example of what such an event might look like:
New Glossary Term Creation Event Example:
```json
{
  "entityUrn": "urn:li:glossaryTerm:abc",
  "entityType": "glossaryTerm",
  "changeType": "UPSERT",
  "aspectName": "glossaryTermInfo",
  "aspect": {
    "value": "{\"name\":\"Sample Term\",\"definition\":\"A sample glossary term\"}",
    "contentType": "application/json"
  },
  "previousAspectValue": null,
  "created": {
    "time": 1651516640490,
    "actor": "urn:li:corpuser:datahub"
  }
}
```
Example Code to Filter Events:
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    if event['entityType'] == 'glossaryTerm' and event['aspectName'] == 'glossaryTermInfo':
        # Use .get() so events without a previousAspectValue key don't raise a KeyError
        if event['changeType'] == 'CREATE' or (event['changeType'] == 'UPSERT'
                                               and event.get('previousAspectValue') is None):
            print(f"New glossary term created: {event['entityUrn']}")
```
This approach ensures that you capture the creation of new glossary terms even when the `changeType` is `UPSERT`.
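The same "CREATE, or UPSERT with no previous aspect value" check now appears for both datasets and glossary terms, so it can be factored into a single predicate. A hedged sketch (the helper name and the key-aspect parameter are mine, not a DataHub API):

```python
def is_new_entity(event, entity_type, key_aspect):
    """Return True when an MCL event looks like the creation of a new entity.

    Treats CREATE as creation, and UPSERT as creation only when there is
    no previous aspect value for the entity's key aspect.
    """
    if event.get("entityType") != entity_type:
        return False
    if event.get("aspectName") != key_aspect:
        return False
    change = event.get("changeType")
    if change == "CREATE":
        return True
    return change == "UPSERT" and event.get("previousAspectValue") is None

# Usage in the consumer loop, e.g.:
#   if is_new_entity(event, "dataset", "datasetProperties"): ...
#   if is_new_entity(event, "glossaryTerm", "glossaryTermInfo"): ...
```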
Would you like more detailed information on setting up the Kafka consumer or handling these events?
Sources:
- <https://datahubproject.io/docs/0.13.1/actions/events/metadata-change-log-event|Metadata Change Log Event documentation>
- <Metadata Events | DataHub Change Log (MCL)>