<@U06TM7M40RK> I'm using DataHub 0.14.0.4 with the following setting:
"server": DATAHUB_URL,
"timeout_sec": 60
}```
How can I read the config settings above from within my custom source module?
To read the `datahub_api` configuration settings in your custom source module on DataHub 0.14.0.4, access them from the `config` object passed to your custom source class. There are two steps:
Define the Configuration Class: Ensure that your custom source configuration class includes a field for `datahub_api`.
Access Configuration in Custom Source: In your custom source class, read the `datahub_api` settings from the configuration object.
Here is an example of how you can structure your custom source module to read the `datahub_api` configuration settings:
Step 1: Define the Configuration Class
Define a configuration class that includes the `datahub_api` settings.
```python
from pydantic import BaseModel


class DatahubApiConfig(BaseModel):
    server: str
    timeout_sec: int


class CustomSqlQueriesSourceConfig(BaseModel):
    datahub_api: DatahubApiConfig
    # Add other configuration fields as needed
    query_file: str
    platform: str
    platform_instance: str
    default_db: str
    default_schema: str
    env: str
    usage: dict
```
Step 2: Access Configuration in Custom Source
In your custom source class, access the `datahub_api` settings from the configuration object.
```python
from typing import Any, Dict, Iterable

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit

# CustomSqlQueriesSourceConfig is the class defined in Step 1.


class CustomSqlQueriesSource(Source):
    def __init__(self, config: CustomSqlQueriesSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config
        self.datahub_api_config = config.datahub_api
        self.report = SourceReport()

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Access the datahub_api settings
        server = self.datahub_api_config.server
        timeout_sec = self.datahub_api_config.timeout_sec
        # Your custom logic here
        # ...
        yield from []

    def get_report(self) -> SourceReport:
        # The pipeline calls get_report() to collect this source's report.
        return self.report

    @classmethod
    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "CustomSqlQueriesSource":
        config = CustomSqlQueriesSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)
```
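If you would rather not repeat the block under `source.config`, one option is to fall back to the pipeline-level client. The sketch below assumes that a top-level `datahub_api` block in the recipe results in `ctx.graph` being set on the `PipelineContext`, and that the graph client keeps its settings on a `config` attribute with `server` and `timeout_sec`; please verify both against your installed version. `resolve_datahub_api` is a hypothetical helper name, and the `SimpleNamespace` objects are stand-ins for the real config and context so the lookup order can be shown without a running DataHub.

```python
from types import SimpleNamespace
from typing import Any, Optional, Tuple


def resolve_datahub_api(source_config: Any, ctx: Any) -> Tuple[str, Optional[int]]:
    """Return (server, timeout_sec), preferring the source-level settings."""
    api = getattr(source_config, "datahub_api", None)
    if api is not None:
        return api.server, api.timeout_sec

    # Assumption: ctx.graph exists when the recipe has a top-level datahub_api
    # block, and the client exposes its settings as graph.config.
    graph = getattr(ctx, "graph", None)
    if graph is None:
        raise ValueError("No datahub_api found in the source config or the recipe")
    return graph.config.server, graph.config.timeout_sec


# Stand-ins for the real objects, only to show the two lookup paths.
ctx = SimpleNamespace(
    graph=SimpleNamespace(
        config=SimpleNamespace(server="http://localhost:8080", timeout_sec=60)
    )
)
nested = SimpleNamespace(
    datahub_api=SimpleNamespace(server="http://gms:8080", timeout_sec=30)
)
bare = SimpleNamespace(datahub_api=None)

print(resolve_datahub_api(nested, ctx))  # source-level block wins
print(resolve_datahub_api(bare, ctx))    # falls back to ctx.graph
```

For the fallback path to be reachable, the `datahub_api` field in the Step 1 config class would need to be optional rather than required.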
Example YAML Configuration
Ensure your YAML configuration file includes the `datahub_api` settings.
```yaml
datahub_api:
  server: "http://localhost:8080"
  timeout_sec: 60
source:
  type: custom-sql-queries
  config:
    datahub_api:
      server: "http://localhost:8080"
      timeout_sec: 60
    query_file: "/mnt/d/zeta/ingest/queries.json"
    platform: "postgres"
    platform_instance: "zeta"
    default_db: "postgres"
    default_schema: "dlusr"
    env: "PROD"
    usage:
      format_sql_queries: "True"
sink:
  type: file
  config:
    filename: "/mnt/d/zeta/logs/custom_queries_ingestion.log"
```
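Since the snippet in the question sets `"server": DATAHUB_URL` in Python rather than YAML, the same recipe can also be built as a plain dict. This is a minimal sketch that assumes `DATAHUB_URL` comes from an environment variable of the same name (the question does not say where it is defined) and reuses the values from the YAML above.

```python
import os

# Assumption: the GMS address is supplied via an environment variable.
DATAHUB_URL = os.environ.get("DATAHUB_URL", "http://localhost:8080")

datahub_api = {"server": DATAHUB_URL, "timeout_sec": 60}

recipe = {
    "datahub_api": dict(datahub_api),
    "source": {
        "type": "custom-sql-queries",
        "config": {
            # Repeated here so the custom source can read it from its own config.
            "datahub_api": dict(datahub_api),
            "query_file": "/mnt/d/zeta/ingest/queries.json",
            "platform": "postgres",
            "platform_instance": "zeta",
            "default_db": "postgres",
            "default_schema": "dlusr",
            "env": "PROD",
            "usage": {"format_sql_queries": "True"},
        },
    },
    "sink": {
        "type": "file",
        "config": {"filename": "/mnt/d/zeta/logs/custom_queries_ingestion.log"},
    },
}

# This is the value that create() would receive as config_dict["datahub_api"].
print(recipe["source"]["config"]["datahub_api"])
```

With DataHub installed, a dict of this shape is what you would hand to `Pipeline.create(recipe)` from `datahub.ingestion.run.pipeline`.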
Register the Custom Source
Ensure that your custom source is registered correctly.
```python
from datahub.ingestion.source.source_registry import source_registry

# Register the custom source
source_registry.register("custom-sql-queries", CustomSqlQueriesSource)
```
Would you like more detailed steps or examples on any specific part of this process?