Ingesting Detailed Schema and Stats Information for Kafka Topics

Original Slack Thread

Hi, team
I have collected the kafka topic information that I use.
However, I would also like to see more detailed information about the topic, such as schema (like column information) or stats information. Is there an option to collect this information when ingesting?

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

<@U06TM7M40RK> Can you help with the questions in this thread?


:information_source: We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions and don’t forget to provide its responses a :thumbsup: or :thumbsdown: so other folks in the Community know if you’re still looking for help on the topic!

To provide you with the most accurate assistance, I need a bit more information:

  1. Are you using the UI or CLI for ingestion?
  2. Which version of DataHub are you using? (e.g., 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g., BigQuery, Kafka)

Once you provide these details, I can give you a more tailored response. However, I can provide some general guidance based on the information available.

General Guidance

To ingest detailed information about Kafka topics, including schema and stats, you need to ensure that your Kafka ingestion setup is correctly configured to pull this metadata. Here are some steps and considerations:

  1. Schema Registry Integration: Ensure that your schema registry is integrated with DataHub. This allows DataHub to pull schema information for your Kafka topics.

    • According to <Slack conversation>, DataHub uses the convention of <topic>-value and <topic>-key by default to match schemas. If your schema names do not follow this convention, you can use the topic_subject_map configuration to map your topics to their respective schemas.
  2. Configuration: Make sure your Kafka ingestion recipe is correctly configured to include schema information.

    • You can override the default schema lookup behavior by specifying a custom mapping in the topic_subject_map configuration. For example:
      topic_subject_map:
        "my_topic_1-key": "io.acryl.Schema1"
        "my_topic_1-value": "io.acryl.Schema2"
      
  3. Ingestion Recipe: Ensure your ingestion recipe includes the necessary configurations to pull detailed metadata.

    • Here is an example of how you might configure your ingestion recipe:
      source:
        type: "kafka"
        config:
          connection:
            bootstrap: "localhost:9092"
            schema_registry_url: "http://localhost:8081"
          topic_subject_map:
            "my_topic_1-key": "io.acryl.Schema1"
            "my_topic_1-value": "io.acryl.Schema2"
      
  4. Logs and Debugging: Check the logs for any errors or warnings related to schema ingestion.

    • As mentioned in <Slack conversation>, if there are issues with schema ingestion, there should be log messages indicating the problem.

Additional Information

  • Stats Information: Currently, DataHub primarily focuses on ingesting topic names and schemas. If you need more detailed stats information, you might need to extend the ingestion capabilities or use additional tools to collect and visualize this data; a rough sketch of pushing such stats yourself with the Python SDK follows below.
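
If you do want basic stats (for example, an approximate message count) to show up on a topic, one option is to push them yourself as a dataset profile aspect with the Python SDK. This is only a rough sketch, not something the Kafka source does for you; the GMS URL, topic name, and the numbers themselves are placeholders you would need to supply from your own tooling:

import time

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetProfileClass

# Placeholder GMS endpoint and topic name -- replace with your own
emitter = DatahubRestEmitter("http://datahub-gms:8080")
dataset_urn = make_dataset_urn(platform="kafka", name="your_topic_name", env="PROD")

# Hypothetical numbers: compute these from your own Kafka tooling
profile = DatasetProfileClass(
    timestampMillis=int(time.time() * 1000),
    rowCount=123456,   # e.g. approximate message count in the topic
    columnCount=2,     # number of fields in the topic's schema
)

emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=profile))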

Please provide the additional details requested above so I can offer more specific guidance.

Sources:

  • <Slack Conversation 3>

(datahub v0.10.4)

  1. UI
  2. 0.10.4
  3. Kafka

You can provide a schema registry in the ingestion to import the topic schema info. If you don’t use a schema registry with your Kafka then DataHub cannot import the topic schema, and you will have to add it manually with the Python SDK.

<@U06TM7M40RK>

Thank you for providing the additional details. Based on your setup (DataHub v0.10.4, using UI for ingestion, and integrating with Kafka), here is a more tailored response:

Ingesting Kafka Topic Schema Information

To ingest detailed schema information for your Kafka topics, you need to ensure that your Kafka ingestion setup is correctly configured to pull this metadata. Here are the steps:

  1. Schema Registry Integration: Ensure that your schema registry is integrated with DataHub. This allows DataHub to pull schema information for your Kafka topics.

  2. Configuration: Make sure your Kafka ingestion recipe is correctly configured to include schema information.

Example Configuration

Here is an example of how you might configure your ingestion recipe to include schema information:

source:
  type: "kafka"
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
    topic_patterns:
      allow:
        - ".*"  # Adjust this pattern to match your topics
sink:
  type: "datahub-rest"
  config:
    server: "http://datahub-gms:8080"

Manual Schema Addition

If you do not use a schema registry, DataHub cannot automatically import the topic schema. In this case, you will need to add the schema manually using the Python SDK. Here is a basic example of how you can do this:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

# Initialize the emitter
emitter = DatahubRestEmitter("http://datahub-gms:8080")

# Define the schema fields
fields = [
    SchemaFieldClass(
        fieldPath="field1",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="string",
        description="Description of field1"
    ),
    SchemaFieldClass(
        fieldPath="field2",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="string",
        description="Description of field2"
    )
]

# Create the schema metadata aspect (platform must be a data platform URN)
schema_metadata = SchemaMetadataClass(
    schemaName="exampleSchema",
    platform="urn:li:dataPlatform:kafka",
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema=""),
    lastModified=AuditStampClass(time=0, actor="urn:li:corpuser:ingestion"),
    fields=fields
)

# Emit the schema metadata against the topic's dataset URN
dataset_urn = make_dataset_urn(platform="kafka", name="your_topic_name", env="PROD")
emitter.emit(
    MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=schema_metadata)
)

Additional Considerations

  • Logs and Debugging: Check the logs for any errors or warnings related to schema ingestion. This can help you identify and resolve issues.
  • Custom Schema Registry: If you have a custom schema registry, you can implement a custom schema registry client as described in the <Kafka | DataHub documentation>; a rough sketch of how a recipe might reference such a client follows below.
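
For reference, a recipe pointing the Kafka source at a custom schema registry client might look roughly like this. The module and class path below is purely hypothetical, and the schema_registry_class option name should be verified against the Kafka source documentation for your DataHub version:

source:
  type: "kafka"
  config:
    connection:
      bootstrap: "localhost:9092"
    # Hypothetical module path -- point this at your own implementation
    # that returns schema metadata for each topic.
    schema_registry_class: "my_package.my_module.MyCustomSchemaRegistry"
sink:
  type: "datahub-rest"
  config:
    server: "http://datahub-gms:8080"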

Sources:

  • <Slack Conversation 3>
  • <Kafka | DataHub documentation>

:+1: