Yes, the approach of using a custom transformer can work for both the BigQuery source and the dbt source. The custom transformer will process metadata events regardless of the source, as long as the transformer logic is correctly implemented and integrated into the ingestion pipeline.
Step-by-Step Guide for BigQuery Source
Here’s how you can set up and use a custom transformer with the BigQuery source in addition to the dbt source:
- Create a ConfigMap: Store the custom transformer code in a ConfigMap.
- Update the Kubernetes Deployment: Modify your Kubernetes deployment to mount the ConfigMap into the ingestion pod.
- Update the Ingestion Recipe: Ensure the ingestion recipe references the custom transformer.
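For the last step, the custom transformer is referenced in the recipe's `transformers` section. Here is a hedged sketch of what that recipe could look like (the project ID and GMS endpoint are placeholders; the module/class path assumes `custom_transformer.py` is on the ingestion pod's `PYTHONPATH`):

```yaml
# Sketch of an ingestion recipe referencing the custom transformer.
source:
  type: bigquery
  config:
    project_id: my-project  # hypothetical project ID
transformers:
  - type: custom_transformer.HandleTemporaryTables
    config: {}
sink:
  type: datahub-rest
  config:
    server: http://datahub-gms:8080  # hypothetical GMS endpoint
```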
Step 1: Create a ConfigMap
- Write the custom transformer code: create a Python file, e.g., `custom_transformer.py`, with your transformer logic:
```python
# custom_transformer.py
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import UpstreamLineageClass


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass


class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        # SingleAspectTransformer subclasses must declare which aspect they handle
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        # Example logic: strip the dbt temporary-table suffix from upstream
        # dataset URNs so lineage points at the permanent tables instead.
        if isinstance(aspect, UpstreamLineageClass):
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
```
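As a quick sanity check of the rewrite rule, the regex behaves like this on a hypothetical dbt temporary-table URN (the project and table names are made up for illustration):

```python
import re

# Hypothetical BigQuery dataset URN for a dbt temporary table.
tmp_urn = (
    "urn:li:dataset:(urn:li:dataPlatform:bigquery,"
    "my-project.my_dataset.orders__dbt_tmp,PROD)"
)

# The same rewrite the transformer applies: drop the "__dbt_tmp" suffix.
clean_urn = re.sub(r"__dbt_tmp", "", tmp_urn)
print(clean_urn)
# The result points at the permanent table, e.g. ...orders,PROD)
```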
- Create the ConfigMap: use `kubectl` to create a ConfigMap from the `custom_transformer.py` file:

```shell
kubectl create configmap custom-transformer --from-file=custom_transformer.py -n <namespace>
```
Step 2: Update the Kubernetes Deployment
- Modify the Helm `values.yaml` file: update the `values.yaml` of your Helm chart to mount the ConfigMap into the ingestion pod. Here’s an example `values.yaml` file: