Preserving Existing Fields in Datasets Using Custom Transformers

Original Slack Thread

Do you know if there exists a transformer that I can apply to a recipe to help preserve existing fields on a dataset if they exist? For more context, I have a service that adds some additional fields to an existing table dataset’s schemaMetadata, and in parallel, there is a recipe that runs regularly to sync the datasets from BigQuery. I need a transformer that will look up specific fields I’ve configured to be preserved, such that if they exist in the dataset’s schema, they will be included in the final schema submitted by the BigQuery recipe.

To preserve existing fields in a dataset’s schemaMetadata while a regular ingestion job runs, you can use a transformer with PATCH semantics. This approach lets you update the dataset’s schema without overwriting the existing fields you want to keep.

According to the DataHub documentation on transformers (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/intro.md), transformers allow you to modify metadata before it reaches the ingestion sink. You can write a custom transformer to check for specific fields in the schemaMetadata and ensure they are preserved during the ingestion process.

Here’s a general approach to achieve this:

  1. Create a Custom Transformer: Write a custom transformer that checks for the existence of specific fields in the schemaMetadata and ensures they are included in the final schema.

  2. Use PATCH Semantics: Configure the transformer to use PATCH semantics, which allows you to update the schema without overwriting existing fields.

  3. Integrate the Transformer into Your Ingestion Recipe: Add the custom transformer to your ingestion recipe to ensure it is applied during the ingestion process.

Here is a basic example of how you might define a custom transformer in your ingestion recipe:

transformers:
  - type: "custom_transformer.PreserveSchemaFields"
    config:
      fields_to_preserve:
        - "field1"
        - "field2"

In this example, the PreserveSchemaFields transformer would be responsible for checking the schemaMetadata for the specified fields (field1, field2) and ensuring they are preserved during the ingestion process.
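
To make the placement concrete, here is a sketch of a full recipe with the transformer in place. It assumes the class is importable as custom_transformer.PreserveSchemaFields in the environment running ingestion; the project_ids and server values are placeholders, and the exact BigQuery source options depend on your CLI version:

source:
  type: bigquery
  config:
    project_ids:
      - "my-gcp-project"  # placeholder project

transformers:
  - type: "custom_transformer.PreserveSchemaFields"
    config:
      fields_to_preserve:
        - "field1"
        - "field2"

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"  # placeholder GMS endpoint

The datahub-rest sink matters here: it is what makes the DataHub graph client available to the transformer for looking up the existing schema.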

Would you like more detailed steps or a code example on how to implement this custom transformer?

When you say configure the transformer to use PATCH semantics, can you give an example of this? Since schemaMetadata doesn’t support patching, how will that work?

You’re correct that schemaMetadata does not inherently support PATCH semantics in the same way that some other aspects do. However, you can still achieve a similar effect by implementing a custom transformer that manually merges the existing schema fields with the new ones being ingested, effectively preserving the fields you want to keep.

Here’s a conceptual approach to how you might implement this:

  1. Fetch Existing Schema: Before applying the new schema, fetch the existing schemaMetadata from DataHub.

  2. Merge Schemas: Compare the existing schema with the new schema being ingested. For fields that you want to preserve, ensure they are included in the final schema.

  3. Apply Merged Schema: Use the merged schema as the final schema to be submitted during the ingestion process.

Here’s a simplified example of how you might implement this as a custom transformer, using the BaseTransformer and SingleAspectTransformer helpers from the ingestion framework:

from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import SchemaMetadataClass

class PreserveSchemaFieldsConfig(ConfigModel):
    fields_to_preserve: List[str] = []

class PreserveSchemaFields(BaseTransformer, SingleAspectTransformer):
    def __init__(self, config: PreserveSchemaFieldsConfig, ctx: PipelineContext):
        super().__init__()
        self.config = config
        self.ctx = ctx

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "PreserveSchemaFields":
        return cls(PreserveSchemaFieldsConfig.parse_obj(config_dict), ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        return "schemaMetadata"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[SchemaMetadataClass]
    ) -> Optional[SchemaMetadataClass]:
        if aspect is None or self.ctx.graph is None:
            return aspect
        # Fetch the schema currently stored in DataHub for this dataset.
        existing = self.ctx.graph.get_aspect(entity_urn, SchemaMetadataClass)
        if existing:
            incoming = {f.fieldPath for f in aspect.fields}
            for field in existing.fields:
                # Re-attach preserved fields that the new ingestion dropped.
                if field.fieldPath in self.config.fields_to_preserve and field.fieldPath not in incoming:
                    aspect.fields.append(field)
        return aspect

In this example, the PreserveSchemaFields transformer re-attaches any fields listed in fields_to_preserve that exist in the schema already stored in DataHub but are missing from the newly ingested schema. The ctx.graph.get_aspect(entity_urn, SchemaMetadataClass) call fetches the existing schema from DataHub, and the fieldPath check avoids appending duplicates.

Note: This approach requires access to the DataHub graph API (ctx.graph) to fetch existing metadata, which is typically available when your recipe uses the datahub-rest sink or a configured datahub_api section.
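
If you want to sanity-check what the graph client returns before wiring the transformer into a recipe, a minimal standalone sketch (the server URL and dataset URN below are placeholders) could look like this:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import SchemaMetadataClass

# Placeholder GMS endpoint; point this at your DataHub instance.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Placeholder URN for a BigQuery table dataset.
urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my_dataset.my_table,PROD)"

schema = graph.get_aspect(urn, SchemaMetadataClass)
if schema:
    print([f.fieldPath for f in schema.fields])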

Would you like more detailed steps or further assistance on implementing this approach?
