Accessing `datahub_api` Configuration Settings in a Custom Source Module with DataHub Version 0.14.0.4

Original Slack Thread

<@U06TM7M40RK> I'm using datahub 0.14.0.4, and

    "server": DATAHUB_URL,
    "timeout_sec": 60
}```
how do I read the config settings above from a custom source module?

To read the datahub_api settings from a custom source module in DataHub 0.14.0.4, declare them on the source's configuration class and read them from the config object that is passed to your custom source class. This takes two steps:

  1. Define the Configuration Class: Ensure that your custom source configuration class includes fields for datahub_api.

  2. Access Configuration in Custom Source: In your custom source class, access the datahub_api settings from the configuration object.

Here is an example of how you can structure your custom source module to read the datahub_api configuration settings:

Step 1: Define the Configuration Class

Define a configuration class that includes the datahub_api settings.

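from pydantic import BaseModel

# Mirrors the keys of the datahub_api block
class DatahubApiConfig(BaseModel):
    server: str       # e.g. "http://localhost:8080"
    timeout_sec: int  # request timeout in seconds

class CustomSqlQueriesSourceConfig(BaseModel):
    # Nested model: parsed from the datahub_api key under source.config
    datahub_api: DatahubApiConfig
    # Add other configuration fields as needed
    query_file: str
    platform: str
    platform_instance: str
    default_db: str
    default_schema: str
    env: str
    usage: dict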
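
The point of the nested model is that a bad datahub_api block fails when the config is parsed, not later inside the source. As a rough illustration of those checks, here is a dependency-free sketch that uses a dataclass as a stand-in for the pydantic model. `ApiSettings` and `parse_api_settings` are hypothetical names for this example only, not part of DataHub or pydantic.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class ApiSettings:
    """Hypothetical stdlib stand-in for DatahubApiConfig."""

    server: str
    timeout_sec: int


def parse_api_settings(source_config: Dict[str, Any]) -> ApiSettings:
    """Extract and check the nested datahub_api section of a source config dict."""
    if "datahub_api" not in source_config:
        raise ValueError("source config has no 'datahub_api' section")
    section = source_config["datahub_api"]

    # Report every missing key at once instead of failing on the first one.
    missing = [key for key in ("server", "timeout_sec") if key not in section]
    if missing:
        raise ValueError(f"datahub_api is missing: {', '.join(missing)}")

    # Coerce "60" to 60 so a quoted value in the recipe still works.
    return ApiSettings(
        server=str(section["server"]),
        timeout_sec=int(section["timeout_sec"]),
    )
```

Calling `parse_api_settings({"datahub_api": {"server": "http://localhost:8080", "timeout_sec": 60}})` returns an object with `.server` and `.timeout_sec` attributes, which is the same access pattern the pydantic classes above give you.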

Step 2: Access Configuration in Custom Source

In your custom source class, access the datahub_api settings from the configuration object.

from typing import Any, Dict, Iterable

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit

class CustomSqlQueriesSource(Source):
    def __init__(self, config: CustomSqlQueriesSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config
        self.report = SourceReport()
        # Keep the nested datahub_api settings handy for the rest of the class
        self.datahub_api_config = config.datahub_api

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Access the datahub_api settings
        server = self.datahub_api_config.server
        timeout_sec = self.datahub_api_config.timeout_sec

        # Your custom logic here
        # ...

        yield from []

    def get_report(self) -> SourceReport:
        # The pipeline asks the source for its report, so return one
        return self.report

    @classmethod
    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "CustomSqlQueriesSource":
        # config_dict is the content of source.config from the recipe
        config = CustomSqlQueriesSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)
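
You may not need the nested copy at all. As far as I know, when a recipe has a top-level datahub_api block, the pipeline builds a client from it and exposes it to the source as `ctx.graph`. The sketch below prefers that client and falls back to the nested settings. The attribute path `graph.config.server` / `graph.config.timeout_sec` is an assumption that you should verify against your installed 0.14.0.4 package, and `resolve_api_settings` is a hypothetical helper name. It is written against duck-typed objects so it runs without DataHub installed.

```python
from types import SimpleNamespace
from typing import Any, Tuple


def resolve_api_settings(config: Any, ctx: Any) -> Tuple[str, int]:
    """Return (server, timeout_sec), preferring the pipeline-level client."""
    graph = getattr(ctx, "graph", None)
    if graph is not None:
        # Assumption: the client keeps its settings on `graph.config`.
        return graph.config.server, graph.config.timeout_sec
    # Fall back to the copy nested under source.config.
    return config.datahub_api.server, config.datahub_api.timeout_sec


# Stand-in objects so the sketch can be tried without DataHub installed.
nested_config = SimpleNamespace(
    datahub_api=SimpleNamespace(server="http://localhost:8080", timeout_sec=60)
)
ctx_without_graph = SimpleNamespace(graph=None)
server, timeout_sec = resolve_api_settings(nested_config, ctx_without_graph)
```

Inside the source class you would call it as `resolve_api_settings(self.config, self.ctx)`. If the attribute path turns out to differ in your version, the nested-config approach shown above still works unchanged.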

Example YAML Configuration

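Ensure your YAML configuration file includes the datahub_api settings under source.config. The source's create method only receives what is under source.config, so the block has to be repeated there; the top-level datahub_api block is read by the pipeline itself.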

datahub_api:
  server: "http://localhost:8080"
  timeout_sec: 60

source:
  type: custom-sql-queries
  config:
    datahub_api:
      server: "http://localhost:8080"
      timeout_sec: 60
    query_file: "/mnt/d/zeta/ingest/queries.json"
    platform: "postgres"
    platform_instance: "zeta"
    default_db: "postgres"
    default_schema: "dlusr"
    env: "PROD"
    usage:
      format_sql_queries: "True"

sink:
  type: file
  config:
    filename: "/mnt/d/zeta/logs/custom_queries_ingestion.log"
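
Since the snippet in the question looks like a Python dict (`DATAHUB_URL` is a variable), here is a minimal sketch of the same recipe built in Python and run programmatically. `build_recipe` and `run_recipe` are hypothetical helper names. `Pipeline.create`, `run`, and `raise_from_status` are the usual programmatic entry points, imported lazily so the dict builder works without DataHub installed. This assumes the custom-sql-queries source has already been registered before `run_recipe` is called.

```python
from typing import Any, Dict


def build_recipe(datahub_url: str, timeout_sec: int = 60) -> Dict[str, Any]:
    """Build the recipe above as a dict, sharing one set of API settings."""
    api = {"server": datahub_url, "timeout_sec": timeout_sec}
    return {
        # Read by the pipeline itself.
        "datahub_api": dict(api),
        "source": {
            "type": "custom-sql-queries",
            "config": {
                # Read by CustomSqlQueriesSourceConfig in the custom source.
                "datahub_api": dict(api),
                "query_file": "/mnt/d/zeta/ingest/queries.json",
                "platform": "postgres",
                "platform_instance": "zeta",
                "default_db": "postgres",
                "default_schema": "dlusr",
                "env": "PROD",
                "usage": {"format_sql_queries": "True"},
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "/mnt/d/zeta/logs/custom_queries_ingestion.log"},
        },
    }


def run_recipe(recipe: Dict[str, Any]) -> None:
    """Run the recipe; requires DataHub and a registered custom source."""
    from datahub.ingestion.run.pipeline import Pipeline  # lazy import

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
```

Building both datahub_api blocks from one `api` dict keeps the top-level and nested copies from drifting apart, which is easy to do when the YAML is edited by hand.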

Register the Custom Source

Register your custom source under the same name that the recipe uses for type, and do so before the pipeline is created.

from datahub.ingestion.source.source_registry import source_registry

# Register the custom source
source_registry.register("custom-sql-queries", CustomSqlQueriesSource)

Would you like more detailed steps or examples on any specific part of this process?
