How to Fetch and Merge Existing Tags in Datahub Emitter

Original Slack Thread

Hi! I’m using the DataHub emitter to tag sources, but one thing I noticed is that it overwrites existing tags. I tried to fetch the existing tags from the dataset properties using the DataHub graph client and merge the new tag into that list, but the tags always come back empty even though I can see the source tagged in the UI. Is there a way to fetch existing tags?

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

<@U06TM7M40RK> Can you help with the questions in this thread?


To fetch existing tags from a dataset and merge them with new tags, you can use the get_aspect method on the DataHubGraph client. This method retrieves a specific aspect of an entity, such as its globalTags. Note that tags applied in the UI are stored in the globalTags aspect rather than in the dataset properties, which is why reading tags from the properties can come back empty. Here’s a step-by-step guide on how to achieve this:

  1. Fetch Existing Tags: Use the get_aspect method to read the dataset’s current globalTags aspect.
  2. Merge Tags: Combine the existing tag URNs with the new tags you want to add.
  3. Update Tags: Emit a MetadataChangeProposalWrapper containing the merged tags back to DataHub.

Here is an example in Python:

from datahub.emitter.mce_builder import make_tag_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

# Initialize the DataHub graph client
graph = DataHubGraph(config=DatahubClientConfig(server="http://localhost:8080"))

# Define the dataset URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,mybucket/data.json,PROD)"

# Fetch the existing globalTags aspect (returns None if the dataset has no tags yet)
existing_tags_aspect = graph.get_aspect(
    entity_urn=dataset_urn,
    aspect_type=GlobalTagsClass,
)

# Extract the existing tag URNs from the tag associations
existing_tags = []
if existing_tags_aspect:
    existing_tags = [association.tag for association in existing_tags_aspect.tags]

# Define new tags
new_tags = [make_tag_urn("new_tag")]

# Merge existing and new tag URNs, dropping duplicates
merged_tags = list(set(existing_tags + new_tags))

# Build the updated globalTags aspect
global_tags = GlobalTagsClass(
    tags=[TagAssociationClass(tag=tag) for tag in merged_tags]
)

# Emit the merged tags back to DataHub (the change type defaults to UPSERT)
mcpw = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=global_tags,
)
graph.emit(mcpw)

This code snippet demonstrates how to fetch existing tags, merge them with new tags, and update the dataset with the combined tags.
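
If you would rather not read, merge, and re-emit the whole aspect, recent acryl-datahub releases also offer a patch-based approach that appends tags server-side without overwriting the existing ones. Here is a minimal sketch, assuming the DatasetPatchBuilder helper and a REST emitter pointed at your GMS endpoint (adjust the server URL, token, and tag name to your environment):

from datahub.emitter.mce_builder import make_tag_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import TagAssociationClass
from datahub.specific.dataset import DatasetPatchBuilder

# Assumed GMS endpoint; pass token=... if your instance requires authentication
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,mybucket/data.json,PROD)"

# Build PATCH proposals that add the tag to the existing globalTags aspect
# instead of replacing the aspect wholesale
patch_builder = DatasetPatchBuilder(dataset_urn).add_tag(
    TagAssociationClass(tag=make_tag_urn("new_tag"))
)
for patch_mcp in patch_builder.build():
    emitter.emit(patch_mcp)

Because the server applies the patch, concurrent ingestion runs won’t clobber each other’s tags, which is usually what you want when several sources tag the same dataset.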

Would you like more details on any specific part of this process?

<@U06TM7M40RK> How can I set the description of a dataset property?

To set the description of a dataset property in DataHub, you can use the updateDataset mutation in GraphQL. This mutation allows you to update various properties of a dataset, including its description. Here’s a step-by-step guide on how to achieve this:

Using GraphQL

  1. Define the Dataset URN: Identify the URN of the dataset you want to update.
  2. Create the GraphQL Mutation: Use the updateDataset mutation to set the description.

Here is an example of how to do this using a GraphQL mutation:

mutation updateDataset {
  updateDataset(
    urn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    input: {
      editableProperties: {
        description: "## The Real Estate Sales Dataset\nThis is a really important Dataset that contains all the relevant information about sales that have happened organized by address.\n"
      }
    }
  ) {
    urn
  }
}

Using Curl

You can also use a curl command to send the GraphQL mutation:

curl --location --request POST 'http://localhost:8080/api/graphql' \
--header 'Authorization: Bearer <my-access-token>' \
--header 'Content-Type: application/json' \
--data-raw '{
  "query": "mutation updateDataset { updateDataset(urn: \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)\", input: { editableProperties: { description: \"## The Real Estate Sales Dataset\nThis is a really important Dataset that contains all the relevant information about sales that have happened organized by address.\n\" } }) { urn } }",
  "variables": {}
}'

Using Python

If you prefer using Python, you can use the DataHub graph client and emitter to achieve the same result:

import logging
import time
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import (
    AuditStampClass,
    EditableDatasetPropertiesClass,
)

# Initialize logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Define the dataset URN
dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

# Define the new description
documentation_to_add = "## The Real Estate Sales Dataset\nThis is a really important Dataset that contains all the relevant information about sales that have happened organized by address.\n"

# Initialize DataHubGraph
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(config=DatahubClientConfig(server=gms_endpoint))

# Get the current editable properties
current_editable_properties = graph.get_aspect(
    entity_urn=dataset_urn, aspect_type=EditableDatasetPropertiesClass
)

# Update the description if needed
need_write = False
if current_editable_properties:
    if documentation_to_add != current_editable_properties.description:
        current_editable_properties.description = documentation_to_add
        need_write = True
else:
    # Create a new editable dataset properties aspect
    now = int(time.time() * 1000)  # milliseconds since epoch
    current_timestamp = AuditStampClass(time=now, actor="urn:li:corpuser:ingestion")
    current_editable_properties = EditableDatasetPropertiesClass(
        created=current_timestamp, description=documentation_to_add
    )
    need_write = True

# Emit the updated properties
if need_write:
    event = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn, aspect=current_editable_properties
    )
    graph.emit(event)
    log.info(f"Documentation added to dataset {dataset_urn}")
else:
    log.info("Documentation already exists and is identical, omitting write")

This Python script demonstrates how to update the description of a dataset property using the DataHub emitter.
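
As a quick sanity check, you can read the aspect back with the same get_aspect call used above and confirm the description round-tripped. A minimal sketch, reusing the graph client, dataset_urn, documentation_to_add, and imports from the script above:

# Re-read the editable properties aspect to verify the write
updated_properties = graph.get_aspect(
    entity_urn=dataset_urn, aspect_type=EditableDatasetPropertiesClass
)
assert updated_properties is not None
assert updated_properties.description == documentation_to_add
log.info(f"Verified description on {dataset_urn}")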

Would you like more details on any specific part of this process?

Sources:

  • Description | DataHub Description on Dataset