Integrating Kafka and MSK Platforms with Kafka Connect

Original Slack Thread

<@U06TM7M40RK> integrating kafka and msk platform instances with kafka-connect

To integrate Kafka and AWS Managed Streaming for Apache Kafka (MSK) with Kafka Connect, you need to configure Kafka Connect to use the MSK cluster as its Kafka broker. Here are the steps to achieve this:

  1. Set Up AWS MSK Cluster: Ensure you have an AWS MSK cluster set up and running. Note down the bootstrap servers and other configuration details.

  2. Configure Kafka Connect: Update the Kafka Connect configuration to use the AWS MSK cluster.

  3. Deploy Kafka Connect: Deploy Kafka Connect with the updated configuration.

Detailed Steps

Step 1: Set Up AWS MSK Cluster

Ensure your AWS MSK cluster is set up and running, and note the bootstrap server endpoints. Because the configuration below uses IAM authentication (SASL_SSL with AWS_MSK_IAM), use the IAM-authenticated listener on port 9098 (port 9092 is the plaintext listener). For example:

b-1.msk-cluster.amazonaws.com:9098,b-2.msk-cluster.amazonaws.com:9098,b-3.msk-cluster.amazonaws.com:9098
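
If you prefer not to copy these endpoints from the console by hand, you can fetch them from the MSK API. The snippet below is a minimal sketch using boto3; the region and cluster ARN are placeholders you would replace with your own:

import boto3

# Sketch: look up the MSK bootstrap brokers (region and ClusterArn are placeholders).
kafka = boto3.client("kafka", region_name="us-east-1")
brokers = kafka.get_bootstrap_brokers(
    ClusterArn="arn:aws:kafka:us-east-1:123456789012:cluster/msk-cluster/replace-me"
)

# For IAM authentication (SASL_SSL + AWS_MSK_IAM), use the IAM listener string (port 9098).
print(brokers["BootstrapBrokerStringSaslIam"])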

Step 2: Configure Kafka Connect

Update the Kafka Connect configuration to use the AWS MSK cluster. This involves setting the appropriate environment variables and configuration settings.

Example Configuration for Kafka Connect

  1. Create a Kafka Connect Properties File: Create a properties file (e.g., connect-distributed.properties) with the following content:
bootstrap.servers=b-1.msk-cluster.amazonaws.com:9098,b-2.msk-cluster.amazonaws.com:9098,b-3.msk-cluster.amazonaws.com:9098
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  2. Set Environment Variables: Ensure the necessary environment variables are set for Kafka Connect to use AWS MSK IAM authentication.
export KAFKA_BOOTSTRAP_SERVERS=b-1.msk-cluster.amazonaws.com:9098,b-2.msk-cluster.amazonaws.com:9098,b-3.msk-cluster.amazonaws.com:9098
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=AWS_MSK_IAM
export KAFKA_SASL_JAAS_CONFIG="software.amazon.msk.auth.iam.IAMLoginModule required;"
export KAFKA_SASL_CLIENT_CALLBACK_HANDLER_CLASS=software.amazon.msk.auth.iam.IAMClientCallbackHandler

Step 3: Deploy Kafka Connect

Deploy Kafka Connect with the updated configuration. This can be done using Docker, Kubernetes, or any other deployment method you prefer.

Example Docker Compose Configuration

If you are using Docker Compose, you can create a docker-compose.yml file with the following content. Note that the AWS_MSK_IAM mechanism comes from AWS's aws-msk-iam-auth library, which is typically not bundled with the stock confluentinc/cp-kafka-connect image, so you will need to add that jar to the worker's classpath (for example via a custom image or a mounted volume).

version: '2'
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    environment:
      CONNECT_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_SECURITY_PROTOCOL: ${KAFKA_SECURITY_PROTOCOL}
      CONNECT_SASL_MECHANISM: ${KAFKA_SASL_MECHANISM}
      CONNECT_SASL_JAAS_CONFIG: ${KAFKA_SASL_JAAS_CONFIG}
      # Connectors' embedded producers/consumers do not inherit these worker-level
      # security settings; they may also need them via the CONNECT_PRODUCER_* and
      # CONNECT_CONSUMER_* prefixed variables.
      CONNECT_SASL_CLIENT_CALLBACK_HANDLER_CLASS: ${KAFKA_SASL_CLIENT_CALLBACK_HANDLER_CLASS}
    ports:
      - 8083:8083

Run the Docker Compose setup:

docker-compose up -d
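
Once the container is up, it can help to confirm that the worker's REST API is reachable before creating connectors. A minimal check against the REST port mapped above:

import requests

# Sketch: verify the Kafka Connect worker is up by listing deployed connectors.
resp = requests.get("http://localhost:8083/connectors")
resp.raise_for_status()
print("Deployed connectors:", resp.json())  # an empty list on a fresh cluster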

Would you like more detailed steps or further assistance on a specific part of the process?

multiple platform instance mapping <@U06TM7M40RK>

To handle multiple platform instances in DataHub, especially when using Kafka Connect, you need to configure the platform_instance_map and connect_to_platform_map in your Kafka Connect recipe. This allows you to specify different platform instances for different connectors or platforms.

Configuration Steps

  1. Define Platform Instances: Ensure that each platform instance is defined in your ingestion recipes.
  2. Configure Kafka Connect Recipe: Use platform_instance_map and connect_to_platform_map to map platform instances to connectors.

Example Configuration

Step 1: Define Platform Instances

In your ingestion recipes, define the platform instances for each source or sink system. For example, for MySQL and Snowflake:

MySQL Ingestion Recipe:

source:
  type: mysql
  config:
    host_port: localhost:3306
    platform_instance: mysql_instance1
    database: dbname
    username: root
    password: example

Snowflake Ingestion Recipe:

source:
  type: snowflake
  config:
    host_port: your_snowflake_host
    database: your_database
    username: your_username
    password: your_password
    platform_instance: snowflake_instance1
    env: PROD

Step 2: Configure Kafka Connect Recipe

In your Kafka Connect recipe, use platform_instance_map and connect_to_platform_map to specify the platform instances for each connector.

Kafka Connect Recipe:

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"  # Kafka Connect REST endpoint (not the Kafka brokers)
    platform_instance_map:
      snowflake: snowflake_instance1
      mysql: mysql_instance1
    connect_to_platform_map:
      mysql_connector1:
        mysql: mysql_instance1
      mysql_connector2:
        mysql: mysql_instance2
      snowflake_connector:
        snowflake: snowflake_instance1

Explanation

  • platform_instance_map: This is used to specify a default platform instance for each platform. If a single instance of a platform is used across all connectors, you can use this map.
  • connect_to_platform_map: This is used to specify platform instances for individual connectors. This takes precedence over platform_instance_map if both are specified.
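
The practical effect of these mappings is on the dataset URNs that the Kafka Connect lineage points at: the instance chosen here must match the platform_instance used in the corresponding source recipe, otherwise lineage will reference a different dataset. A small illustration using DataHub's URN builder (the table and instance names are just examples):

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

# Illustration: how platform_instance becomes part of the dataset URN.
urn = make_dataset_urn_with_platform_instance(
    platform="mysql",
    name="dbname.customers",              # example table
    platform_instance="mysql_instance1",
    env="PROD",
)
print(urn)
# e.g. urn:li:dataset:(urn:li:dataPlatform:mysql,mysql_instance1.dbname.customers,PROD)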

Example with Multiple MySQL Connectors

If you have multiple MySQL connectors, each reading from different MySQL instances, you can configure them as follows:

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"  # Kafka Connect REST endpoint (not the Kafka brokers)
    connect_to_platform_map:
      mysql_connector1:
        mysql: mysql_instance1
      mysql_connector2:
        mysql: mysql_instance2
      kafka_connector1:
        kafka: kafka_instance1
      kafka_connector2:
        kafka: kafka_instance2
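
Either of these recipes can also be run from Python instead of the CLI by passing the same configuration to DataHub's Pipeline API. This is a sketch; the connect_uri, instance names, and GMS server are placeholders:

from datahub.ingestion.run.pipeline import Pipeline

# Sketch: run the kafka-connect ingestion programmatically (values are placeholders).
pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka-connect",
            "config": {
                "connect_uri": "http://localhost:8083",
                "connect_to_platform_map": {
                    "mysql_connector1": {"mysql": "mysql_instance1"},
                    "mysql_connector2": {"mysql": "mysql_instance2"},
                },
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()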

Would you like more detailed steps or further assistance on a specific part of the process?

<@U06TM7M40RK> differentiate platform_instance, platform_instance_map, connect_to_platform_map in kafka connect ingestion

To differentiate between platform_instance, platform_instance_map, and connect_to_platform_map in Kafka Connect ingestion, let’s break down each term and its usage:

1. platform_instance

  • Definition: This specifies the instance of the platform that all assets produced by this recipe belong to.
  • Usage: It is used when you have a single instance of a platform and want to apply it globally across all connectors in your Kafka Connect ingestion recipe.
  • Example:
    source:
      type: kafka-connect
      config:
        platform_instance: my_kafka_instance
    

2. platform_instance_map

  • Definition: This is a mapping of platform names to platform instances. It is used when you have multiple platforms and want to specify a default instance for each platform.
  • Usage: It is useful when you have a single instance of each platform but multiple platforms in your Kafka Connect setup.
  • Example:
    source:
      type: kafka-connect
      config:
        platform_instance_map:
          snowflake: snowflake_instance1
          mysql: mysql_instance1
    

3. connect_to_platform_map

  • Definition: This is a mapping of connector names to platform instances. It is used when you have multiple instances of a platform and need to specify which instance to use for each connector.
  • Usage: It is useful when you have multiple instances of the same platform and need to differentiate between them for different connectors.
  • Example:
    source:
      type: kafka-connect
      config:
        connect_to_platform_map:
          mysql_connector1:
            mysql: mysql_instance1
          mysql_connector2:
            mysql: mysql_instance2
          kafka_connector1:
            kafka: kafka_instance1
          kafka_connector2:
            kafka: kafka_instance2
    

Detailed Explanation

platform_instance

  • Scope: Global for the entire recipe.
  • Precedence: Lowest precedence. If platform_instance_map or connect_to_platform_map is specified, they will override this setting.
  • Use Case: When you have a single instance of a platform and want to apply it globally.

platform_instance_map

  • Scope: Platform-specific.
  • Precedence: Medium precedence. It overrides platform_instance but is overridden by connect_to_platform_map.
  • Use Case: When you have multiple platforms and want to specify a default instance for each platform.

connect_to_platform_map

  • Scope: Connector-specific.
  • Precedence: Highest precedence. It overrides both platform_instance and platform_instance_map.
  • Use Case: When you have multiple instances of the same platform and need to specify which instance to use for each connector.
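
The precedence described above can be summarized as a simple lookup. The function below is purely an illustration of that order (it is not DataHub's actual code):

from typing import Dict, Optional

# Illustration only: the most specific mapping wins.
def resolve_instance(
    connector: str,
    platform: str,
    platform_instance: Optional[str],
    platform_instance_map: Dict[str, str],
    connect_to_platform_map: Dict[str, Dict[str, str]],
) -> Optional[str]:
    # 1. Connector-specific mapping (highest precedence)
    if platform in connect_to_platform_map.get(connector, {}):
        return connect_to_platform_map[connector][platform]
    # 2. Platform-level default
    if platform in platform_instance_map:
        return platform_instance_map[platform]
    # 3. Recipe-wide default (lowest precedence)
    return platform_instance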

Example Configuration Combining All Three

Here is an example configuration that combines all three settings:

source:
  type: kafka-connect
  config:
    platform_instance: default_instance
    platform_instance_map:
      snowflake: snowflake_instance1
      mysql: mysql_instance1
    connect_to_platform_map:
      mysql_connector1:
        mysql: mysql_instance1
      mysql_connector2:
        mysql: mysql_instance2
      kafka_connector1:
        kafka: kafka_instance1
      kafka_connector2:
        kafka: kafka_instance2

In this example:

  • default_instance is the global default.
  • snowflake_instance1 and mysql_instance1 are the default instances for Snowflake and MySQL platforms, respectively.
  • mysql_connector1 and mysql_connector2 use different MySQL instances, overriding the default specified in platform_instance_map.
  • kafka_connector1 and kafka_connector2 use different Kafka instances.

Would you like more detailed steps or further assistance on a specific part of the process?

Sources:

  • <Kafka Connect | DataHub Connect Advanced Configurations>

<@U06TM7M40RK> I have one instance of dynamodb, documentdb, postgres, mongodb, s3 and two instances of kafka (msk and kafka). What will my config look like?

To configure your Kafka Connect ingestion with one instance of DynamoDB, DocumentDB, PostgreSQL, MongoDB, S3, and two instances of Kafka (MSK and Kafka), you will need to use platform_instance_map and connect_to_platform_map to specify the platform instances appropriately.

Configuration Steps

  1. Define Platform Instances: Specify the platform instances for each source and sink system.
  2. Configure Kafka Connect Recipe: Use platform_instance_map for single-instance platforms and connect_to_platform_map for multiple-instance platforms.

Example Configuration

Step 1: Define Platform Instances

In your ingestion recipes, define the platform instances for each source or sink system. For example:

DynamoDB Ingestion Recipe:

source:
  type: dynamodb
  config:
    platform_instance: dynamodb_instance1
    table_name: your_table_name
    region: your_region
    access_key_id: your_access_key_id
    secret_access_key: your_secret_access_key

DocumentDB Ingestion Recipe (note: DataHub does not ship a dedicated documentdb source; Amazon DocumentDB is MongoDB-compatible, so in practice this is usually handled with the mongodb source type under its own platform_instance):

source:
  type: documentdb
  config:
    platform_instance: documentdb_instance1
    host_port: your_documentdb_host:27017
    database: your_database
    username: your_username
    password: your_password

PostgreSQL Ingestion Recipe:

source:
  type: postgres
  config:
    platform_instance: postgres_instance1
    host_port: your_postgres_host:5432
    database: your_database
    username: your_username
    password: your_password

MongoDB Ingestion Recipe:

source:
  type: mongodb
  config:
    platform_instance: mongodb_instance1
    host_port: your_mongodb_host:27017
    database: your_database
    username: your_username
    password: your_password

S3 Ingestion Recipe:

source:
  type: s3
  config:
    platform_instance: s3_instance1
    bucket_name: your_bucket_name
    region: your_region
    access_key_id: your_access_key_id
    secret_access_key: your_secret_access_key

Step 2: Configure Kafka Connect Recipe

In your Kafka Connect recipe, use platform_instance_map for single-instance platforms and connect_to_platform_map for multiple-instance platforms.

Kafka Connect Recipe:

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"  # Kafka Connect REST endpoint (not the Kafka brokers)
    platform_instance_map:
      dynamodb: dynamodb_instance1
      documentdb: documentdb_instance1
      postgres: postgres_instance1
      mongodb: mongodb_instance1
      s3: s3_instance1
    connect_to_platform_map:
      kafka_connector1:
        kafka: kafka_instance1
      kafka_connector2:
        kafka: kafka_instance2

Explanation

  • platform_instance_map: This map specifies the default platform instance for each platform. Since you have only one instance for DynamoDB, DocumentDB, PostgreSQL, MongoDB, and S3, you can use this map to set their instances.
  • connect_to_platform_map: This map specifies the platform instance for each connector. Since you have two instances of Kafka (MSK and Kafka), you need to specify which instance to use for each Kafka connector.

Would you like more detailed steps or further assistance on a specific part of the process?

Sources:

  • <Kafka Connect | DataHub Connect Advanced Configurations>

<@U06TM7M40RK> explain generic connectors

Generic connectors in DataHub are used to integrate with data sources or sinks that are not natively supported by DataHub. These connectors allow you to define custom ingestion logic to fetch metadata from various systems and ingest it into DataHub. This flexibility is crucial for integrating with proprietary systems, custom databases, or any other data sources that do not have out-of-the-box support.

Key Concepts

  1. Custom Source Class: You need to create a custom source class that extends DataHub’s Source class. This class will contain the logic to fetch metadata from your data source.
  2. Configuration Class: Define a configuration class that includes all the necessary parameters required to connect to your data source.
  3. Ingestion Recipe: Create an ingestion recipe that specifies the custom source and its configuration.
  4. Registration: Register your custom source with DataHub so that it can be used in ingestion recipes.

Steps to Create a Generic Connector

Step 1: Define the Configuration Class

Create a configuration class that includes fields for all the necessary parameters required to connect to your data source.

from pydantic import BaseModel

class CustomSourceConfig(BaseModel):
    host: str
    port: int
    username: str
    password: str
    database: str
    # Add other configuration fields as needed

Step 2: Create the Custom Source Class

Create a custom source class that extends DataHub’s Source class. Implement the logic to fetch metadata from your data source.

from typing import Any, Dict, Iterable

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit

class CustomSource(Source):
    def __init__(self, config: CustomSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config
        self.report = SourceReport()

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Implement your custom logic to fetch metadata
        # Example: Fetch tables from a custom database
        tables = self.fetch_tables()
        for table in tables:
            workunit = self.create_workunit(table)
            yield workunit

    def fetch_tables(self):
        # Custom logic to fetch tables from the database
        pass

    def create_workunit(self, table):
        # Custom logic to create a MetadataWorkUnit from a table
        pass

    def get_report(self) -> SourceReport:
        # Required by the Source interface: report used to surface progress and failures
        return self.report

    @classmethod
    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "CustomSource":
        config = CustomSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)
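
To make the stubs above concrete, here is one possible shape for a helper that create_workunit could delegate to. It emits a basic DatasetProperties aspect via a MetadataChangeProposalWrapper; the platform name, the table dictionary shape, and the aspect choice are assumptions, not a required pattern:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Sketch: turn a fetched table (assumed to be a dict with a "name" key) into a work unit.
def table_to_workunit(table: dict) -> MetadataWorkUnit:
    urn = make_dataset_urn(platform="custom-source", name=table["name"], env="PROD")
    mcp = MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=DatasetPropertiesClass(name=table["name"]),
    )
    return MetadataWorkUnit(id=f"custom-source-{table['name']}", mcp=mcp)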

Step 3: Create an Ingestion Recipe

Create a YAML file (e.g., custom_source_recipe.yaml) that specifies the custom source and its configuration.

source:
  type: custom-source
  config:
    host: "localhost"
    port: 5432
    username: "user"
    password: "password"
    database: "my_database"
sink:
  type: datahub-rest
  config:
    server: "<http://localhost:8080>"
    token: "your-auth-token"

Step 4: Register the Custom Source

Register your custom source with DataHub so that it can be used in ingestion recipes. (Alternatively, recent DataHub versions let you skip explicit registration and reference the source in the recipe by its fully qualified class path, for example type: my_package.custom_source.CustomSource.)

from datahub.ingestion.source.source_registry import source_registry

# Register the custom source
source_registry.register("custom-source", CustomSource)

Example Usage

Once you have defined and registered your custom source, you can run the ingestion using the DataHub CLI:

datahub ingest -c custom_source_recipe.yaml

Would you like more detailed steps or further assistance on a specific part of the process?

Hi :slight_smile: ! Did this interaction resolve your issue or do you still require assistance ?

Please tag @ RunLLM if you still need further assistance !

Or tag @Datahub Community Support if you require a person to take a look :slight_smile:

Hope this helps ! ><