Ingesting Data from Kafka into DataHub

Original Slack Thread

can you help me ingest data from kafka to datahub <@U06TM7M40RK>

To ingest data from Kafka into DataHub, you need to set up a metadata ingestion pipeline that connects to your Kafka instance and sends metadata to DataHub. Here are the general steps to achieve this:

  1. Install the DataHub Kafka Plugin:
    You need to install the DataHub Kafka plugin to enable Kafka as a source for metadata ingestion. You can do this by running:

    pip install 'acryl-datahub[kafka]'
    
  2. Configure the Kafka Source:
    You need to create a configuration file (YAML) for the Kafka source. This configuration will specify how to connect to your Kafka instance and which topics to ingest. Here’s an example configuration:

    source:
      type: "kafka"
      config:
        platform_instance: "YOUR_CLUSTER_ID"
        connection:
          bootstrap: "your-kafka-bootstrap-server:9092"
          consumer_config:
            security.protocol: "SASL_SSL"
            sasl.mechanism: "PLAIN"
            sasl.username: "${CLUSTER_API_KEY_ID}"
            sasl.password: "${CLUSTER_API_KEY_SECRET}"
          schema_registry_url: "https://your-schema-registry-url"
          schema_registry_config:
            basic.auth.user.info: "${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}"
        topic_patterns:
          allow: ['.*']  # Adjust this to specify which topics to ingest
          deny: ['_confluent.*']  # Example of topics to exclude
    
  3. Set Up the Sink Configuration:
    You also need to configure the sink to specify where the ingested metadata should be sent. For DataHub, you can use the REST API sink:

    sink:
      type: "datahub-rest"
      config:
        server: "http://localhost:8080"  # Replace with your DataHub GMS endpoint
    
  4. Run the Ingestion Pipeline:
    With the configuration in place, you can run the ingestion pipeline using the DataHub CLI:

    datahub ingest -c your-config-file.yaml
    
  5. Verify the Ingestion:
    After running the pipeline, you can verify that the metadata has been ingested by checking the DataHub UI.
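
    If you prefer to check from the command line as well, you can look up one of the ingested topics directly. This is only a sketch: "your-topic" is a placeholder, and the exact dataset URN depends on your platform_instance and environment settings.

    # Fetch the metadata stored for a single Kafka topic (placeholder URN)
    datahub get --urn "urn:li:dataset:(urn:li:dataPlatform:kafka,your-topic,PROD)"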

For more detailed information on configuring Kafka as a source, you can refer to the DataHub Kafka source documentation.

Would you like more detailed steps or specific configurations for any of these processes?

<@U06TM7M40RK> can you give me a step wise answer

Certainly! Here is a step-by-step guide to ingest data from Kafka into DataHub:

Step 1: Install the DataHub Kafka Plugin
First, ensure you have Python 3.8 or newer installed. Then, install the DataHub CLI and the Kafka plugin:

python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub
pip install 'acryl-datahub[kafka]'
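
Before writing the recipe, you can optionally confirm that the CLI and the Kafka source plugin are installed. This is just a sanity check; the plugin listing output varies by CLI version:

# Show the installed CLI version and the enabled source/sink plugins
datahub version
datahub check plugins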

Step 2: Create a Kafka Source Configuration
Create a YAML configuration file (e.g., kafka_ingestion.yml) to define the Kafka source. This file will specify how to connect to your Kafka instance and which topics to ingest.

source:
  type: "kafka"
  config:
    platform_instance: "YOUR_CLUSTER_ID"
    connection:
      bootstrap: "your-kafka-bootstrap-server:9092"
      consumer_config:
        security.protocol: "SASL_SSL"
        sasl.mechanism: "PLAIN"
        sasl.username: "${CLUSTER_API_KEY_ID}"
        sasl.password: "${CLUSTER_API_KEY_SECRET}"
      schema_registry_url: "https://your-schema-registry-url"
      schema_registry_config:
        basic.auth.user.info: "${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}"
    topic_patterns:
      allow: ['.*']  # Adjust this to specify which topics to ingest
      deny: ['_confluent.*']  # Example of topics to exclude
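
The configuration above assumes a SASL-secured cluster with a Confluent-style schema registry (for example, Confluent Cloud). If you are only testing against a local, unauthenticated broker, a much smaller recipe is enough; the host and port values below are assumptions for a typical local setup:

source:
  type: "kafka"
  config:
    connection:
      bootstrap: "localhost:9092"  # local broker, no SASL
      schema_registry_url: "http://localhost:8081"  # only needed if you run a schema registry
    topic_patterns:
      deny: ['^_.*']  # skip internal topics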

Step 3: Configure the DataHub REST Sink
Add the sink configuration to the same YAML file to specify where the ingested metadata should be sent.

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"  # Replace with your DataHub GMS endpoint
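
If your DataHub instance runs in Docker (as in the quickstart) or has metadata service authentication enabled, the sink typically needs a GMS address that is reachable from wherever ingestion runs, plus a personal access token. The values below are placeholders:

sink:
  type: "datahub-rest"
  config:
    server: "http://datahub-gms:8080"  # GMS as addressed from the ingestion environment (placeholder)
    token: "${DATAHUB_ACCESS_TOKEN}"   # only required when token-based authentication is enabled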

Step 4: Run the Ingestion Pipeline
Use the DataHub CLI to run the ingestion based on your configuration file.

datahub ingest -c kafka_ingestion.yml

This command will read the metadata from the specified Kafka topics and ingest it into your DataHub instance.
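
Before a full run, it can help to validate the recipe and source connectivity without writing everything to DataHub. The flags below are available in recent CLI versions (check datahub ingest --help on yours):

# Validate the recipe and source connection without writing to the sink
datahub ingest -c kafka_ingestion.yml --dry-run

# Ingest only a small sample to preview what would be produced
datahub ingest -c kafka_ingestion.yml --preview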

Step 5: Verify the Ingestion
After running the pipeline, verify that the metadata has been ingested by checking the DataHub UI. You should see the metadata from the Kafka topics reflected in the DataHub interface.

Additional Resources
For more detailed information on configuring Kafka as a source, you can refer to the DataHub Kafka source documentation.

Would you like more detailed steps or specific configurations for any of these processes?

do i need to run a kafka instance and push some data into its topic

Execution finished with errors.
{'exec_id': 'd86a1227-f47a-4115-a7ed-a3af443760a2',
 'infos': ['2024-10-30 15:11:02.746857 INFO: Starting execution for task with name=RUN_INGEST',
           "2024-10-30 15:12:15.034743 INFO: Failed to execute 'datahub ingest', exit code 1",
           '2024-10-30 15:12:15.096333 INFO: Caught exception EXECUTING task_id=d86a1227-f47a-4115-a7ed-a3af443760a2, name=RUN_INGEST, '
           'stacktrace=Traceback (most recent call last):\n'
           '  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/acryl/executor/execution/default_executor.py", line 139, in execute_task\n'
           '    task_event_loop.run_until_complete(task_future)\n'
           '  File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete\n'
           '    return future.result()\n'
           '  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/acryl/executor/execution/sub_process_ingestion_task.py", line 400, in '
           'execute\n'
           '    raise TaskError("Failed to execute \'datahub ingest\'")\n'
           "acryl.executor.execution.task.TaskError: Failed to execute 'datahub ingest'\n"],
 'errors': []}

~~~~ Ingestion Report ~~~~
{
  "cli": {
    "cli_version": "0.14.1",
    "cli_entry_location": "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/__init__.py",
    "models_version": "bundled",
    "py_version": "3.10.12 (main, Jul 29 2024, 16:56:48) [GCC 11.4.0]",
    "py_exec_path": "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/bin/python3",
    "os_details": "Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35",
    "mem_info": "99.1 MB",
    "peak_memory_usage": "99.1 MB",
    "disk_info": {
      "total": "1.08 TB",
      "used": "16.25 GB",
      "used_initally": "16.25 GB",
      "free": "1.01 TB"
    },
    "peak_disk_usage": "16.25 GB",
    "thread_count": 1,
    "peak_thread_count": 1
  },
  "source": {
    "type": "kafka",
    "report": {
      "events_produced": 0,
      "events_produced_per_sec": 0,
      "entities": {},
      "aspects": {},
      "aspect_urn_samples": {},
      "soft_deleted_stale_entities": [],
      "last_state_non_deletable_entities": [],
      "topics_scanned": 0,
      "filtered": [],
      "start_time": "2024-10-30 15:11:08.484129 (1 minute and 3.44 seconds ago)",
      "running_time": "1 minute and 3.44 seconds",
      "failures": [
        {
          "title": "Pipeline Error",
          "message": "Ingestion pipeline raised an unexpected exception!",
          "context": [
            "&lt;class 'cimpl.KafkaException'&gt;: KafkaError{code=_TRANSPORT,val=-195,str=\"Failed to get metadata: Local: Broker transport failure\"}"
          ]
        }
      ],
      "warnings": [],
      "infos": []
    }
  },
  "sink": {
    "type": "datahub-rest",
    "report": {
      "total_records_written": 0,
      "records_written_per_second": 0,
      "warnings": [],
      "failures": [],
      "start_time": "2024-10-30 15:11:07.918753 (1 minute and 4.01 seconds ago)",
      "current_time": "2024-10-30 15:12:11.924457 (now)",
      "total_duration_in_seconds": 64.01,
      "max_threads": 15,
      "gms_version": "v0.14.1",
      "pending_requests": 0,
      "async_batches_split": 0,
      "main_thread_blocking_timer": null
    }
  }
}

~~~~ Ingestion Logs ~~~~
Obtaining venv creation lock...
Acquired venv creation lock
venv is already set up
venv setup time = 0 sec
This version of datahub supports report-to functionality
+ exec datahub ingest run -c /tmp/datahub/ingest/d86a1227-f47a-4115-a7ed-a3af443760a2/recipe.yml --report-to /tmp/datahub/logs/d86a1227-f47a-4115-a7ed-a3af443760a2/artifacts/ingestion_report.json
[2024-10-30 15:11:07,818] INFO     {datahub.cli.ingest_cli:149} - DataHub CLI version: 0.14.1
[2024-10-30 15:11:07,830] INFO     {datahub.ingestion.run.pipeline:254} - No sink configured, attempting to use the default datahub-rest sink.
[2024-10-30 15:11:07,925] INFO     {datahub.ingestion.run.pipeline:271} - Sink configured successfully. DataHubRestEmitter: configured to talk to http://datahub-gms:8080
%3|1730301068.315|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%4|1730301068.484|CONFWARN|rdkafka#producer-2| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%3|1730301068.588|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1730301069.457|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1730301070.480|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT, 1 identical error(s) suppressed)
[2024-10-30 15:11:11,775] WARNING  {datahub.ingestion.source.confluent_schema_registry:63} - Failed to get subjects from schema registry: HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /schema-registry/api/subjects (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fd2cd2d9c00>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2024-10-30 15:11:11,778] INFO     {datahub.ingestion.run.pipeline:295} - Source configured successfully.
[2024-10-30 15:11:11,779] INFO     {datahub.cli.ingest_cli:130} - Starting metadata ingestion
%3|1730301099.493|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 29 identical error(s) suppressed)
%3|1730301100.493|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 22 identical error(s) suppressed)
%3|1730301130.248|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 38 identical error(s) suppressed)
%3|1730301131.294|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 28 identical error(s) suppressed)
[2024-10-30 15:12:11,779] ERROR    {datahub.ingestion.run.pipeline:738} - Ingestion pipeline threw an uncaught exception
Traceback (most recent call last):
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/run/pipeline.py", line 462, in run
    for wu in itertools.islice(
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/source/state/stale_entity_removal_handler.py", line 73, in auto_stale_entity_removal
    for wu in stream:
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 147, in auto_workunit_reporter
    for wu in stream:
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 245, in auto_browse_path_v2
    for urn, batch in _batch_workunits_by_urn(stream):
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 460, in _batch_workunits_by_urn
    for wu in stream:
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 360, in auto_fix_duplicate_schema_field_paths
    for wu in stream:
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 164, in auto_materialize_referenced_tags_terms
    for wu in stream:
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 102, in auto_status_aspect
    for wu in stream:
  File "/tmp/datahub/ingest/venv-kafka-3cbb1ad0ed8a0388/lib/python3.10/site-packages/datahub/ingestion/source/kafka.py", line 301, in get_workunits_internal
    topics = self.consumer.list_topics(
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}
[2024-10-30 15:12:11,921] ERROR    {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}
[2024-10-30 15:12:11,926] INFO     {datahub.ingestion.reporting.file_reporter:54} - Wrote FAILURE report successfully to <_io.TextIOWrapper name='/tmp/datahub/logs/d86a1227-f47a-4115-a7ed-a3af443760a2/artifacts/ingestion_report.json' mode='w' encoding='UTF-8'>
[2024-10-30 15:12:11,967] INFO     {datahub.cli.ingest_cli:143} - Finished metadata ingestion
Pipeline finished with at least 1 failures; produced 0 events in 1 minute and 3.44 seconds.

<@U06TM7M40RK>
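
The log above also answers the earlier follow-up question: the Kafka source reads metadata only (topic names from the broker and schemas from the schema registry), so you generally do not need to push messages into topics first, but the broker must be running and reachable from wherever the ingestion executes. In this run the consumer tried localhost:9092 and got "Connection refused", and the report paths suggest the ingestion ran inside a container, where localhost refers to the container itself rather than the machine hosting Kafka. A sketch of the relevant part of the recipe, with placeholder addresses that depend on your setup:

source:
  type: "kafka"
  config:
    connection:
      # Use an address reachable from the ingestion container, not "localhost".
      # Placeholder examples:
      #   host.docker.internal:9092  -> Kafka running on the Docker host (Docker Desktop)
      #   broker:29092               -> Kafka running as a container on the same Docker network
      bootstrap: "host.docker.internal:9092"
      schema_registry_url: "http://host.docker.internal:8081"  # likewise, only if you run a schema registry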