Adding Custom Source and Sink Types in DataHub Version 0.14.0.2

Original Slack Thread

<@U06TM7M40RK> I'm using datahub 0.14.0.2, and I'd like to add a source type for ingestion. I'd also like to add a sink type.

To add a custom source type and sink type for ingestion in DataHub version 0.14.0.2, you can follow these steps:

Adding a Custom Source Type

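  1. Build a Custom Source:

    • Create a Python package that includes your custom source class.
    • Define a new source class that extends the Source class from the DataHub ingestion framework.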
  2. Install the Custom Source:

    • Build and install the custom source package in your working environment where you are using the DataHub CLI.
    • Example project structure: <project>/src/my-source/custom_ingestion_source.py with the custom source class named MySourceClass.
  3. Reference the Custom Source in YAML Recipe:

    • Use the fully qualified package name in your YAML recipe to reference the custom source class.
    • Example YAML configuration:
      source:
        type: my-source.custom_ingestion_source.MySourceClass
        config:
          # place for your custom config defined in the configModel
      

Adding a Custom Sink Type

  1. Build a Custom Sink:

    • Similar to the custom source, create a Python package that includes your custom sink class.
    • Define a new sink class that extends the DataHub ingestion framework.
  2. Install the Custom Sink:

    • Build and install the custom sink package in your working environment.
  3. Reference the Custom Sink in YAML Recipe:

    • Use the fully qualified package name in your YAML recipe to reference the custom sink class.
    • Example YAML configuration:
      sink:
        type: my-sink.custom_sink.MySinkClass
        config:
          # place for your custom config defined in the configModel
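
A type value that contains a dot is treated as an import path rather than a registered plugin name, so the class must be importable from the environment where the recipe runs. The following is a simplified sketch of that kind of lookup using only the standard library. It is not DataHub's actual code, and resolve_dotted_path is a hypothetical helper name. It can be handy for checking that a dotted path resolves before putting it in a recipe.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Import the module part of `path` and return the named attribute."""
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"{path!r} is not a fully qualified name")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# A standard-library class stands in for a custom source or sink here.
cls = resolve_dotted_path("collections.OrderedDict")
print(cls.__name__)
```

If the same call raises ModuleNotFoundError or AttributeError for your own class path, the package is probably not installed in that environment or the path is misspelled.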
      

Example Code for Custom Source and Sink

Here is a basic example of how you might set up and run the ingestion pipeline programmatically with a custom source and sink:

from datahub.ingestion.run.pipeline import Pipeline

# Define your ingestion configuration
pipeline_config = {
    "source": {
        "type": "my-source.custom_ingestion_source.MySourceClass",
        "config": {
            # Custom source configuration
        }
    },
    "sink": {
        "type": "my-sink.custom_sink.MySinkClass",
        "config": {
            # Custom sink configuration
        }
    }
}

# Create and run the ingestion pipeline
pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.raise_from_status()

Would you like more detail on any specific part of this configuration or the programmatic approach?


<@U06TM7M40RK> The sources above include a guide for sources, but there is no guide for sinks.

You’re right; the provided sources include a guide for adding a custom ingestion source but not explicitly for a custom sink. However, the process for adding a custom sink is quite similar to adding a custom source. Here’s a detailed guide on how to create and use a custom sink in DataHub:

Steps to Create a Custom Sink

  1. Create a Custom Sink Class:

    • Define a new Python class for your custom sink. This class should extend the Sink class from the DataHub ingestion framework.
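    • Implement the required methods, such as __init__, create, write_record_async, and close.
  2. Example Custom Sink Class:

    from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
    from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
    
    # MyCustomSinkConfig is the configuration class from step 3; define or
    # import it before this class. Signatures follow the Sink base class in
    # the 0.14 line; check them against your installed version.
    class MyCustomSink(Sink[MyCustomSinkConfig, SinkReport]):
        def __init__(self, ctx: PipelineContext, config: MyCustomSinkConfig):
            # The base class expects (ctx, config), in that order
            super().__init__(ctx, config)
            # Initialize your custom sink here
    
        @classmethod
        def create(cls, config_dict: dict, ctx: PipelineContext) -> "MyCustomSink":
            config = MyCustomSinkConfig.parse_obj(config_dict)
            return cls(ctx, config)
    
        def write_record_async(
            self, record_envelope: RecordEnvelope, write_callback: WriteCallback
        ) -> None:
            # The pipeline calls this for every record; the payload is in
            # record_envelope.record. Handle it, then report the outcome.
            self.report.report_record_written(record_envelope)
            write_callback.on_success(record_envelope, {})
    
        def close(self) -> None:
            # Clean up resources if needed
            pass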
    
  3. Define Configuration for the Custom Sink:

    • Create a configuration class for your custom sink.
    from pydantic import BaseModel
    
    class MyCustomSinkConfig(BaseModel):
        # Define your custom configuration fields here
        pass
    
  4. Install the Custom Sink:

    • Package your custom sink and install it in your working environment.
    • Ensure that your custom sink class is accessible in the Python path.
  5. Reference the Custom Sink in YAML Recipe:

    • Use the fully qualified package name in your YAML recipe to reference the custom sink class.
    sink:
      type: my_custom_sink.MyCustomSink
      config:
        # Custom sink configuration
    

Example YAML Configuration

source:
  type: mysql
  config:
    username: root
    password: password
    host_port: localhost:3306

sink:
  type: my_custom_sink.MyCustomSink
  config:
    # Custom sink configuration

Running the Ingestion Pipeline

You can run the ingestion pipeline using the DataHub CLI or programmatically. Here’s an example of running it programmatically:

from datahub.ingestion.run.pipeline import Pipeline

pipeline_config = {
    "source": {
        "type": "mysql",
        "config": {
            "username": "root",
            "password": "password",
            "host_port": "localhost:3306"
        }
    },
    "sink": {
        "type": "my_custom_sink.MyCustomSink",
        "config": {
            # Custom sink configuration
        }
    }
}

pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.raise_from_status()
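
The CLI route mentioned above is not shown in the thread. As a rough sketch, the same recipe saved as a YAML file can be passed to the datahub ingest command with the -c option. The file name recipe.yml and the helper build_ingest_command are hypothetical, and the snippet only builds and prints the command rather than running it.

```python
import shlex
from typing import List


def build_ingest_command(recipe_path: str) -> List[str]:
    """Return the argument list for running a recipe through the DataHub CLI."""
    return ["datahub", "ingest", "-c", recipe_path]


command = build_ingest_command("recipe.yml")
print(shlex.join(command))
# To actually run it: subprocess.run(command, check=True)
```

The custom source or sink package has to be installed in the same environment as the CLI, otherwise the dotted type cannot be imported.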

Additional Resources

For more details on creating custom sinks, you can refer to the discussions and examples in the DataHub community and documentation.

Would you like more detailed steps on any specific part of this process?
