<@U06TM7M40RK> Do you know if there exists a transformer that I can apply to a recipe to help preserve existing fields on the dataset if they exist? For more context, I have a service that adds some additional fields to an existing table dataset’s schemaMetadata, and in parallel, there is a recipe that runs regularly to sync the datasets from BigQuery. I need a transformer that will look up specific fields I’ve configured to be preserved, such that if they exist in the dataset’s schema, they will be included in the final schema submitted by the BigQuery recipe.
To preserve existing fields in a dataset’s schemaMetadata while running a regular ingestion job, you can use a transformer with PATCH semantics. This approach allows you to update the dataset’s schema without overwriting existing fields that you want to preserve.
According to the <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/intro.md|DataHub documentation on transformers>, transformers allow you to modify metadata before it reaches the ingestion sink. You can write a custom transformer to check for specific fields in the schemaMetadata and ensure they are preserved during the ingestion process.
Here’s a general approach to achieve this:
- Create a Custom Transformer: Write a custom transformer that checks for the existence of specific fields in the schemaMetadata and ensures they are included in the final schema.
- Use PATCH Semantics: Configure the transformer to use PATCH semantics, which allows you to update the schema without overwriting existing fields.
- Integrate the Transformer into Your Ingestion Recipe: Add the custom transformer to your ingestion recipe to ensure it is applied during the ingestion process.
Here is a basic example of how you might define a custom transformer in your ingestion recipe:
```yaml
transformers:
  - type: "custom_transformer.PreserveSchemaFields"
    config:
      fields_to_preserve:
        - "field1"
        - "field2"
```
In this example, the PreserveSchemaFields transformer would be responsible for checking the schemaMetadata for the specified fields (field1, field2) and ensuring they are preserved during the ingestion process.
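For reference, here is how that transformer block might sit in a complete BigQuery recipe. The source and sink settings below are illustrative placeholders, not values from your setup:

```yaml
source:
  type: bigquery
  config:
    project_ids:
      - my-gcp-project   # placeholder

transformers:
  - type: "custom_transformer.PreserveSchemaFields"
    config:
      fields_to_preserve:
        - "field1"
        - "field2"

sink:
  type: datahub-rest
  config:
    server: http://localhost:8080   # placeholder GMS address
```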
Would you like more detailed steps or a code example on how to implement this custom transformer?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/intro.md|DataHub documentation on transformers>
<@U06TM7M40RK> When you say configure the transformer to use PATCH semantics, can you give an example of this? Since schemaMetadata doesn’t support patching, how will that work?
You’re correct that schemaMetadata does not inherently support PATCH semantics in the way that some other aspects do. However, you can still achieve a similar effect by implementing a custom transformer that manually merges the existing schema fields with the new ones being ingested, effectively preserving the fields you want to keep.
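For contrast, aspects that do support patching can be updated with DataHub’s patch builders, which only touch the values you specify. A minimal sketch using DatasetPatchBuilder on datasetProperties (the URN and server address are placeholders):

```python
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

# Placeholder GMS address.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,project.dataset.table,PROD)"

# PATCH semantics: only this custom property is added; everything else
# in datasetProperties on the server is left untouched.
patch = DatasetPatchBuilder(dataset_urn)
patch.add_custom_property("preserved_by", "my-service")

for mcp in patch.build():
    emitter.emit(mcp)
```

Because schemaMetadata has no equivalent patch builder, the merge has to happen in the transformer itself, as described below.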
Here’s a conceptual approach to how you might implement this:
- Fetch Existing Schema: Before applying the new schema, fetch the existing schemaMetadata from DataHub.
- Merge Schemas: Compare the existing schema with the new schema being ingested. For fields that you want to preserve, ensure they are included in the final schema.
- Apply Merged Schema: Use the merged schema as the final schema to be submitted during the ingestion process.
Here’s a simplified example of how you might implement this in a custom transformer:
```python
from datahub.ingestion.api.transform import Transformer
from datahub.metadata.schema_classes import SchemaMetadataClass


class PreserveSchemaFields(Transformer):
    # Simplified for illustration: the real Transformer interface operates
    # on a stream of RecordEnvelope objects rather than a single entity.
    def __init__(self, config, ctx):
        self.fields_to_preserve = config.get("fields_to_preserve", [])
        self.ctx = ctx

    @classmethod
    def create(cls, config_dict, ctx):
        return cls(config_dict, ctx)

    def transform(self, entity):
        if getattr(entity, "schemaMetadata", None):
            # Fetch the schema currently stored in DataHub.
            existing_schema = self.ctx.graph.get_aspect(
                entity_urn=entity.urn, aspect_type=SchemaMetadataClass
            )
            if existing_schema:
                # Merge: carry over preserved fields that the incoming
                # schema does not already contain, avoiding duplicates.
                incoming_paths = {f.fieldPath for f in entity.schemaMetadata.fields}
                for field in existing_schema.fields:
                    if (
                        field.fieldPath in self.fields_to_preserve
                        and field.fieldPath not in incoming_paths
                    ):
                        entity.schemaMetadata.fields.append(field)
        return entity
```
In this example, the PreserveSchemaFields transformer checks for fields specified in fields_to_preserve and ensures they are included in the final schema, without duplicating fields the incoming schema already contains. The ctx.graph.get_aspect(...) call is used to fetch the existing schema from DataHub.
Note: This approach requires that you have access to the DataHub graph API to fetch existing metadata, which is typically available when using a REST sink.
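As a quick sanity check that the graph client can see the fields you expect to preserve, you can fetch the stored schema directly. A minimal sketch; the URN and server address are placeholders:

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import SchemaMetadataClass

# Placeholder GMS address.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,project.dataset.table,PROD)"

# Returns None if the dataset has no schemaMetadata aspect yet.
schema = graph.get_aspect(entity_urn=dataset_urn, aspect_type=SchemaMetadataClass)
if schema:
    print([f.fieldPath for f in schema.fields])
```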
Would you like more detailed steps or further assistance on implementing this approach?
Sources:
- <https://datahubspace.slack.com/archives/CUMUWQU66/p1722867483.803079>
- <https://datahubspace.slack.com/archives/CUMUWQU66/p1722410900.756849>
Hi! Did this interaction resolve your issue, or do you still require assistance?
Please tag @RunLLM if you still need further assistance!
Or tag @Datahub Community Support if you require a person to take a look.
Hope this helps! ><