<@U06TM7M40RK> I have a custom transformer that adds some tags to Glue entities. However, the tags I want to add depend on the location of the table, which is a property of the Glue entity. How can I access that property in the custom transformer?
It seems likely that you can access properties of Glue entities within a custom transformer by utilizing the DataHub ingestion framework. Specifically, you can extend the `Transformer` base class and implement the `transform_aspect` method to access and modify the metadata aspects of the entities.
Based on the <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>, you can define a custom transformer by creating a class that inherits from `BaseTransformer` and `SingleAspectTransformer`. This allows you to focus on transforming specific aspects of the metadata, such as tags.
Here’s a basic outline of how you might implement this:
- Define the Transformer Class: Create a class that extends `BaseTransformer` and `SingleAspectTransformer`.
- Implement the `transform_aspect` Method: This method will be called with the entity’s URN, aspect name, and the current aspect value. You can use this method to access the properties of the Glue entity and decide which tags to add based on the location property.
- Access Entity Properties: Within the `transform_aspect` method, you can access the properties of the Glue entity. You might need to parse the aspect to extract the location property.
Here’s a simplified example in Python:
from typing import List, Optional

from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import TagAssociationClass


class CustomTagTransformer(BaseTransformer, SingleAspectTransformer):
    def transform_aspect(self, entity_urn: str, aspect_name: str, aspect: Optional[dict]) -> Optional[dict]:
        # Simplification: the aspect is treated as a dict here; DataHub passes typed aspect objects.
        # Check if the aspect contains the location property
        if aspect and 'location' in aspect:
            location = aspect['location']
            # Determine tags based on location
            tags_to_add = self.determine_tags_based_on_location(location)
            # Add tags to the aspect
            if 'tags' not in aspect:
                aspect['tags'] = []
            aspect['tags'].extend(tags_to_add)
        return aspect

    def determine_tags_based_on_location(self, location: str) -> List[TagAssociationClass]:
        # Custom logic to determine tags based on location
        tags = []
        if location == 'desired_location':
            tags.append(TagAssociationClass(tag='urn:li:tag:desiredTag'))
        return tags
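The location check in the example above can also be made table-driven. The following is a minimal, standalone sketch that does not touch the DataHub API. The bucket prefixes, tag names, and the helper name `tag_urns_for_location` are made up for illustration.

```python
from typing import Dict, List

# Hypothetical rules: S3 location prefix -> tag name (both invented for this sketch).
LOCATION_TAG_RULES: Dict[str, str] = {
    "s3://raw-bucket/": "raw",
    "s3://curated-bucket/": "curated",
}


def tag_urns_for_location(location: str, rules: Dict[str, str] = LOCATION_TAG_RULES) -> List[str]:
    """Return a tag URN for every rule whose prefix matches the location."""
    return [
        f"urn:li:tag:{tag_name}"
        for prefix, tag_name in rules.items()
        if location.startswith(prefix)
    ]


# Example call with a made-up table location.
print(tag_urns_for_location("s3://raw-bucket/sales/orders/"))
```

Keeping the rules in a dictionary means new locations can be added without changing the transformer logic itself.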
Would you like more detailed guidance on implementing this or any specific part of the process?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>
<@U06TM7M40RK> The point is that my `aspect` argument won’t contain all aspects, only the aspect selected by the `aspect_name` method, right?
It seems likely that your understanding is correct. In the DataHub ingestion framework, when you implement a custom transformer by extending `BaseTransformer` and `SingleAspectTransformer`, the `transform_aspect` method is designed to operate on a specific aspect of an entity. This means that the `aspect` argument you receive in `transform_aspect` will only contain the aspect named by the `aspect_name` method you define in your transformer.
According to the <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>, the `aspect_name` method is used to specify which aspect the transformer is interested in. The framework then passes only that aspect to `transform_aspect`.
Here’s a brief overview of how this works:
- Define `aspect_name` Method: In your transformer, you define the `aspect_name` method to return the name of the aspect you want to transform. For example, if you’re interested in transforming the tags aspect, you would return “globalTags” from this method.
- Receive Specific Aspect: The `transform_aspect` method will be called with the specific aspect that matches the name returned by `aspect_name`. You won’t receive all aspects of the entity, only the one you’re interested in.
- Transform the Aspect: You can then perform your transformation logic on this specific aspect.
If you need to access other aspects of the entity, you might need to make additional queries to the DataHub graph to retrieve them, as the `transform_aspect` method will not provide them directly.
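To make the single-aspect behaviour concrete, here is a toy model of the dispatch described above. It is not DataHub's implementation: records are plain tuples, and the function names, the sample URN, and the `Location` key are invented for this sketch.

```python
from typing import Callable, List, Tuple

# Toy stand-in for a pipeline record: (entity_urn, aspect_name, aspect_value).
Record = Tuple[str, str, dict]


def run_single_aspect_transform(
    records: List[Record],
    wanted_aspect: str,
    transform: Callable[[str, str, dict], dict],
) -> List[Record]:
    """Call `transform` only for records whose aspect name matches `wanted_aspect`."""
    result = []
    for urn, name, value in records:
        if name == wanted_aspect:
            value = transform(urn, name, value)
        result.append((urn, name, value))
    return result


seen_aspects: List[str] = []


def record_and_pass_through(urn: str, name: str, value: dict) -> dict:
    """Remember which aspect names reach the transform, then return the value unchanged."""
    seen_aspects.append(name)
    return value


URN = "urn:li:dataset:(urn:li:dataPlatform:glue,db.table,PROD)"
records = [
    (URN, "datasetProperties", {"customProperties": {"Location": "s3://bucket/db/table"}}),
    (URN, "globalTags", {"tags": []}),
]
run_single_aspect_transform(records, "globalTags", record_and_pass_through)
print(seen_aspects)  # prints ['globalTags']
```

The properties record, which carries the location in this toy data, never reaches the transform. That is exactly the limitation raised in the question.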
Would you like more detailed guidance on how to implement this or any specific part of the process?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>
<@U06TM7M40RK> Yeah, but the DataHub graph fetches the entity (Glue table) from DataHub, and the first time the entity is ingested, it doesn’t exist in DataHub yet.
It seems likely that you’re encountering a common challenge when dealing with metadata ingestion for entities that are being ingested into DataHub for the first time. In such cases, the entity does not yet exist in DataHub, so you cannot rely on querying the DataHub graph to retrieve additional aspects or properties.
To address this, you can consider the following approach:
- Accessing Properties During Ingestion: When you are ingesting metadata for the first time, you should have access to the raw metadata from the source system (e.g., AWS Glue). This raw metadata should include all the necessary properties, such as the location of the table, which you can use to determine the tags to add.
- Custom Transformer Logic: In your custom transformer, you can implement logic to extract the necessary properties from the raw metadata during the ingestion process. This means you would need to ensure that your ingestion pipeline is set up to pass the relevant metadata to your transformer.
- Use of Pipeline Context: According to the <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>, you can use the `PipelineContext` to access additional information during the ingestion process. This context can be used to pass any additional data you might need for your transformation logic.
Here’s a conceptual example of how you might implement this:
from typing import List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import TagAssociationClass


class CustomTagTransformer(BaseTransformer, SingleAspectTransformer):
    def __init__(self, config, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    def transform_aspect(self, entity_urn: str, aspect_name: str, aspect: Optional[dict]) -> Optional[dict]:
        # Access raw metadata from the context or config.
        # NOTE: get_raw_metadata is a placeholder for "some way to reach the raw metadata".
        raw_metadata = self.ctx.get_raw_metadata(entity_urn)
        if raw_metadata and 'location' in raw_metadata:
            location = raw_metadata['location']
            # Determine tags based on location
            tags_to_add = self.determine_tags_based_on_location(location)
            # Add tags to the aspect (start from an empty one if none was passed in)
            if aspect is None:
                aspect = {}
            if 'tags' not in aspect:
                aspect['tags'] = []
            aspect['tags'].extend(tags_to_add)
        return aspect

    def determine_tags_based_on_location(self, location: str) -> List[TagAssociationClass]:
        # Custom logic to determine tags based on location
        tags = []
        if location == 'desired_location':
            tags.append(TagAssociationClass(tag='urn:li:tag:desiredTag'))
        return tags
This example assumes that you have a way to access the raw metadata through the `PipelineContext` or some other mechanism in your ingestion pipeline.
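Since the entity does not exist in DataHub yet, one option is to read the location from the source system itself. The sketch below only shows pulling the location out of a response shaped like AWS Glue's GetTable output (`Table.StorageDescriptor.Location`). The helper name and sample values are invented. How you obtain the response (for example with boto3's Glue `get_table` call) and how you map a dataset URN back to a database and table name are left as assumptions.

```python
from typing import Any, Dict, Optional


def location_from_glue_table(response: Dict[str, Any]) -> Optional[str]:
    """Return Table.StorageDescriptor.Location, or None if any level is missing."""
    table = response.get("Table", {})
    return table.get("StorageDescriptor", {}).get("Location")


# Made-up sample shaped like a Glue GetTable response.
sample_response = {
    "Table": {
        "Name": "orders",
        "DatabaseName": "sales",
        "StorageDescriptor": {"Location": "s3://example-bucket/sales/orders/"},
    }
}

print(location_from_glue_table(sample_response))
```

Returning `None` instead of raising keeps the caller free to skip tagging for tables without a storage location, such as some views.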
Would you like more detailed guidance on implementing this or any specific part of the process?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>
<@U06TM7M40RK> The context doesn’t have a `get_raw_metadata` method.
It seems that the `PipelineContext` in DataHub does not have a `get_raw_metadata` method, as you correctly pointed out. My apologies for the oversight. In the DataHub ingestion framework, the `PipelineContext` is primarily used to provide shared resources and configurations across different components of the ingestion pipeline, but it does not directly provide access to raw metadata.
To access properties of the Glue entity, such as the location, during the ingestion process, you will need to ensure that your ingestion source is set up to pass this information to your transformer. Here’s how you can approach this:
- Modify the Ingestion Source: Ensure that your ingestion source is extracting the necessary properties (e.g., location) from the Glue entities and including them in the metadata that is passed to the transformer.
- Pass Metadata to Transformer: When the ingestion source creates metadata work units, it should include the necessary properties in the aspects or in a way that the transformer can access them.
- Access Properties in Transformer: In your custom transformer, you can access these properties from the aspect being transformed. If the properties are not part of the aspect you are transforming, you may need to modify the ingestion source to include them in a way that the transformer can access.
Here’s a conceptual example of how you might implement this:
from typing import List, Optional

from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import TagAssociationClass


class CustomTagTransformer(BaseTransformer, SingleAspectTransformer):
    def transform_aspect(self, entity_urn: str, aspect_name: str, aspect: Optional[dict]) -> Optional[dict]:
        # Simplification: the aspect is treated as a dict here; DataHub passes typed aspect objects.
        # Check if the aspect contains the location property
        if aspect and 'location' in aspect:
            location = aspect['location']
            # Determine tags based on location
            tags_to_add = self.determine_tags_based_on_location(location)
            # Add tags to the aspect
            if 'tags' not in aspect:
                aspect['tags'] = []
            aspect['tags'].extend(tags_to_add)
        return aspect

    def determine_tags_based_on_location(self, location: str) -> List[TagAssociationClass]:
        # Custom logic to determine tags based on location
        tags = []
        if location == 'desired_location':
            tags.append(TagAssociationClass(tag='urn:li:tag:desiredTag'))
        return tags
In this example, it is assumed that the `location` property is included in the aspect being transformed. You may need to adjust your ingestion source to ensure this property is available to the transformer.
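If the location travels in a different aspect than the tags, one idea is to remember it as records stream by and use it when the tags record for the same entity arrives. The following is a toy model with plain tuples rather than DataHub classes. The function name, the `Location` key, and the assumption that the properties record precedes the tags record are all unverified and would need checking against your pipeline.

```python
from typing import Dict, Iterable, Iterator, Tuple

# Toy stand-in for a pipeline record: (entity_urn, aspect_name, aspect_value).
Record = Tuple[str, str, dict]


def tag_by_location(records: Iterable[Record], rules: Dict[str, str]) -> Iterator[Record]:
    """Pass records through, adding location-based tags to later tag records."""
    locations: Dict[str, str] = {}  # entity URN -> location seen so far
    for urn, name, value in records:
        if name == "datasetProperties":
            location = value.get("customProperties", {}).get("Location")
            if location:
                locations[urn] = location
        elif name == "globalTags" and urn in locations:
            extra = [tag for prefix, tag in rules.items() if locations[urn].startswith(prefix)]
            # Build a new dict so the incoming record is not mutated.
            value = {**value, "tags": value.get("tags", []) + extra}
        yield urn, name, value


URN = "urn:li:dataset:(urn:li:dataPlatform:glue,sales.orders,PROD)"
stream = [
    (URN, "datasetProperties", {"customProperties": {"Location": "s3://raw-bucket/sales/orders/"}}),
    (URN, "globalTags", {"tags": []}),
]
tagged = list(tag_by_location(stream, {"s3://raw-bucket/": "urn:li:tag:raw"}))
print(tagged[1][2]["tags"])  # prints ['urn:li:tag:raw']
```

This avoids querying DataHub for an entity that has not been written yet, at the cost of depending on record order within a single ingestion run.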
Would you like more detailed guidance on modifying the ingestion source or any specific part of the process?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md|DataHub documentation>
Hi! Did this interaction resolve your issue, or do you still require assistance?
Please tag @RunLLM if you still need further assistance!
Or tag @Datahub Community Support if you require a person to take a look.
Hope this helps! ><