Ingesting Schema Using Kafka Source with Confluent Schema Registry - Troubleshooting Schema Retrieval and Ingestion

Original Slack Thread

Hi Guys,
I am trying to ingest using the Kafka source with only a schema registry. I am not using the official Confluent Schema Registry; instead I deployed a schema registry locally as a Docker container built with Spring Boot. Can you please confirm whether we can use this as-is, or whether we need to write a custom class overriding KafkaSchemaRegistryBase.get_schema_metadata()?

My recipe file is:

source:
  type: "kafka"
  config:
    platform_instance: "local"
    # Set the custom schema registry implementation class
    schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
    connection:
      schema_registry_url: http://localhost:8081
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080
    token: *******
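
A quick way to confirm that the registry's REST endpoint is actually reachable from the machine running the CLI is to hit the standard Confluent /subjects endpoint directly, bypassing any system-wide proxy settings. A minimal sketch in Python (the URL matches the recipe above; trust_env=False tells requests to ignore HTTP_PROXY/HTTPS_PROXY environment variables):

import requests

# Ignore proxy settings picked up from the environment; a proxy that intercepts
# traffic to localhost can surface as a 503 even though the registry is up.
session = requests.Session()
session.trust_env = False

resp = session.get("http://localhost:8081/subjects", timeout=5)
print(resp.status_code, resp.json())  # expect 200 and a JSON list of subjects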

But I see the warnings below. On the sink side I can see the datasets created from the topics, but no schema is present. Can you please suggest what I am missing here?
[2024-05-02 01:04:47,645] INFO {datahub.cli.ingest_cli:152} - DataHub CLI version: 0.12.1.4
[2024-05-02 01:04:49,134] INFO {datahub.ingestion.run.pipeline:238} - Sink configured successfully. DataHubRestEmitter: configured to talk to http://localhost:8080 with token: **********
%4|1714592089.951|CONFWARN|rdkafka#producer-2| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
[2024-05-02 01:06:05,207] WARNING {datahub.ingestion.source.confluent_schema_registry:63} - Failed to get subjects from schema registry: Unknown Schema Registry Error: HTML "Network Error (tcp_error)" error page (Proxy = na975cr01blbc01): 'A communication error occurred: "Operation timed out". The Web Server may be down, too busy, or experiencing other problems preventing it from responding to requests.' (HTTP status code 503, SR code -1)
[2024-05-02 01:06:05,208] INFO {datahub.ingestion.run.pipeline:255} - Source configured successfully.
[2024-05-02 01:06:05,212] INFO {datahub.cli.ingest_cli:133} - Starting metadata ingestion
[2024-05-02 01:06:05,743] INFO {datahub.ingestion.source.kafka:544} - Fetching config details for all topics
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic aabharti_orders fetched successfully
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic reecha_orders fetched successfully
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic _schemas fetched successfully
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic __consumer_offsets fetched successfully
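
The 503 above appears to have been returned by an HTTP proxy rather than by the registry itself (note the "Proxy = na975cr01blbc01" detail in the error page), i.e. the request to localhost:8081 was routed through a corporate proxy instead of reaching the local container. Excluding localhost from proxying before starting ingestion usually addresses this. Below is a sketch that sets NO_PROXY and runs an equivalent recipe through DataHub's programmatic Pipeline entry point; the bootstrap address is an assumption (adjust to your broker), and the Pipeline API usage should be verified against your CLI version:

import os

from datahub.ingestion.run.pipeline import Pipeline

# Keep requests to the local registry off the corporate proxy; both spellings
# are honoured by different HTTP clients.
os.environ["NO_PROXY"] = "localhost,127.0.0.1"
os.environ["no_proxy"] = "localhost,127.0.0.1"

pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka",
            "config": {
                "platform_instance": "local",
                "connection": {
                    "bootstrap": "localhost:9092",  # assumption: adjust to your broker
                    "schema_registry_url": "http://localhost:8081",
                },
            },
        },
        "sink": {
            "type": "datahub-rest",
            # Add "token": "..." here if your DataHub instance requires authentication.
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.pretty_print_summary()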

Cli report:
{'cli_version': '0.12.1.4',
'cli_entry_location': '/opt/homebrew/lib/python3.9/site-packages/datahub/__init__.py',
'py_version': '3.9.18 (main, Aug 24 2023, 18:16:58) \n[Clang 15.0.0 (clang-1500.1.0.2.5)]',
'py_exec_path': '/opt/homebrew/opt/python@3.9/bin/python3.9',
'os_details': 'macOS-14.4.1-arm64-arm-64bit',
'mem_info': '89.77 MB',
'peak_memory_usage': '89.77 MB',
'disk_info': {'total': '494.38 GB', 'used': '471.61 GB', 'used_initally': '471.61 GB', 'free': '22.77 GB'},
'peak_disk_usage': '471.61 GB',
'thread_count': 2,
'peak_thread_count': 2}
Source (kafka) report:
{'events_produced': 1,
'events_produced_per_sec': 0,
'entities': {'dataset': ['urn:li:dataset:(urn:li:dataPlatform:kafka,local.aabharti_orders,PROD)']},
'aspects': {'dataset': {'status': 1, 'browsePaths': 1, 'datasetProperties': 1, 'dataPlatformInstance': 1}},
'warnings': {'aabharti_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.'],
'reecha_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.']},
'failures': {},
'soft_deleted_stale_entities': [],
'topics_scanned': 2,
'filtered': [],
'start_time': '2024-05-02 01:04:49.951838 (1 minute and 15.8 seconds ago)',
'running_time': '1 minute and 15.8 seconds'}
Sink (datahub-rest) report:
{'total_records_written': 0,
'records_written_per_second': 0,
'warnings': [],
'failures': [],
'start_time': '2024-05-02 01:04:48.661682 (1 minute and 17.09 seconds ago)',
'current_time': '2024-05-02 01:06:05.752596 (now)',
'total_duration_in_seconds': 77.09,
'gms_version': 'v0.12.1',
'pending_requests': 0}

Pipeline running with at least 2 warnings so far; produced 1 events in 1 minute and 15.8 seconds.
[2024-05-02 01:06:08,429] INFO {datahub.cli.ingest_cli:146} - Finished metadata ingestion
Cli report:
{'cli_version': '0.12.1.4',
'cli_entry_location': '/opt/homebrew/lib/python3.9/site-packages/datahub/__init__.py',
'py_version': '3.9.18 (main, Aug 24 2023, 18:16:58) \n[Clang 15.0.0 (clang-1500.1.0.2.5)]',
'py_exec_path': '/opt/homebrew/opt/python@3.9/bin/python3.9',
'os_details': 'macOS-14.4.1-arm64-arm-64bit',
'mem_info': '90.1 MB',
'peak_memory_usage': '90.1 MB',
'disk_info': {'total': '494.38 GB', 'used': '471.61 GB', 'used_initally': '471.61 GB', 'free': '22.77 GB'},
'peak_disk_usage': '471.61 GB',
'thread_count': 2,
'peak_thread_count': 2}
Source (kafka) report:
{'events_produced': 8,
'events_produced_per_sec': 0,
'entities': {'dataset': ['urn:li:dataset:(urn:li:dataPlatform:kafka,local.aabharti_orders,PROD)',
'urn:li:dataset:(urn:li:dataPlatform:kafka,local.reecha_orders,PROD)']},
'aspects': {'dataset': {'status': 2, 'browsePaths': 2, 'datasetProperties': 2, 'dataPlatformInstance': 2, 'subTypes': 2, 'browsePathsV2': 4}},
'warnings': {'aabharti_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.'],
'reecha_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.']},
'failures': {},
'soft_deleted_stale_entities': [],
'topics_scanned': 4,
'filtered': ['_schemas', '__consumer_offsets'],
'start_time': '2024-05-02 01:04:49.951838 (1 minute and 19.12 seconds ago)',
'running_time': '1 minute and 19.12 seconds'}
Sink (datahub-rest) report:
{'total_records_written': 8,
'records_written_per_second': 0,
'warnings': [],
'failures': [],
'start_time': '2024-05-02 01:04:48.661682 (1 minute and 20.41 seconds ago)',
'current_time': '2024-05-02 01:06:09.067328 (now)',
'total_duration_in_seconds': 80.41,
'gms_version': 'v0.12.1',
'pending_requests': 0}

Pipeline finished with at least 2 warnings; produced 8 events in 1 minute and 19.12 seconds.
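
Once the registry is reachable, the repeated "schema registry subject for the value schema is not found" warning is the next thing to check: for each topic, DataHub looks up subjects named after the default Confluent TopicNameStrategy, i.e. "<topic>-value" and "<topic>-key". If the Spring Boot registry stores schemas under different subject names, the lookup comes back empty even though the topics themselves ingest fine. A minimal sketch for inspecting the subjects with the confluent-kafka Python client (this assumes the registry implements the standard Confluent REST API; the topic name is taken from the log above):

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({"url": "http://localhost:8081"})

# List every registered subject; DataHub expects "<topic>-value" / "<topic>-key"
# under the default TopicNameStrategy.
print(client.get_subjects())

# Fetch the latest registered value schema for one of the topics from the log.
latest = client.get_latest_version("aabharti_orders-value")
print(latest.schema_id, latest.schema.schema_str)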


I am trying to ingest the schema using a custom schema registry class, but it is still not going through.
Here is the sequence of steps I took:
• pip install sumup-sr-client
• ls -l /opt/homebrew/anaconda3/lib/python3.11/site-packages/schema_registry/registry/sumup_schema_registry.py
• created a recipe YAML with schema_registry_class: "schema_registry.registry.sumup_schema_registry.ConfluentSchemaRegistry"
• but I still see the warning "Failed to get subjects from schema registry:" followed by 'warnings': {'test-schmareg': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.']}
Can you please suggest what to try here? I can see the topics getting populated in DataHub, but not their schemas.
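
For reference, the class named in schema_registry_class has to implement DataHub's Kafka schema registry interface (which the built-in ConfluentSchemaRegistry already does); pointing the setting at a class from an unrelated client package will not change how subjects are resolved. A rough, assumption-heavy sketch of what a custom implementation looks like is below: the base-class import path, the create() factory, and the exact get_schema_metadata() signature differ between CLI versions, so verify them against the installed datahub package before relying on this.

from typing import Optional

# Import paths are for illustration only; check your installed datahub version.
from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.metadata.schema_classes import SchemaMetadataClass


class MyCustomSchemaRegistry(KafkaSchemaRegistryBase):
    """Hypothetical registry implementation for a registry that does not follow
    the Confluent "<topic>-value" subject naming convention."""

    def __init__(self, source_config, report):
        self.source_config = source_config
        self.report = report

    @classmethod
    def create(cls, source_config, report):
        # The kafka source is expected to build the configured class through a
        # factory like this (mirroring the built-in ConfluentSchemaRegistry).
        return cls(source_config, report)

    def get_schema_metadata(self, topic: str, platform_urn: str) -> Optional[SchemaMetadataClass]:
        # Resolve the schema for `topic` from the custom registry and translate it
        # into DataHub's SchemaMetadata aspect; return None when no schema exists.
        return None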