Handling Table Lineage for Incremental Logic with dbt Core

Yes, a custom transformer can work for both the BigQuery source and the dbt source. The transformer processes metadata events regardless of which source emitted them, as long as the transformer logic is correctly implemented and wired into the ingestion pipeline.

Step-by-Step Guide for BigQuery Source

Here’s how you can set up and use a custom transformer with the BigQuery source in addition to the dbt source:

  1. Create a ConfigMap: Store the custom transformer code in a ConfigMap.
  2. Update the Kubernetes Deployment: Modify your Kubernetes deployment to mount the ConfigMap into the ingestion pod.
  3. Update the Ingestion Recipe: Ensure the ingestion recipe references the custom transformer.

Step 1: Create a ConfigMap

  1. Write the Custom Transformer Code:
    • Create a Python file, e.g., custom_transformer.py, with your custom transformer code (a quick local sanity check for this code is shown at the end of this step).
# custom_transformer.py
import re
from typing import List, Optional

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import UpstreamLineageClass

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        # Only dataset entities are routed to this transformer
        return ["dataset"]

    def aspect_name(self) -> str:
        # SingleAspectTransformer requires the aspect to operate on;
        # here we rewrite each dataset's lineage aspect
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect]
    ) -> Optional[builder.Aspect]:
        # Example logic: strip the dbt temporary-table suffix from upstream URNs
        # (e.g. "my_table__dbt_tmp" -> "my_table") so lineage points at the final table
        if isinstance(aspect, UpstreamLineageClass):
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
  2. Create the ConfigMap:
    • Use kubectl to create a ConfigMap from the custom_transformer.py file.
kubectl create configmap custom-transformer --from-file=custom_transformer.py -n <namespace>
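
Before wiring this into Kubernetes, you can optionally sanity-check the transformer logic locally. The snippet below is a minimal sketch: the URNs are hypothetical placeholders (substitute your own project, dataset, and table names), and it simply feeds a hand-built upstreamLineage aspect through transform_aspect to confirm the __dbt_tmp suffix is stripped.

# quick_check.py: a local sanity check only; not part of the deployment
from datahub.ingestion.api.common import PipelineContext
from datahub.metadata.schema_classes import UpstreamClass, UpstreamLineageClass

from custom_transformer import HandleTemporaryTables

transformer = HandleTemporaryTables.create({}, PipelineContext(run_id="local-test"))

# Hypothetical URNs for illustration only
aspect = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table__dbt_tmp,PROD)",
            type="TRANSFORMED",
        )
    ]
)

result = transformer.transform_aspect(
    entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)",
    aspect_name="upstreamLineage",
    aspect=aspect,
)
print(result.upstreams[0].dataset)  # the __dbt_tmp suffix should be gone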

Step 2: Update the Kubernetes Deployment

  1. Modify the Helm values.yaml File:
    • Update the values.yaml file of your Helm chart to mount the ConfigMap into the ingestion pod.

Here’s an example values.yaml file:

# values.yaml

# Ingestion job configuration
ingestion:
  extraVolumes:
    - name: custom-transformer
      configMap:
        name: custom-transformer
  extraVolumeMounts:
    - name: custom-transformer
      mountPath: /app/custom_transformer

  config:
    # Add any configuration parameters your transformer needs here
  resources:
    limits:
      cpu: "2"
      memory: "4Gi"
    requests:
      cpu: "1"
      memory: "2Gi"

# Other configurations
datahub-gms:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

datahub-frontend:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

# Add any other necessary configurations here
  2. Deploy the Helm Chart:
    • Apply the updated Helm chart to your Kubernetes cluster.
helm upgrade --install datahub datahub/datahub -f values.yaml
  3. Verify the Deployment:
    • Ensure that the ingestion pod has the ConfigMap mounted correctly.
kubectl get pods
kubectl describe pod <ingestion-pod-name>
  4. Monitor Logs:
    • Monitor the logs of the ingestion job to ensure that it is running correctly.
kubectl logs -f <ingestion-pod-name>

Step 3: Update the Ingestion Recipe

Ensure the ingestion recipe references the custom transformer by its module path. This assumes the directory where custom_transformer.py is mounted (e.g., /app/custom_transformer) is on the ingestion container's PYTHONPATH so the module can be imported. Here's an example ingestion recipe YAML file for each of the BigQuery and dbt sources:

BigQuery Source

source:
  type: "bigquery"
  config:
    project_id: "your_project_id"
    options:
      credentials_path: "/path/to/credentials.json"

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    config:
      # Add any configuration parameters your transformer needs here

sink:
  type: "datahub-rest"
  config:
    server: "<http://localhost:8080>"

dbt Source

source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    config:
      # Add any configuration parameters your transformer needs here

sink:
  type: "datahub-rest"
  config:
    server: "<http://localhost:8080>"

Running the Ingestion

To run the ingestion with the above recipes, use the following command:

datahub ingest -c /path/to/your/recipe.yaml
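
Alternatively, if you prefer to trigger the same recipe from Python (for example from a scheduled job), the following is a minimal sketch using DataHub's programmatic Pipeline API; the recipe path is the same placeholder as above and should point at either the BigQuery or the dbt recipe.

# run_ingestion.py: a sketch of running the recipe programmatically instead of via the CLI
import yaml

from datahub.ingestion.run.pipeline import Pipeline

# Load the same recipe file used with `datahub ingest -c ...`
with open("/path/to/your/recipe.yaml") as f:
    recipe = yaml.safe_load(f)

pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()  # raise if the ingestion reported failures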

Summary

By following these steps, you can use Kubernetes ConfigMaps to store your custom transformer code and mount it into the ingestion pod. This ensures that the custom transformer is accessible to the ingestion process for both BigQuery and dbt sources.

Would you like more detailed steps or further assistance on any specific part of this process?
