<@U06TM7M40RK> Our tech setup is BigQuery as the data warehouse, dbt Core as the transformation layer, and Looker for visualisation. I have a BigQuery source which is ingesting data. How does table lineage handle the incremental logic created by dbt Core? In particular, the insert_overwrite incremental strategy creates a tmp table which it then deletes (e.g. customers__dbt_tmp). How is this factored into the lineage?
To address how DataHub handles table lineage for incremental logic created by dbt core, particularly with the insert_overwrite
incremental strategy that creates and deletes temporary tables, here are the key points:
- Lineage extraction from dbt: DataHub can ingest metadata from dbt, including lineage. The dbt ingestion source uses artifacts such as manifest.json and catalog.json to capture models, sources, tests, and lineage (source: https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/dbt). Because the manifest records dependencies at the model level, transient tables do not appear in it (see the sketch after this list).
- Handling temporary tables: when dbt uses the insert_overwrite strategy, it creates temporary tables (e.g. customers__dbt_tmp) that are dropped at the end of the run. These transient tables are typically not included in the final lineage because they do not persist beyond the transformation. According to a GitHub issue on datahub-project/datahub, lineage for temporary tables created during dbt transformations is not always captured accurately; the lineage may only show the final persistent tables and omit the temporary tables used in the process.
- Lineage visualization: DataHub's lineage view shows the flow of data from source tables to the final target tables; the intermediate temporary tables are generally not visible unless explicitly configured. For more detailed lineage, including temporary tables, you may need to customize the ingestion process or rely on SQL parsing to capture these intermediate steps (source: https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide).
- Custom transformers: if you need temporary tables reflected in the lineage, you can create a custom transformer that maps temporary-table references to the final tables, for example by identifying the tables that dbt creates and drops during the run and adjusting the lineage accordingly (source: https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide).
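As a quick way to see what dbt itself records, you can inspect the manifest.json that the dbt source reads. This is a minimal sketch, assuming a compiled dbt project under target/ and a placeholder model name (model.my_project.customers is not from your project); it shows that dbt's dependency graph is recorded at the model level, which is why no __dbt_tmp tables appear in the lineage DataHub ingests from dbt:

import json

# Path to the artifact produced by `dbt run` / `dbt compile`; adjust to your project.
MANIFEST_PATH = "target/manifest.json"

with open(MANIFEST_PATH) as f:
    manifest = json.load(f)

# Placeholder node id used for illustration.
node_id = "model.my_project.customers"
node = manifest["nodes"][node_id]

# depends_on lists the parent models/sources of this model; dbt records lineage
# here at the model level, so transient *__dbt_tmp tables never show up in it.
print("materialization:", node["config"]["materialized"])
print("incremental strategy:", node["config"].get("incremental_strategy"))
print("upstream nodes:", node["depends_on"]["nodes"])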
Would you like more detailed steps on how to implement custom transformers or configure the ingestion process to capture temporary tables in the lineage?
Sources:
- https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/dbt
- GitHub issue: datahub-project/datahub
- https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide
<@U06TM7M40RK> will the data ingested from dbt get around the temporary table issue?
Ingesting data from dbt into DataHub can help mitigate some of the issues related to temporary tables, but it may not completely resolve them. Here are the key points:
- dbt ingestion: DataHub's dbt ingestion source captures metadata from dbt artifacts such as manifest.json and catalog.json, including model definitions, sources, tests, and lineage (source: https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/dbt).
- Temporary tables handling: dbt's insert_overwrite strategy creates temporary tables (e.g. customers__dbt_tmp) that are used during the transformation and then deleted. These temporary tables are typically not part of the lineage captured from dbt. According to a GitHub issue on datahub-project/datahub, lineage for such temporary tables might not be fully captured; the lineage may only show the final persistent tables.
- Lineage visualization: DataHub's lineage view shows the flow of data from source tables to the final target tables; the intermediate temporary tables are generally not visible unless explicitly configured. For more detailed lineage, including temporary tables, you may need to customize the ingestion process or rely on SQL parsing to capture these intermediate steps (source: https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide).
- Custom transformers: if you need temporary tables reflected in the lineage, you can create a custom transformer that maps temporary-table references to the final tables, for example by identifying the tables that dbt creates and drops during the run and adjusting the lineage accordingly (source: https://datahubproject.io/docs/0.14.0/generated/lineage/lineage-feature-guide). A quick way to check what actually landed in DataHub is sketched after this list.
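As a quick empirical check on whether temporary tables are leaking into the lineage that actually lands in DataHub, you can fetch the upstreamLineage aspect of one of your final BigQuery tables and look for __dbt_tmp references. A minimal sketch, assuming a locally reachable GMS endpoint and a placeholder dataset URN (replace both with your own values):

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import UpstreamLineageClass

# Placeholder server and URN; point these at your GMS endpoint and a real dataset.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.customers,PROD)"

lineage = graph.get_aspect(entity_urn=dataset_urn, aspect_type=UpstreamLineageClass)
if lineage is None:
    print("No upstreamLineage aspect recorded for this dataset.")
else:
    for upstream in lineage.upstreams:
        marker = "  <-- temporary table" if "__dbt_tmp" in upstream.dataset else ""
        print(upstream.dataset + marker)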
In summary, while dbt ingestion into DataHub can help capture some lineage information, it may not fully address the temporary table issue. Custom solutions, such as SQL parsing and custom transformers, may be required for more comprehensive lineage capture.
Would you like more detailed steps on how to implement custom transformers or configure the ingestion process to capture temporary tables in the lineage?
Sources:
- GitHub issue: datahub-project/datahub
<@U06TM7M40RK> give me an example of a custom transformer and how that works for temporary tables created by dbt's logic
To handle temporary tables created by dbt's logic, you can write a custom transformer in Python. The transformer below rewrites the upstream lineage emitted during ingestion so that any reference to a temporary table is mapped back to the final table.
Here’s a step-by-step example:
1. Set up your environment: ensure you have a Python environment with the necessary DataHub dependencies installed.
2. Create the custom transformer: write a transformer class that rewrites lineage involving temporary tables.
3. Define the configuration: create a configuration class for your transformer.
4. Implement the transformation logic: write the logic that maps temporary tables to the final tables.
5. Integrate the transformer into your ingestion recipe.
Step 1: Set Up Your Environment
Ensure you have a Python environment set up with the necessary dependencies for DataHub. You can install the DataHub ingestion package using pip:
pip install acryl-datahub
Step 2: Create the Custom Transformer
Create a Python file, e.g. custom_transformer.py, and define your custom transformer class:
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import UpstreamLineageClass


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass


class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to transient
    *__dbt_tmp tables point at the final, persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        # Operate on the upstreamLineage aspect of each dataset
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Strip the __dbt_tmp suffix from upstream dataset URNs so the lineage
        # points at the persistent table rather than the transient one
        if aspect is not None:
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
Step 3: Define the Configuration
Define a configuration class for your transformer. This class will hold any configuration parameters your transformer needs.
Step 4: Implement the Transformation Logic
In the transform_aspect method, implement the logic that handles temporary tables. In this example, the transformer works on the upstreamLineage aspect and uses a regular expression to remove the __dbt_tmp suffix from upstream dataset URNs, so the lineage points at the final tables.
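To sanity-check the logic before wiring it into a recipe, you can exercise the transformer directly. A minimal sketch, using a hypothetical BigQuery dataset URN that is not taken from your project:

from datahub.ingestion.api.common import PipelineContext
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

from custom_transformer import HandleTemporaryTables, HandleTemporaryTablesConfig

transformer = HandleTemporaryTables(
    HandleTemporaryTablesConfig(), PipelineContext(run_id="local-test")
)

# Hypothetical upstream lineage that still references a dbt temporary table.
tmp_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.customers__dbt_tmp,PROD)"
aspect = UpstreamLineageClass(
    upstreams=[UpstreamClass(dataset=tmp_urn, type=DatasetLineageTypeClass.TRANSFORMED)]
)

result = transformer.transform_aspect(
    entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.customers,PROD)",
    aspect_name="upstreamLineage",
    aspect=aspect,
)
# The __dbt_tmp suffix has been stripped from the upstream reference.
print(result.upstreams[0].dataset)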
Step 5: Integrate the Transformer into Your Ingestion Recipe
Add the custom transformer to your ingestion recipe YAML file:
source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    target_platform: "bigquery"  # the platform your dbt models run against

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under config
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Running the Ingestion
To run the ingestion with the above recipe, use the following command:
datahub ingest -c /path/to/your/recipe.yaml
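Alternatively, you can run the same recipe programmatically from Python, which also makes it easy to guarantee that custom_transformer.py is importable since you control the working directory and sys.path. A minimal sketch using DataHub's Pipeline API; the recipe dict mirrors the YAML above and the paths are placeholders:

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "dbt",
            "config": {
                "manifest_path": "/path/to/manifest.json",
                "catalog_path": "/path/to/catalog.json",
                "target_platform": "bigquery",
            },
        },
        "transformers": [
            {"type": "custom_transformer.HandleTemporaryTables", "config": {}}
        ],
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()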
Summary
This example demonstrates how to create a custom transformer that rewrites lineage involving temporary tables created by dbt's logic before the metadata is ingested into DataHub. You can customize the transformation logic as needed to fit your specific requirements.
Would you like more detailed steps or a code example on how to implement this?
Sources:
- https://datahubspace.slack.com/archives/CUMUWQU66/p1720625480.052599
- https://datahubspace.slack.com/archives/CUMUWQU66/p1717504405.515149
<@U06TM7M40RK> where should the custom_transformer.py be saved? can you expand on steps 3 & 4
To ensure your custom transformer is correctly integrated into your DataHub ingestion pipeline, save the custom_transformer.py file in a location where the ingestion process can discover it. Typically this is the same directory as your ingestion recipe, or a directory on your Python path (the recipe references the transformer by module name, so the directory containing the file must be importable).
Step 3: Define the Configuration
In this step, you define a configuration class for your transformer. This class will hold any configuration parameters your transformer needs. Here’s an example of how to define the configuration class:
from datahub.configuration.common import ConfigModel


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass
This class inherits from ConfigModel, which is part of DataHub's configuration framework (a pydantic model). You can add any parameters your transformer needs, for example patterns or rules for identifying temporary tables, as sketched below.
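For instance, here is a hedged sketch of a config that makes the temporary-table suffix configurable instead of hard-coding __dbt_tmp; the field name tmp_table_suffix and the helper strip_tmp_suffix are made up for illustration:

import re

from datahub.configuration.common import ConfigModel


class HandleTemporaryTablesConfig(ConfigModel):
    # Regex for the suffix dbt appends to temporary tables; override it in the
    # recipe if your adapter uses a different naming convention.
    tmp_table_suffix: str = "__dbt_tmp"


def strip_tmp_suffix(urn: str, config: HandleTemporaryTablesConfig) -> str:
    # Helper the transformer can call to map a temp-table URN to the final table.
    return re.sub(config.tmp_table_suffix, "", urn)

The transformer would then read self.config.tmp_table_suffix in transform_aspect, and the recipe could override it under the transformer's config block.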
Step 4: Implement the Transformation Logic
In this step, you implement the logic that rewrites lineage involving temporary tables. The transform_aspect method is where the core logic of your transformer lives. Here's an expanded example:
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import UpstreamLineageClass


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass


class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to transient
    *__dbt_tmp tables point at the final, persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        # Operate on the upstreamLineage aspect of each dataset
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Strip the __dbt_tmp suffix from upstream dataset URNs so the lineage
        # points at the persistent table rather than the transient one
        if aspect is not None:
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
Detailed Explanation of the transform_aspect Method
- Select the aspect: aspect_name() returns "upstreamLineage", so the transformer only sees the lineage aspect of each dataset, and entity_types() limits it to datasets.
- Check for temporary tables: the method looks for the __dbt_tmp substring in each upstream dataset URN, which indicates a temporary table created by dbt.
- Rewrite the URN: when a temporary table is detected, a regular expression removes the __dbt_tmp suffix so the upstream reference points at the final, persistent table.
- Return the modified aspect: the updated upstreamLineage aspect is returned and emitted in place of the original.
Saving the custom_transformer.py File
Save the custom_transformer.py file in the same directory as your ingestion recipe, or in a directory that is on your Python path. For example, if your ingestion recipe is located at /path/to/your/recipe.yaml, you can save the file alongside it:
/path/to/your/recipe.yaml
/path/to/your/custom_transformer.py
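If you would rather not depend on the recipe's directory or on PYTHONPATH, another option is to package the transformer and pip-install it into the environment that runs the ingestion. The sketch below assumes the entry-point group name shown in DataHub's dataset transformer documentation (datahub.ingestion.transformer.plugins) applies to your version; the package name and alias are made up:

# setup.py -- minimal packaging sketch for the custom transformer
from setuptools import setup

setup(
    name="handle-temporary-tables-transformer",
    version="0.1.0",
    py_modules=["custom_transformer"],
    install_requires=["acryl-datahub"],
    entry_points={
        # Registers the transformer so a recipe can refer to it by the alias
        # "handle_temporary_tables" instead of the full module path.
        "datahub.ingestion.transformer.plugins": [
            "handle_temporary_tables = custom_transformer:HandleTemporaryTables",
        ],
    },
)

After pip install . in that environment, the recipe can reference the transformer either by the registered alias or by the full custom_transformer.HandleTemporaryTables path.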
Integrating the Transformer into Your Ingestion Recipe
Add the custom transformer to your ingestion recipe YAML file:
source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    target_platform: "bigquery"  # the platform your dbt models run against

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under config
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Running the Ingestion
To run the ingestion with the above recipe, use the following command:
datahub ingest -c /path/to/your/recipe.yaml
This setup ensures that your custom transformer is correctly integrated into the DataHub ingestion pipeline and can handle temporary tables created by dbt’s logic.
Would you like more detailed steps or further assistance on any specific part of this process?
Sources:
- https://datahubproject.io/docs/0.14.0/actions/guides/developing-a-transformer
- https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/transformer/dataset_transformer.md
<@U06TM7M40RK> if i am doing UI based ingestion, how can i save the custom_transformer.py file?
Currently, DataHub’s UI-based ingestion does not support the direct use of custom transformers. Custom transformers need to be installed in the Python environment where the ingestion process runs, which is typically done via the CLI. Here are the key points and a workaround:
Key Points
- UI-based ingestion limitation: as per the DataHub ingestion documentation and community Slack discussions, it is not currently possible to install custom transformers directly in the actions pod where UI-based ingestion runs.
- Workaround: use the CLI for ingestion, which allows you to include custom transformers. This involves setting up your Python environment, writing the custom transformer, and running the ingestion via the CLI.
Workaround: Using CLI for Ingestion
Here’s how you can set up and use a custom transformer with the CLI:
1. Set up your environment: ensure you have a Python environment with the necessary DataHub dependencies.
2. Create the custom transformer: write your custom transformer in a Python file, e.g. custom_transformer.py.
3. Save the custom transformer: place custom_transformer.py in the same directory as your ingestion recipe, or in a directory on your Python path.
4. Ingestion recipe: create an ingestion recipe YAML file that includes the custom transformer.
5. Run the ingestion: use the DataHub CLI to run the ingestion process.
Example Steps
Step 1: Set Up Your Environment
Install the DataHub ingestion package using pip:
pip install acryl-datahub
Step 2: Create the Custom Transformer
Create a Python file, e.g. custom_transformer.py, with the following content:
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import UpstreamLineageClass


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass


class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to transient
    *__dbt_tmp tables point at the final, persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        # Operate on the upstreamLineage aspect of each dataset
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Strip the __dbt_tmp suffix from upstream dataset URNs so the lineage
        # points at the persistent table rather than the transient one
        if aspect is not None:
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
Step 3: Save the Custom Transformer
Save the custom_transformer.py file in the same directory as your ingestion recipe:
/path/to/your/recipe.yaml
/path/to/your/custom_transformer.py
Step 4: Ingestion Recipe
Create an ingestion recipe YAML file, e.g. recipe.yaml, with the following content:
source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    target_platform: "bigquery"  # the platform your dbt models run against

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under config
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Step 5: Run the Ingestion
Use the DataHub CLI to run the ingestion process:
datahub ingest -c /path/to/your/recipe.yaml
Summary
While UI-based ingestion does not currently support custom transformers, you can achieve the desired functionality using the CLI. This involves setting up your environment, writing the custom transformer, and running the ingestion process via the CLI.
Would you like more detailed steps or further assistance on any specific part of this process?
Sources:
- https://datahubspace.slack.com/archives/CUMUWQU66/p1692688994.436319
<@U06TM7M40RK> how do i save the custom_transformer.py to the same place if i'm running datahub in a kubernetes cluster?
To use a custom transformer in a Kubernetes cluster where DataHub is deployed, you need to ensure that the custom transformer code is accessible to the ingestion process. Here are the steps to achieve this:
- Create a Custom Docker Image: Build a Docker image that includes your custom transformer.
- Push the Docker Image to a Registry: Push the custom Docker image to a container registry.
- Update the Kubernetes Deployment: Modify your Kubernetes deployment to use the custom Docker image.
Step-by-Step Guide
Step 1: Create a Custom Docker Image
- Create the custom transformer: write your custom transformer in a Python file, e.g. custom_transformer.py.
- Create a Dockerfile: create a Dockerfile that includes the custom transformer and the necessary dependencies.
Here’s an example Dockerfile:
FROM python:3.10-slim

WORKDIR /app

# Copy the custom transformer code
COPY custom_transformer.py /app/custom_transformer.py

# Install the DataHub ingestion package
RUN pip install acryl-datahub

# Install any other dependencies your transformer might need
# RUN pip install <other-dependencies>

# Ensure the transformer module is importable by the datahub CLI
ENV PYTHONPATH=/app

ENTRYPOINT ["datahub"]
- Build the Docker image: build the image from the Dockerfile.
docker build -t your-docker-repo/datahub-ingestion-custom:latest .
Step 2: Push the Docker Image to a Registry
- Push the Docker image: push the custom image to a container registry (e.g. Docker Hub, Google Container Registry, Amazon ECR).
docker push your-docker-repo/datahub-ingestion-custom:latest
Step 3: Update the Kubernetes Deployment
- Modify the Helm values.yaml file: update the values.yaml of your Helm chart so the ingestion job uses the custom Docker image.
Here's an example values.yaml file:
# values.yaml

# Ingestion job configuration
ingestion:
  image:
    repository: your-docker-repo/datahub-ingestion-custom
    tag: latest
  config:
    # Add any configuration parameters your transformer needs here
  resources:
    limits:
      cpu: "2"
      memory: "4Gi"
    requests:
      cpu: "1"
      memory: "2Gi"

# Other configurations
datahub-gms:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

datahub-frontend:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

# Add any other necessary configurations here
- Deploy the Helm chart: apply the updated Helm chart to your Kubernetes cluster.
helm upgrade --install datahub datahub/datahub -f values.yaml
- Verify the deployment: ensure that the ingestion job is using the custom Docker image and has the necessary resources allocated.
kubectl get pods
kubectl describe pod <ingestion-pod-name>
- Monitor logs: check the logs of the ingestion job to confirm it is running correctly.
kubectl logs -f <ingestion-pod-name>
Example Custom Transformer Code
Here's an example of the custom_transformer.py file:
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import UpstreamLineageClass


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass


class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to transient
    *__dbt_tmp tables point at the final, persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        # Operate on the upstreamLineage aspect of each dataset
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Strip the __dbt_tmp suffix from upstream dataset URNs so the lineage
        # points at the persistent table rather than the transient one
        if aspect is not None:
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
Summary
By following these steps, you can create a custom Docker image that includes your custom transformer and deploy it in your Kubernetes cluster. This ensures that the custom transformer is accessible to the ingestion process.
Would you like more detailed steps or further assistance on any specific part of this process?
Sources:
- https://datahubspace.slack.com/archives/CV2UVAPPG/p1720770485.192089
- https://datahubspace.slack.com/archives/CUMUWQU66/p1720625480.052599
<@U06TM7M40RK> is there any way to do it other than a docker image
If you prefer not to create a custom Docker image, you can use Kubernetes ConfigMaps to store your custom transformer code and mount it into the ingestion pod. Here’s how you can achieve this:
Step-by-Step Guide
- Create a ConfigMap: Store the custom transformer code in a ConfigMap.
- Update the Kubernetes Deployment: Modify your Kubernetes deployment to mount the ConfigMap into the ingestion pod.
- Update the Ingestion Recipe: Ensure the ingestion recipe references the custom transformer.
Step 1: Create a ConfigMap
- Write the custom transformer code: create a Python file, e.g. custom_transformer.py, with your custom transformer code.
# custom_transformer.py
import re
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.base_transformer import (
    BaseTransformer,
    SingleAspectTransformer,
)
from datahub.metadata.schema_classes import UpstreamLineageClass


class HandleTemporaryTablesConfig(ConfigModel):
    # Add any configuration parameters your transformer needs here
    pass


class HandleTemporaryTables(BaseTransformer, SingleAspectTransformer):
    """Rewrites upstream lineage so that references to transient
    *__dbt_tmp tables point at the final, persistent tables."""

    ctx: PipelineContext
    config: HandleTemporaryTablesConfig

    def __init__(self, config: HandleTemporaryTablesConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "HandleTemporaryTables":
        config = HandleTemporaryTablesConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        # Operate on the upstreamLineage aspect of each dataset
        return "upstreamLineage"

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[UpstreamLineageClass]
    ) -> Optional[UpstreamLineageClass]:
        # Strip the __dbt_tmp suffix from upstream dataset URNs so the lineage
        # points at the persistent table rather than the transient one
        if aspect is not None:
            for upstream in aspect.upstreams:
                if "__dbt_tmp" in upstream.dataset:
                    upstream.dataset = re.sub(r"__dbt_tmp", "", upstream.dataset)
        return aspect
- Create the ConfigMap: use kubectl to create a ConfigMap from the custom_transformer.py file.
kubectl create configmap custom-transformer --from-file=custom_transformer.py -n <namespace>
Step 2: Update the Kubernetes Deployment
- Modify the Helm values.yaml file: update the values.yaml of your Helm chart to mount the ConfigMap into the ingestion pod.
Here's an example values.yaml file:
# values.yaml

# Ingestion job configuration
ingestion:
  extraVolumes:
    - name: custom-transformer
      configMap:
        name: custom-transformer
  extraVolumeMounts:
    - name: custom-transformer
      mountPath: /app/custom_transformer
  config:
    # Add any configuration parameters your transformer needs here
  resources:
    limits:
      cpu: "2"
      memory: "4Gi"
    requests:
      cpu: "1"
      memory: "2Gi"

# Other configurations
datahub-gms:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

datahub-frontend:
  extraEnv:
    - name: METADATA_SERVICE_AUTH_ENABLED
      value: "true"

# Add any other necessary configurations here
- Deploy the Helm chart: apply the updated Helm chart to your Kubernetes cluster.
helm upgrade --install datahub datahub/datahub -f values.yaml
- Verify the deployment: ensure that the ingestion pod has the ConfigMap mounted correctly.
kubectl get pods
kubectl describe pod <ingestion-pod-name>
- Monitor logs: check the logs of the ingestion job to confirm it is running correctly.
kubectl logs -f <ingestion-pod-name>
Step 3: Update the Ingestion Recipe
Ensure the ingestion recipe references the custom transformer. Note that for custom_transformer.HandleTemporaryTables to resolve, the directory where the ConfigMap is mounted (here /app/custom_transformer) must be on the ingestion process's Python path, for example via a PYTHONPATH environment variable. Here's an example ingestion recipe YAML file:
source:
  type: "dbt"
  config:
    manifest_path: "/path/to/manifest.json"
    catalog_path: "/path/to/catalog.json"
    sources_path: "/path/to/sources.json"
    target_platform: "bigquery"  # the platform your dbt models run against

transformers:
  - type: "custom_transformer.HandleTemporaryTables"
    # Add any configuration parameters your transformer needs under config
    config: {}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Running the Ingestion
To run the ingestion with the above recipe, use the following command:
datahub ingest -c /path/to/your/recipe.yaml
Summary
By following these steps, you can use Kubernetes ConfigMaps to store your custom transformer code and mount it into the ingestion pod. This ensures that the custom transformer is accessible to the ingestion process without needing to create a custom Docker image.
Would you like more detailed steps or further assistance on any specific part of this process?
Sources:
- https://datahubspace.slack.com/archives/CUMUWQU66/p1720625480.052599
<@U06TM7M40RK> will this work on the bigquery source as well as the dbt source?