Detecting Creation of New Datasets and Glossary Terms using `MetadataChangeLog_Versioned_v1` Topic

Original Slack Thread

Hi, I’m looking for a way to detect when a new dataset is created using events emitted to the MetadataChangeLog_Versioned_v1 topic. I tried looking at the aspect_diff field. Any advice?

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

<@U06TM7M40RK> Can you help with the questions in this thread?


:information_source: We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions, and don’t forget to give its responses a :thumbsup: or :thumbsdown: so other folks in the Community know whether you’re still looking for help on the topic!

To detect when a new dataset is created using events emitted to the MetadataChangeLog_Versioned_v1 topic, you can look for events with changeType set to CREATE and entityType set to dataset. The MetadataChangeLogEvent captures these changes, and you can subscribe to this topic to monitor such events.

Here is a summary of what you need to do:

  1. Subscribe to the MetadataChangeLog_Versioned_v1 Topic: Ensure your application is listening to this Kafka topic.
  2. Filter Events: Look for events where:
    • changeType is CREATE
    • entityType is dataset

Here is an example of what such an event might look like:

{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "CREATE",
  "aspectName": "datasetProperties",
  "aspect": {
    "value": "{\"name\": \"Sample Dataset\"}",
    "contentType": "application/json"
  },
  "created": {
    "time": 1649953100653,
    "actor": "urn:li:corpuser:jdoe"
  }
}

Example Code to Filter Events:

from kafka import KafkaConsumer
import json

# Note: this assumes JSON-encoded messages. A default DataHub deployment
# serializes MetadataChangeLog events as Avro via a schema registry, in
# which case you would use an Avro deserializer here instead.
consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    # .get() avoids a KeyError on events that omit a field
    if event.get('changeType') == 'CREATE' and event.get('entityType') == 'dataset':
        print(f"New dataset created: {event['entityUrn']}")

Would you like more detailed guidance on setting up the Kafka consumer or handling these events?


That doesn’t always work because when we ingest new datasets the changeType used is UPSERT.

<@U06TM7M40RK>

You are correct that in many cases, especially during ingestion, the changeType used can be UPSERT rather than CREATE. To detect the creation of new datasets, you will need to look at the combination of changeType being UPSERT and the absence of a previous aspect value, which indicates that the dataset did not exist before.

Here is how you can adjust your approach:

  1. Subscribe to the MetadataChangeLog_Versioned_v1 Topic: Ensure your application is listening to this Kafka topic.
  2. Filter Events: Look for events where:
    • changeType is UPSERT
    • entityType is dataset
    • aspectName is a core aspect such as datasetProperties (so that a first-time write of some other aspect on an existing dataset is not mistaken for a new dataset)
    • previousAspectValue is null (the aspect did not exist before)

Here is an example of what such an event might look like:

{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "UPSERT",
  "aspectName": "datasetProperties",
  "aspect": {
    "value": "{\"name\": \"Sample Dataset\"}",
    "contentType": "application/json"
  },
  "previousAspectValue": null,
  "created": {
    "time": 1649953100653,
    "actor": "urn:li:corpuser:jdoe"
  }
}

Example Code to Filter Events:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    # previousAspectValue is null on the first write of an aspect; filtering
    # on datasetProperties avoids treating a later first-time write of some
    # other aspect (e.g. ownership) as dataset creation.
    if (
        event.get('changeType') == 'UPSERT'
        and event.get('entityType') == 'dataset'
        and event.get('aspectName') == 'datasetProperties'
        and event.get('previousAspectValue') is None
    ):
        print(f"New dataset created: {event['entityUrn']}")

This approach ensures that you capture the creation of new datasets even when the changeType is UPSERT.
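Note that the aspect payload is itself a JSON-encoded string nested inside the event, so it needs a second parse. A small helper (a sketch; the helper name is mine, and the field shapes are taken from the example event above) can pull the dataset name out of a datasetProperties event:

```python
import json

def dataset_name(event):
    """Extract the dataset name from a datasetProperties MCL event.

    Assumes the JSON shape shown in the example above; real events may
    differ depending on your serialization settings.
    """
    aspect = event.get('aspect') or {}
    value = aspect.get('value')
    if value is None:
        return None
    # The aspect value is a JSON string, so parse it again.
    payload = json.loads(value)
    return payload.get('name')

event = {
    "entityUrn": "urn:li:dataset:abc",
    "entityType": "dataset",
    "changeType": "UPSERT",
    "aspectName": "datasetProperties",
    "aspect": {
        "value": "{\"name\": \"Sample Dataset\"}",
        "contentType": "application/json",
    },
    "previousAspectValue": None,
}
print(dataset_name(event))  # Sample Dataset
```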

Would you like more detailed information on setting up the Kafka consumer or handling these events?


How about when an owner of a dataset is added or deleted - how can i detect that through the Metadata Change Log Event?

To detect when an owner of a dataset is added or deleted using the MetadataChangeLog_Versioned_v1 topic, you need to monitor changes to the ownership aspect of the dataset. Specifically, you can look for UPSERT or DELETE events where the aspectName is ownership.

Here’s how you can adjust your approach:

  1. Subscribe to the MetadataChangeLog_Versioned_v1 Topic: Ensure your application is listening to this Kafka topic.
  2. Filter Events: Look for events where:
    • changeType is UPSERT or DELETE
    • entityType is dataset
    • aspectName is ownership

Here is an example of what such an event might look like:

Owner Added Event Example:

{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "UPSERT",
  "aspectName": "ownership",
  "aspect": {
    "value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}",
    "contentType": "application/json"
  },
  "previousAspectValue": {
    "value": "{\"owners\":[],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}",
    "contentType": "application/json"
  },
  "created": {
    "time": 1651516640490,
    "actor": "urn:li:corpuser:datahub"
  }
}

Owner Deleted Event Example:

{
  "entityUrn": "urn:li:dataset:abc",
  "entityType": "dataset",
  "changeType": "DELETE",
  "aspectName": "ownership",
  "aspect": {
    "value": null,
    "contentType": "application/json"
  },
  "previousAspectValue": {
    "value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}",
    "contentType": "application/json"
  },
  "created": {
    "time": 1651516640490,
    "actor": "urn:li:corpuser:datahub"
  }
}

Example Code to Filter Events:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    # .get() avoids a KeyError on events that omit a field
    if event.get('entityType') == 'dataset' and event.get('aspectName') == 'ownership':
        if event.get('changeType') == 'UPSERT':
            print(f"Owner added or updated for dataset: {event['entityUrn']}")
        elif event.get('changeType') == 'DELETE':
            print(f"Owner deleted for dataset: {event['entityUrn']}")

This approach ensures that you capture both the addition and deletion of dataset owners.
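The printed messages above only say that ownership changed. To see which owners were added or removed, you can diff the owner URNs between the new and previous aspect values. This is a sketch (the function name is mine) assuming the JSON shapes shown in the two example events above:

```python
import json

def diff_owners(event):
    """Return (added, removed) sets of owner URNs for an ownership MCL event."""
    def owner_urns(aspect):
        # An absent aspect or a null value means "no owners".
        if not aspect or aspect.get('value') is None:
            return set()
        payload = json.loads(aspect['value'])
        return {o['owner'] for o in payload.get('owners', [])}

    new_owners = owner_urns(event.get('aspect'))
    old_owners = owner_urns(event.get('previousAspectValue'))
    return new_owners - old_owners, old_owners - new_owners

# The "owner added" example from above, reduced to the relevant fields:
event = {
    "aspect": {"value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}]}"},
    "previousAspectValue": {"value": "{\"owners\":[]}"},
}
added, removed = diff_owners(event)
print(added)    # {'urn:li:corpuser:datahub'}
print(removed)  # set()
```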

Would you like more detailed information on setting up the Kafka consumer or handling these events?


How about when a new glossary term is created? <@U06TM7M40RK>

To detect when a new glossary term is created using the MetadataChangeLog_Versioned_v1 topic, you need to monitor changes where the entityType is glossaryTerm and the changeType is CREATE or UPSERT. Additionally, you can check if the previousAspectValue is null to confirm that it is a new creation.

Here’s how you can adjust your approach:

  1. Subscribe to the MetadataChangeLog_Versioned_v1 Topic: Ensure your application is listening to this Kafka topic.
  2. Filter Events: Look for events where:
    • changeType is CREATE or UPSERT
    • entityType is glossaryTerm
    • previousAspectValue is null (for UPSERT events)

Here is an example of what such an event might look like:

New Glossary Term Creation Event Example:

{
  "entityUrn": "urn:li:glossaryTerm:abc",
  "entityType": "glossaryTerm",
  "changeType": "UPSERT",
  "aspectName": "glossaryTermInfo",
  "aspect": {
    "value": "{\"name\":\"Sample Term\",\"definition\":\"A sample glossary term\"}",
    "contentType": "application/json"
  },
  "previousAspectValue": null,
  "created": {
    "time": 1651516640490,
    "actor": "urn:li:corpuser:datahub"
  }
}

Example Code to Filter Events:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'MetadataChangeLog_Versioned_v1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    event = message.value
    # .get() avoids a KeyError on events that omit a field
    if event.get('entityType') == 'glossaryTerm' and event.get('aspectName') == 'glossaryTermInfo':
        if event.get('changeType') == 'CREATE' or (
            event.get('changeType') == 'UPSERT' and event.get('previousAspectValue') is None
        ):
            print(f"New glossary term created: {event['entityUrn']}")

This approach ensures that you capture the creation of new glossary terms even when the changeType is UPSERT.
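All three detection patterns in this thread share the same shape: a target entityType, a key aspectName, and either a CREATE or an UPSERT whose previousAspectValue is null. A small generic predicate (a sketch; the function name is mine) can consolidate them:

```python
def is_new_entity(event, entity_type, aspect_name):
    """True if the event looks like the first write of `aspect_name` for a
    new entity: CREATE, or UPSERT with no previous aspect value."""
    if event.get('entityType') != entity_type:
        return False
    if event.get('aspectName') != aspect_name:
        return False
    change = event.get('changeType')
    return change == 'CREATE' or (
        change == 'UPSERT' and event.get('previousAspectValue') is None
    )

new_term = {
    "entityType": "glossaryTerm",
    "aspectName": "glossaryTermInfo",
    "changeType": "UPSERT",
    "previousAspectValue": None,
}
print(is_new_entity(new_term, "glossaryTerm", "glossaryTermInfo"))  # True
print(is_new_entity(new_term, "dataset", "datasetProperties"))      # False
```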

Would you like more detailed information on setting up the Kafka consumer or handling these events?
