<@U06TM7M40RK> I'm using DataHub 0.14.0.2. When running ingestion, I want to add a custom source type, and also a custom sink type.
To add a custom source type and sink type for ingestion in DataHub version 0.14.0.2, you can follow these steps:
Adding a Custom Source Type
1. Build a Custom Source:
   - Create a Python package that includes your custom source class. This involves defining a new source class that extends the DataHub ingestion framework.
   - Refer to the <https://github.com/datahub-project/datahub/blob/master/docs/how/add-custom-ingestion-source.md|metadata-ingestion source guide> for detailed instructions on building a custom source.
2. Install the Custom Source:
   - Build and install the custom source package in the working environment where you use the DataHub CLI.
   - Example project structure: <project>/src/my-source/custom_ingestion_source.py, with the custom source class named MySourceClass.
3. Reference the Custom Source in YAML Recipe:
   - Use the fully qualified package name in your YAML recipe to reference the custom source class.
   - Example YAML configuration:

source:
  type: my-source.custom_ingestion_source.MySourceClass
  config:
    # place for your custom config defined in the configModel
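The type value is a dotted Python path that the ingestion framework resolves to your class at runtime, so the package only needs to be importable in the environment running the recipe. As a stdlib-only sketch of that lookup mechanism (load_class and the json.JSONEncoder stand-in are illustrative, not part of the DataHub API):

```python
import importlib

def load_class(dotted_path: str):
    # Split "package.module.ClassName" into module path and class name,
    # import the module, and fetch the class attribute from it.
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# Any importable dotted path works; json.JSONEncoder stands in here for
# a custom source class referenced from a recipe.
encoder_cls = load_class("json.JSONEncoder")
print(encoder_cls.__name__)  # JSONEncoder
```

If resolving the path fails at ingestion time, double-check that your custom package is installed in the same virtual environment as the DataHub CLI.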
Adding a Custom Sink Type
1. Build a Custom Sink:
   - As with the custom source, create a Python package that includes your custom sink class.
   - Define a new sink class that extends the DataHub ingestion framework.
2. Install the Custom Sink:
   - Build and install the custom sink package in your working environment.
3. Reference the Custom Sink in YAML Recipe:
   - Use the fully qualified package name in your YAML recipe to reference the custom sink class.
   - Example YAML configuration:

sink:
  type: my-sink.custom_sink.MySinkClass
  config:
    # place for your custom config defined in the configModel
Example Code for Custom Source and Sink
Here is a basic example of how you might set up and run the ingestion pipeline programmatically with custom source and sink:
from datahub.ingestion.run.pipeline import Pipeline
# Define your ingestion configuration
pipeline_config = {
"source": {
"type": "my-source.custom_ingestion_source.MySourceClass",
"config": {
# Custom source configuration
}
},
"sink": {
"type": "my-sink.custom_sink.MySinkClass",
"config": {
# Custom sink configuration
}
}
}
# Create and run the ingestion pipeline
pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.raise_from_status()
Would you like more detail on any specific part of this configuration or the programmatic approach?
Sources:
- <https://github.com/datahub-project/datahub/blob/master/docs/how/add-custom-ingestion-source.md|Using a Custom Ingestion Source>
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/programatic_pipeline.py|Programmatic Pipeline Example>
<@U06TM7M40RK> There's a guide for the custom source above, but no guide for a custom sink.
You’re right; the provided sources include a guide for adding a custom ingestion source but not explicitly for a custom sink. However, the process for adding a custom sink is quite similar to adding a custom source. Here’s a detailed guide on how to create and use a custom sink in DataHub:
Steps to Create a Custom Sink
1. Create a Custom Sink Class:
   - Define a new Python class for your custom sink. This class should extend the Sink class from the DataHub ingestion framework.
   - Implement the required methods, such as __init__, close, and write_record.
2. Example Custom Sink Class:

from datahub.ingestion.api.sink import Sink
from datahub.metadata.schema_classes import MetadataChangeEventClass

class MyCustomSink(Sink):
    def __init__(self, config, ctx):
        super().__init__(config, ctx)
        # Initialize your custom sink here

    @classmethod
    def create(cls, config_dict, ctx):
        config = MyCustomSinkConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def write_record(self, record: MetadataChangeEventClass):
        # Implement how to handle the record
        pass

    def close(self):
        # Clean up resources if needed
        pass
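To make the lifecycle concrete, here is a self-contained stand-in (FileLinesSink is hypothetical and does not use the DataHub API) showing the order in which a pipeline would drive such a sink: construct it, call write_record once per record, then close:

```python
import os
import tempfile

class FileLinesSink:
    # Hypothetical stand-in, not a DataHub class: writes each record as
    # one line to a file, mirroring the __init__/write_record/close shape.
    def __init__(self, path):
        self._fh = open(path, "w")
        self.records_written = 0

    def write_record(self, record):
        self._fh.write(str(record) + "\n")
        self.records_written += 1

    def close(self):
        self._fh.close()

path = os.path.join(tempfile.mkdtemp(), "records.txt")
sink = FileLinesSink(path)
for record in ("first", "second"):
    sink.write_record(record)
sink.close()

with open(path) as fh:
    print(fh.read())  # first\nsecond\n
```

A real sink would typically buffer or batch writes and report failures back to the pipeline rather than writing lines one at a time.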
3. Define Configuration for the Custom Sink:
   - Create a configuration class for your custom sink.

from pydantic import BaseModel

class MyCustomSinkConfig(BaseModel):
    # Define your custom configuration fields here
    pass
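The job of this config class is to turn the config: mapping from the recipe into a typed object, which is what parse_obj does in the sink's create method. A stdlib-only sketch of that idea using dataclasses, where filename and overwrite are made-up example fields:

```python
from dataclasses import dataclass

@dataclass
class SinkConfigSketch:
    # Hypothetical fields for illustration only.
    filename: str
    overwrite: bool = False

# The recipe's config: block arrives as a plain dict.
raw = {"filename": "out.json"}
cfg = SinkConfigSketch(**raw)
print(cfg.filename, cfg.overwrite)  # out.json False
```

Unlike this sketch, pydantic also validates field types and produces readable errors when a recipe contains bad values, which is why the DataHub framework uses it.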
4. Install the Custom Sink:
   - Package your custom sink and install it in your working environment.
   - Ensure that your custom sink class is accessible on the Python path.
5. Reference the Custom Sink in YAML Recipe:
   - Use the fully qualified package name in your YAML recipe to reference the custom sink class.

sink:
  type: my_custom_sink.MyCustomSink
  config:
    # Custom sink configuration
Example YAML Configuration
source:
type: mysql
config:
username: root
password: password
host_port: localhost:3306
sink:
type: my_custom_sink.MyCustomSink
config:
# Custom sink configuration
Running the Ingestion Pipeline
You can run the ingestion pipeline using the DataHub CLI or programmatically. Here’s an example of running it programmatically:
from datahub.ingestion.run.pipeline import Pipeline
pipeline_config = {
"source": {
"type": "mysql",
"config": {
"username": "root",
"password": "password",
"host_port": "localhost:3306"
}
},
"sink": {
"type": "my_custom_sink.MyCustomSink",
"config": {
# Custom sink configuration
}
}
}
pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.raise_from_status()
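For the CLI route, the equivalent is to save the recipe to a file (recipe.yml is a name chosen here for illustration) and invoke the ingest command from the environment where the custom sink package is installed:

```shell
# Run the recipe with the DataHub CLI; the custom sink must be
# importable in this same Python environment.
datahub ingest -c recipe.yml
```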
Additional Resources
For more details on creating custom sinks, you can refer to the discussions and examples in the DataHub community and documentation:
- DataHub Slack Community
- <https://github.com/datahub-project/datahub/issues|DataHub GitHub Issues>
Would you like more detailed steps on any specific part of this process?
Sources:
- <https://github.com/datahub-project/datahub/issues|DataHub GitHub Issues>
- DataHub Slack Community