Hi! I’m using the DataHub emitter to tag sources, but I noticed that it overwrites existing tags. I tried fetching the existing tags from the dataset properties using the DataHub graph client and merging the new tag into that list. However, the tags always show up empty even though I see the source tagged in the UI. Is there a way to fetch existing tags?
Hey there! Make sure your message includes the following information if relevant, so we can help more effectively!
- Are you using UI or CLI for ingestion?
- Which DataHub version are you using? (e.g. 0.12.0)
- What data source(s) are you integrating with DataHub? (e.g. BigQuery)
<@U06TM7M40RK> Can you help with the questions in this thread?
We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions and don’t forget to give its responses a thumbs-up or thumbs-down so other folks in the Community know if you’re still looking for help on the topic!
To fetch existing tags from a dataset and merge them with new tags, you can use the get_aspects_for_entity method from the DataHubGraph object. This method allows you to retrieve specific aspects of an entity, such as tags. Here’s a step-by-step guide on how to achieve this:
- Fetch Existing Tags: Use the get_aspects_for_entity method to get the current tags associated with the dataset.
- Merge Tags: Combine the existing tags with the new tags you want to add.
- Update Tags: Use the emit method to update the dataset with the merged tags.
Here is an example in Python:
from datahub.emitter.mce_builder import make_tag_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

# Initialize DataHubGraph
graph = DataHubGraph(config=DatahubClientConfig(server="http://localhost:8080"))

# Define the dataset URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,mybucket/data.json,PROD)"

# Fetch existing tags; the result is a dict keyed by aspect name
aspects = graph.get_aspects_for_entity(
    entity_urn=dataset_urn,
    aspects=["globalTags"],
    aspect_types=[GlobalTagsClass],
)
existing_tags_aspect = aspects.get("globalTags")  # None if the dataset has no tags

# Extract existing tag URNs (each entry in .tags is a TagAssociationClass)
existing_tags = []
if existing_tags_aspect:
    existing_tags = [association.tag for association in existing_tags_aspect.tags]

# Define new tags
new_tags = [make_tag_urn("new_tag")]

# Merge existing and new tag URNs, dropping duplicates but keeping order
merged_tags = list(dict.fromkeys(existing_tags + new_tags))

# Create GlobalTagsClass object
global_tags = GlobalTagsClass(tags=[TagAssociationClass(tag=tag) for tag in merged_tags])

# Emit the updated tags (DataHubGraph can emit directly, so no separate emitter is needed)
mcpw = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=global_tags)
graph.emit(mcpw)
This code snippet demonstrates how to fetch existing tags, merge them with new tags, and update the dataset with the combined tags.
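The merge step is the easiest part to get wrong, because the globalTags aspect stores tag association objects rather than plain strings. As a minimal sketch, assuming the tags have already been reduced to URN strings, the merge can be isolated in a small pure function and checked without a DataHub server. The name merge_tag_urns and the sample tag names are hypothetical and not part of the DataHub SDK.

```python
from typing import Iterable, List


def merge_tag_urns(existing: Iterable[str], new: Iterable[str]) -> List[str]:
    """Return existing tag URNs followed by unseen new ones, without duplicates.

    Hypothetical helper for illustration; it is not part of the DataHub SDK.
    """
    # dict preserves insertion order, so existing tags keep their position
    return list(dict.fromkeys([*existing, *new]))


# Sample URNs are made up for the example
existing = ["urn:li:tag:pii", "urn:li:tag:finance"]
new = ["urn:li:tag:finance", "urn:li:tag:new_tag"]
merged = merge_tag_urns(existing, new)
print(merged)
```

Using an order-preserving merge instead of a set keeps the tag order stable between runs, which avoids emitting an aspect that differs only in ordering.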
Would you like more details on any specific part of this process?
Sources:
- <Slack Slack Discussion>
- <https://github.com/datahub-project/datahub/blob/master/docs/tags.md|DataHub Tags Documentation>
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> How can I set the description of a dataset property
To set the description of a dataset property in DataHub, you can use the updateDataset mutation in GraphQL. This mutation allows you to update various properties of a dataset, including its description. Here’s a step-by-step guide on how to achieve this:
Using GraphQL
- Define the Dataset URN: Identify the URN of the dataset you want to update.
- Create the GraphQL Mutation: Use the updateDataset mutation to set the description.
Here is an example of how to do this using a GraphQL mutation:
mutation updateDataset {
  updateDataset(
    urn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    input: {
      editableProperties: {
        description: "## The Real Estate Sales Dataset\nThis is a really important Dataset that contains all the relevant information about sales that have happened organized by address.\n"
      }
    }
  ) {
    urn
  }
}
Using Curl
You can also use a curl command to send the GraphQL mutation:
curl --location --request POST 'http://localhost:8080/api/graphql' \
--header 'Authorization: Bearer <my-access-token>' \
--header 'Content-Type: application/json' \
--data-raw '{
"query": "mutation updateDataset { updateDataset(urn: \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)\", input: { editableProperties: { description: \"## The Real Estate Sales Dataset\nThis is a really important Dataset that contains all the relevant information about sales that have happened organized by address.\n\" } }) { urn } }",
"variables": {}
}'
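Hand-escaping quotes and newlines inside a JSON-wrapped GraphQL string is error-prone, so it can help to build the request body programmatically. The sketch below only constructs the body and does not send it. The function name build_update_description_payload and the shortened description are hypothetical; only the shape of the mutation comes from the example above.

```python
import json


def build_update_description_payload(dataset_urn: str, description: str) -> str:
    """Build the JSON request body for the updateDataset mutation.

    Hypothetical helper; only the mutation shape comes from the thread.
    """
    # json.dumps yields a double-quoted, backslash-escaped literal, which is
    # also valid GraphQL string syntax for plain text like this.
    query = (
        "mutation updateDataset { updateDataset("
        f"urn: {json.dumps(dataset_urn)}, "
        f"input: {{ editableProperties: {{ description: {json.dumps(description)} }} }}"
        ") { urn } }"
    )
    # The outer json.dumps escapes the whole query for the HTTP request body
    return json.dumps({"query": query, "variables": {}})


payload = build_update_description_payload(
    "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "## The Real Estate Sales Dataset\nOrganized by address.\n",
)
print(payload)
```

The printed string can be passed to curl via --data-raw or sent with any HTTP client, together with the same Authorization and Content-Type headers as above.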
Using Python
If you prefer using Python, you can use the DataHub emitter to achieve the same:
import logging
import time

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import (
    AuditStampClass,
    EditableDatasetPropertiesClass,
)

# Initialize logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Define the dataset URN
dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

# Define the new description
documentation_to_add = "## The Real Estate Sales Dataset\nThis is a really important Dataset that contains all the relevant information about sales that have happened organized by address.\n"

# Initialize DataHubGraph
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(config=DatahubClientConfig(server=gms_endpoint))

# Get the current editable properties
current_editable_properties = graph.get_aspect(
    entity_urn=dataset_urn, aspect_type=EditableDatasetPropertiesClass
)

# Update the description if needed
need_write = False
if current_editable_properties:
    if documentation_to_add != current_editable_properties.description:
        current_editable_properties.description = documentation_to_add
        need_write = True
else:
    # Create a new editable dataset properties aspect
    now = int(time.time() * 1000)  # milliseconds since epoch
    current_timestamp = AuditStampClass(time=now, actor="urn:li:corpuser:ingestion")
    current_editable_properties = EditableDatasetPropertiesClass(
        created=current_timestamp, description=documentation_to_add
    )
    need_write = True

# Emit the updated properties
if need_write:
    event = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn, aspect=current_editable_properties
    )
    graph.emit(event)
    log.info(f"Documentation added to dataset {dataset_urn}")
else:
    log.info("Documentation already exists and is identical, omitting write")
This Python script demonstrates how to update the description of a dataset property using the DataHub emitter.
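The script only writes when the aspect is missing or the stored description differs. As a rough sketch, that decision can be pulled out into a pure function so it can be tested without a DataHub server. The name needs_description_write is hypothetical, and a missing aspect is represented here by None.

```python
from typing import Optional


def needs_description_write(current: Optional[str], desired: str) -> bool:
    """Return True when the stored description is missing or differs.

    Hypothetical helper mirroring the need_write flag in the script above;
    `current` is None when no editable-properties aspect exists yet.
    """
    return current is None or current != desired


desired = "## The Real Estate Sales Dataset\n"
print(needs_description_write(None, desired))         # no aspect yet
print(needs_description_write("old text", desired))   # description changed
print(needs_description_write(desired, desired))      # identical, skip write
```

Skipping identical writes keeps repeated runs of the script from emitting a new aspect version each time.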
Would you like more details on any specific part of this process?
Sources:
- <Description | DataHub Description on Dataset>