Handling Table Lineage for Incremental Logic with dbt Core

Original Slack Thread

<@U06TM7M40RK> Our tech setup is BigQuery as the data warehouse, dbt Core as the transformation layer, and Looker for visualisation. I have a BigQuery source which is ingesting data; how does the table lineage handle incremental logic created by dbt Core? In particular, there is an insert_overwrite incremental strategy which creates a tmp table that it then deletes (e.g. customers__dbt_tmp). How is this factored into the lineage?

To address how DataHub handles table lineage for incremental logic created by dbt core, particularly with the insert_overwrite incremental strategy that creates and deletes temporary tables, here are the key points:

  1. Lineage Extraction from dbt:

    • DataHub's dbt source reads the manifest.json and catalog.json artifacts that dbt produces and derives table-level lineage from dbt's dependency graph (sources, models, and downstream models), not from the SQL statements dbt executes at run time.
  2. Handling Temporary Tables:

    • When dbt uses the insert_overwrite strategy, it creates temporary tables (e.g., customers__dbt_tmp). These temporary tables are typically not included in the final lineage because they are transient and do not persist beyond the transformation process.
    • According to an open issue in the datahub-project/datahub GitHub repository, lineage for temporary tables created during dbt transformations is not always captured accurately; the lineage might only show the final persistent tables, omitting the temporary tables used in the process (see the URN sketch after this list for how the two kinds of tables are identified).
  3. Lineage Visualization:

    • DataHub’s lineage visualization will show the flow of data from source tables to the final target tables. However, the intermediate steps involving temporary tables might not be visible unless explicitly configured.
    • For more accurate lineage, including temporary tables, you might need to customize the ingestion process or use SQL parsing to capture these intermediate steps (see the lineage feature guide: https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide).
  4. Custom Transformers:

    • If the default behaviour is not sufficient, a custom ingestion transformer can rewrite lineage aspects during ingestion, for example collapsing references to *__dbt_tmp tables onto their persistent counterparts; an example is given later in this thread.
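
To make the temporary-table point concrete, here is a minimal sketch (the project and dataset names are illustrative) of how the persistent table and the transient table would be identified as DataHub dataset URNs; lineage should ultimately point at the first, not the second:

from datahub.emitter.mce_builder import make_dataset_urn

# The persistent table that the incremental model writes to
customers = make_dataset_urn(platform="bigquery", name="my-project.analytics.customers", env="PROD")

# The transient table dbt creates and drops during an insert_overwrite run
customers_tmp = make_dataset_urn(platform="bigquery", name="my-project.analytics.customers__dbt_tmp", env="PROD")

print(customers)      # urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.analytics.customers,PROD)
print(customers_tmp)  # urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.analytics.customers__dbt_tmp,PROD)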

Would you like more detailed steps on how to implement custom transformers or configure the ingestion process to capture temporary tables in the lineage?

<@U06TM7M40RK> will the data ingested from dbt get around the temporary table issue?

Ingesting dbt metadata into DataHub can help mitigate some of the issues related to temporary tables, but it may not completely resolve them. Here are the key points:

  1. dbt Ingestion:

    • DataHub's dbt source ingests lineage from dbt's manifest and catalog artifacts, which describe the dependency graph between sources and models rather than the physical SQL dbt executes, so the transient *__dbt_tmp tables do not appear as separate nodes in dbt-derived lineage.
  2. Temporary Tables Handling:

    • dbt’s insert_overwrite strategy creates temporary tables (e.g., customers__dbt_tmp) that are used during the transformation process and then deleted. These temporary tables are typically not included in the final lineage captured by dbt.
    • According to an open issue in the datahub-project/datahub GitHub repository, the lineage for temporary tables created during dbt transformations might not be fully captured; the lineage might only show the final persistent tables, omitting the temporary tables used in the process.
  3. Lineage Visualization:

    • DataHub’s lineage visualization will show the flow of data from source tables to the final target tables. However, the intermediate steps involving temporary tables might not be visible unless explicitly configured.
    • For more accurate lineage, including temporary tables, you might need to customize the ingestion process or use SQL parsing to capture these intermediate steps (see the lineage feature guide: https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide).
  4. Custom Transformers:

    • For cases where temporary tables still surface (for example in lineage derived from BigQuery query logs or SQL parsing), a custom transformer can remap them to the persistent tables during ingestion.

In summary, while dbt ingestion into DataHub can help capture some lineage information, it may not fully address the temporary table issue. Custom solutions, such as SQL parsing and custom transformers, may be required for more comprehensive lineage capture.

Would you like more detailed steps on how to implement custom transformers or configure the ingestion process to capture temporary tables in the lineage?

<@U06TM7M40RK> Give me an example of a custom transformer and how that works for temporary tables created by dbt's logic.

To handle temporary tables created by dbt's logic, you can write a custom ingestion transformer in Python. The transformer inspects lineage aspects as they flow through the ingestion pipeline and remaps any references to temporary tables onto the corresponding persistent tables.

Here’s a step-by-step example:

  1. Set Up Your Environment: Ensure you have a Python environment set up with the necessary dependencies for DataHub.

  2. Create the Custom Transformer: Write a custom transformer class that rewrites lineage aspects during ingestion.

  3. Define the Configuration: Create a configuration class for your transformer.

  4. Implement the Transformation Logic: Write the logic to handle temporary tables and generate lineage.

  5. Integrate the Transformer into Your Ingestion Recipe: Add the custom transformer to your ingestion recipe.

Step 1: Set Up Your Environment

Ensure you have a Python environment set up with the necessary dependencies for DataHub. You can install the DataHub ingestion package using pip:

pip install acryl-datahub

Step 2: Create the Custom Transformer

Create a Python file, e.g., custom_transformer.py, and define your custom transformer class:

import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import UpstreamLineageClass

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to dbt's transient
    *__dbt_tmp tables point at the corresponding persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        # Only datasets carry the upstreamLineage aspect rewritten below
        return ["dataset"]

    def aspect_name(self) -> str:
        # Tells SingleAspectTransformer which aspect to pass into transform_aspect
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Illustrative logic: strip the __dbt_tmp suffix from any upstream dataset URN
        # so that lineage points at the persistent table rather than the temp table
        if aspect is None:
            return None
        for upstream in aspect.upstreams:
            if "__dbt_tmp" in upstream.dataset:
                upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect

Step 3: Define the Configuration

Define a configuration class for your transformer. This class will hold any configuration parameters your transformer needs.

Step 4: Implement the Transformation Logic

In the transform_aspect method, implement the logic to handle temporary tables and adjust lineage. In this example, the transformer targets the upstreamLineage aspect and uses a regular expression to strip the temporary-table suffix from upstream dataset URNs, so that lineage points at the persistent tables.
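
As a quick sanity check (a local sketch that assumes the HandleTemporaryTables class from Step 2 is importable), you can exercise transform_aspect directly with a hand-built upstreamLineage aspect:

from custom_transformer import HandleTemporaryTables, HandleTemporaryTablesConfig
from datahub.ingestion.api.common import PipelineContext
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

transformer = HandleTemporaryTables(HandleTemporaryTablesConfig(), PipelineContext(run_id="test"))

# Upstream lineage that still points at the transient table
aspect = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.analytics.customers__dbt_tmp,PROD)",
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
    ]
)

result = transformer.transform_aspect(
    entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.analytics.orders,PROD)",
    aspect_name="upstreamLineage",
    aspect=aspect,
)
print(result.upstreams[0].dataset)
# urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.analytics.customers,PROD)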

Step 5: Integrate the Transformer into Your Ingestion Recipe

Add the custom transformer to your ingestion recipe YAML file:

source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    # Platform that the dbt models are materialized into
    target_platform: "bigquery"

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under `config`
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Running the Ingestion

To run the ingestion with the above recipe, use the following command:

datahub ingest -c /path/to/your/recipe.yaml

Summary

This example demonstrates how to create a custom transformer that remaps lineage for temporary tables created by dbt's logic before the metadata is ingested into DataHub. You can customize the transformation logic as needed to fit your specific requirements.

Would you like more detailed steps or a code example on how to implement this?

<@U06TM7M40RK> Where should the custom_transformer.py be saved? Can you expand on steps 3 & 4?

To ensure your custom transformer is correctly integrated into your DataHub ingestion pipeline, you need to save the custom_transformer.py file in a location where it can be discovered by the ingestion process. Typically, this would be in the same directory as your ingestion recipe or in a directory that is included in your Python path.
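
One quick way to confirm the module can be found (assuming you run the datahub ingest command from the directory that contains custom_transformer.py) is to try importing it from a Python shell in that directory:

import importlib

module = importlib.import_module("custom_transformer")
print(module.HandleTemporaryTables)  # should print the transformer class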

Step 3: Define the Configuration

In this step, you define a configuration class for your transformer. This class will hold any configuration parameters your transformer needs. Here’s an example of how to define the configuration class:

from datahub.configuration.common import ConfigModel

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

This class inherits from ConfigModel, which is part of the DataHub configuration framework. You can add any parameters that your transformer might need. For example, if you need to pass specific patterns or rules for handling temporary tables, you can define them here.
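
For example (the field names below are illustrative, not part of any DataHub API), you could make the temporary-table suffix configurable instead of hard-coding it in the transformer:

from typing import List

from datahub.configuration.common import ConfigModel

class HandleTemporaryTablesConfig(ConfigModel):
    # Suffix that marks dbt's transient tables (illustrative default)
    tmp_table_suffix: str = "__dbt_tmp"
    # Optionally restrict the rewrite to particular platforms
    platforms: List[str] = ["bigquery"]

The transform_aspect logic in Step 4 could then read self.config.tmp_table_suffix instead of hard-coding the __dbt_tmp string.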

Step 4: Implement the Transformation Logic

In this step, you implement the logic to handle temporary tables and generate lineage. The transform_aspect method is where you write the core logic of your transformer. Here’s an expanded example:

import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import UpstreamLineageClass

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to dbt's transient
    *__dbt_tmp tables point at the corresponding persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        # Only datasets carry the upstreamLineage aspect rewritten below
        return ["dataset"]

    def aspect_name(self) -> str:
        # Tells SingleAspectTransformer which aspect to pass into transform_aspect
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Illustrative logic: strip the __dbt_tmp suffix from any upstream dataset URN
        # so that lineage points at the persistent table rather than the temp table
        if aspect is None:
            return None
        for upstream in aspect.upstreams:
            if "__dbt_tmp" in upstream.dataset:
                upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect

Detailed Explanation of the transform_aspect Method

  1. Select the Aspect:

    • Via aspect_name, the transformer declares that it operates on the upstreamLineage aspect of dataset entities, which is where upstream table references are stored.
  2. Check for Temporary Upstreams:

    • For each upstream in the aspect, the method checks whether the upstream dataset URN contains __dbt_tmp, the marker dbt appends to its transient tables.
  3. Rewrite and Return the Aspect:

    • If a temporary table is referenced, a regular expression removes the __dbt_tmp suffix so the upstream points at the persistent table, and the modified aspect is returned.

Saving the custom_transformer.py File

Save the custom_transformer.py file in the same directory as your ingestion recipe or in a directory that is included in your Python path. For example, if your ingestion recipe is located at /path/to/your/recipe.yaml, you can save the custom_transformer.py file in the same directory:

/path/to/your/recipe.yaml
/path/to/your/custom_transformer.py

Integrating the Transformer into Your Ingestion Recipe

Add the custom transformer to your ingestion recipe YAML file:

source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    # Platform that the dbt models are materialized into
    target_platform: "bigquery"

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under `config`
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Running the Ingestion

To run the ingestion with the above recipe, use the following command:

datahub ingest -c /path/to/your/recipe.yaml

This setup ensures that your custom transformer is correctly integrated into the DataHub ingestion pipeline and can handle temporary tables created by dbt’s logic.

Would you like more detailed steps or further assistance on any specific part of this process?

<@U06TM7M40RK> If I am doing UI-based ingestion, how can I save the custom_transformer.py file?

Currently, DataHub’s UI-based ingestion does not support the direct use of custom transformers. Custom transformers need to be installed in the Python environment where the ingestion process runs, which is typically done via the CLI. Here are the key points and a workaround:

Key Points

  1. UI-Based Ingestion Limitation:

    • As noted in the DataHub ingestion documentation and in community Slack discussions, it is not currently possible to install custom transformers directly in the actions pod where UI-based ingestion runs.
  2. Workaround:

    • You can use the CLI for ingestion, which allows you to include custom transformers. This involves setting up your Python environment, writing the custom transformer, and running the ingestion process via the CLI.

Workaround: Using CLI for Ingestion

Here’s how you can set up and use a custom transformer with the CLI:

  1. Set Up Your Environment:

    • Ensure you have a Python environment set up with the necessary dependencies for DataHub.
  2. Create the Custom Transformer:

    • Write your custom transformer in a Python file, e.g., custom_transformer.py.
  3. Save the Custom Transformer:

    • Save the custom_transformer.py file in the same directory as your ingestion recipe or in a directory that is included in your Python path.
  4. Ingestion Recipe:

    • Create an ingestion recipe YAML file that includes the custom transformer.
  5. Run the Ingestion:

    • Use the DataHub CLI to run the ingestion process.

Example Steps

Step 1: Set Up Your Environment

Install the DataHub ingestion package using pip:

pip install acryl-datahub

Step 2: Create the Custom Transformer

Create a Python file, e.g., custom_transformer.py, with the following content:

import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import UpstreamLineageClass

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to dbt's transient
    *__dbt_tmp tables point at the corresponding persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        # Only datasets carry the upstreamLineage aspect rewritten below
        return ["dataset"]

    def aspect_name(self) -> str:
        # Tells SingleAspectTransformer which aspect to pass into transform_aspect
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Illustrative logic: strip the __dbt_tmp suffix from any upstream dataset URN
        # so that lineage points at the persistent table rather than the temp table
        if aspect is None:
            return None
        for upstream in aspect.upstreams:
            if "__dbt_tmp" in upstream.dataset:
                upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect

Step 3: Save the Custom Transformer

Save the custom_transformer.py file in the same directory as your ingestion recipe:

/path/to/your/recipe.yaml
/path/to/your/custom_transformer.py

Step 4: Ingestion Recipe

Create an ingestion recipe YAML file, e.g., recipe.yaml, with the following content:

source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    # Platform that the dbt models are materialized into
    target_platform: "bigquery"

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under `config`
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Step 5: Run the Ingestion

Use the DataHub CLI to run the ingestion process:

datahub ingest -c /path/to/your/recipe.yaml

Summary

While UI-based ingestion does not currently support custom transformers, you can achieve the desired functionality using the CLI. This involves setting up your environment, writing the custom transformer, and running the ingestion process via the CLI.

Would you like more detailed steps or further assistance on any specific part of this process?

<@U06TM7M40RK> How do I save the custom_transformer.py to the same place if I'm running DataHub in a Kubernetes cluster?

To use a custom transformer in a Kubernetes cluster where DataHub is deployed, you need to ensure that the custom transformer code is accessible to the ingestion process. Here are the steps to achieve this:

  1. Create a Custom Docker Image: Build a Docker image that includes your custom transformer.
  2. Push the Docker Image to a Registry: Push the custom Docker image to a container registry.
  3. Update the Kubernetes Deployment: Modify your Kubernetes deployment to use the custom Docker image.

Step-by-Step Guide

Step 1: Create a Custom Docker Image

  1. Create the Custom Transformer:

    • Write your custom transformer in a Python file, e.g., custom_transformer.py.
  2. Create a Dockerfile:

    • Create a Dockerfile that includes the custom transformer and the necessary dependencies.

Here’s an example Dockerfile:

FROM python:3.10-slim

WORKDIR /app

# Copy the custom transformer code
COPY custom_transformer.py /app/custom_transformer.py

# Install DataHub ingestion package
RUN pip install acryl-datahub

# Install any other dependencies your transformer might need
# RUN pip install <other-dependencies>

ENTRYPOINT ["datahub"]

  3. Build the Docker Image:
    • Build the Docker image using the Dockerfile.
docker build -t your-docker-repo/datahub-ingestion-custom:latest .

Step 2: Push the Docker Image to a Registry

  1. Push the Docker Image:
    • Push the custom Docker image to a container registry (e.g., Docker Hub, Google Container Registry, Amazon ECR).
docker push your-docker-repo/datahub-ingestion-custom:latest

Step 3: Update the Kubernetes Deployment

  1. Modify the Helm values.yaml File:
    • Update the values.yaml file of your Helm chart to use the custom Docker image for the ingestion job.

Here’s an example values.yaml file:

# values.yaml

# Ingestion job configuration
ingestion:
  image:
    repository: your-docker-repo/datahub-ingestion-custom
    tag: latest
  config:
    # Add any configuration parameters your transformer needs here
  resources:
    limits:
      cpu: "2"
      memory: "4Gi"
    requests:
      cpu: "1"
      memory: "2Gi"

# Other configurations
datahub-gms:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

datahub-frontend:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

# Add any other necessary configurations here

  2. Deploy the Helm Chart:
    • Apply the updated Helm chart to your Kubernetes cluster.
helm upgrade --install datahub datahub/datahub -f values.yaml
  3. Verify the Deployment:
    • Ensure that the ingestion job is using the custom Docker image and that it has the necessary resources allocated.
kubectl get pods
kubectl describe pod <ingestion-pod-name>
  4. Monitor Logs:
    • Monitor the logs of the ingestion job to ensure that it is running correctly.
kubectl logs -f <ingestion-pod-name>

Example Custom Transformer Code

Here’s an example of the custom_transformer.py file:

import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import UpstreamLineageClass

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to dbt's transient
    *__dbt_tmp tables point at the corresponding persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        # Only datasets carry the upstreamLineage aspect rewritten below
        return ["dataset"]

    def aspect_name(self) -> str:
        # Tells SingleAspectTransformer which aspect to pass into transform_aspect
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Illustrative logic: strip the __dbt_tmp suffix from any upstream dataset URN
        # so that lineage points at the persistent table rather than the temp table
        if aspect is None:
            return None
        for upstream in aspect.upstreams:
            if "__dbt_tmp" in upstream.dataset:
                upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect

Summary

By following these steps, you can create a custom Docker image that includes your custom transformer and deploy it in your Kubernetes cluster. This ensures that the custom transformer is accessible to the ingestion process.

Would you like more detailed steps or further assistance on any specific part of this process?

<@U06TM7M40RK> Is there any way to do it other than a Docker image?

If you prefer not to create a custom Docker image, you can use Kubernetes ConfigMaps to store your custom transformer code and mount it into the ingestion pod. Here’s how you can achieve this:

Step-by-Step Guide

  1. Create a ConfigMap: Store the custom transformer code in a ConfigMap.
  2. Update the Kubernetes Deployment: Modify your Kubernetes deployment to mount the ConfigMap into the ingestion pod.
  3. Update the Ingestion Recipe: Ensure the ingestion recipe references the custom transformer.

Step 1: Create a ConfigMap

  1. Write the Custom Transformer Code:
    • Create a Python file, e.g., custom_transformer.py, with your custom transformer code.
# custom_transformer.py
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import BaseTransformer, SingleAspectTransformer
from datahub.metadata.schema_classes import UpstreamLineageClass

class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass

class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to dbt's transient
    *__dbt_tmp tables point at the corresponding persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        # Only datasets carry the upstreamLineage aspect rewritten below
        return ["dataset"]

    def aspect_name(self) -> str:
        # Tells SingleAspectTransformer which aspect to pass into transform_aspect
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Illustrative logic: strip the __dbt_tmp suffix from any upstream dataset URN
        # so that lineage points at the persistent table rather than the temp table
        if aspect is None:
            return None
        for upstream in aspect.upstreams:
            if "__dbt_tmp" in upstream.dataset:
                upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect

  2. Create the ConfigMap:
    • Use kubectl to create a ConfigMap from the custom_transformer.py file.
kubectl create configmap custom-transformer --from-file=custom_transformer.py -n <namespace>

Step 2: Update the Kubernetes Deployment

  1. Modify the Helm values.yaml File:
    • Update the values.yaml file of your Helm chart to mount the ConfigMap into the ingestion pod.

Here’s an example values.yaml file:

# values.yaml

# Ingestion job configuration
ingestion:
  extraVolumes:
    - name: custom-transformer
      configMap:
        name: custom-transformer
  extraVolumeMounts:
    - name: custom-transformer
      mountPath: /app/custom_transformer

  config:
    # Add any configuration parameters your transformer needs here
  resources:
    limits:
      cpu: "2"
      memory: "4Gi"
    requests:
      cpu: "1"
      memory: "2Gi"

# Other configurations
datahub-gms:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

datahub-frontend:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

# Add any other necessary configurations here

  2. Deploy the Helm Chart:
    • Apply the updated Helm chart to your Kubernetes cluster.
helm upgrade --install datahub datahub/datahub -f values.yaml
  3. Verify the Deployment:
    • Ensure that the ingestion pod has the ConfigMap mounted correctly.
kubectl get pods
kubectl describe pod <ingestion-pod-name>
  4. Monitor Logs:
    • Monitor the logs of the ingestion job to ensure that it is running correctly.
kubectl logs -f <ingestion-pod-name>

Step 3: Update the Ingestion Recipe

Ensure the ingestion recipe references the custom transformer by its module path. Because the recipe refers to custom_transformer.HandleTemporaryTables, the directory where the ConfigMap is mounted (/app/custom_transformer in the example values.yaml above) must be on the ingestion process's Python path, for example via a PYTHONPATH environment variable on the pod. Here's an example ingestion recipe YAML file:

source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    # Platform that the dbt models are materialized into
    target_platform: "bigquery"

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under `config`
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Running the Ingestion

To run the ingestion with the above recipe, use the following command:

datahub ingest -c /path/to/your/recipe.yaml
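
A quick, illustrative way to confirm that the mounted module is importable from inside the ingestion pod (for example via kubectl exec into the pod and starting python) is:

import sys

# /app/custom_transformer is the ConfigMap mount path from the example values.yaml above
sys.path.append("/app/custom_transformer")

import custom_transformer

print(custom_transformer.HandleTemporaryTables)  # should print the transformer class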

Summary

By following these steps, you can use Kubernetes ConfigMaps to store your custom transformer code and mount it into the ingestion pod. This ensures that the custom transformer is accessible to the ingestion process without needing to create a custom Docker image.

Would you like more detailed steps or further assistance on any specific part of this process?

<@U06TM7M40RK> Will this work on the BigQuery source as well as the dbt source?