Hi Guys,
I am trying to ingest metadata using the Kafka source together with a schema registry. I am not using the Confluent Schema Registry itself; instead I have deployed a Spring Boot based schema registry locally as a Docker container. Can you please confirm whether we can use this as-is, or whether we need to write a custom class overriding the KafkaSchemaRegistryBase.get_schema_metadata() method?
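For context, here is roughly what I understand the custom-class option to look like. This is only a sketch based on my reading of the kafka source; the exact base-class module path, method signature, and return type are assumptions on my side, so please correct me if the interface is different in 0.12.1.4:

# Sketch only: the import paths and the get_schema_metadata() signature below are
# my assumptions about KafkaSchemaRegistryBase and may not match the installed version.
from typing import Optional

from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata


class MyCustomSchemaRegistry(KafkaSchemaRegistryBase):
    # This class would be referenced from the recipe via schema_registry_class.
    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        # Look up the schema for `topic` in my registry and map it onto
        # DataHub's SchemaMetadata aspect; return None when the topic has
        # no registered schema.
        return None
    # Note: the source may also expect a create() classmethod or a constructor
    # taking the source config and report - another detail I would need to verify.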
My recipe file is:

source:
  type: "kafka"
  config:
    platform_instance: "local"
    # Set the custom schema registry implementation class
    schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
    connection:
      schema_registry_url: http://localhost:8081
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080
    token: *******
But I see the warnings below. On the sink side the datasets are created from the topics, but there is no schema present. Can you please suggest what I am missing here?
[2024-05-02 01:04:47,645] INFO {datahub.cli.ingest_cli:152} - DataHub CLI version: 0.12.1.4
[2024-05-02 01:04:49,134] INFO {datahub.ingestion.run.pipeline:238} - Sink configured successfully. DataHubRestEmitter: configured to talk to http://localhost:8080 with token: **********
%4|1714592089.951|CONFWARN|rdkafka#producer-2| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
[2024-05-02 01:06:05,207] WARNING {datahub.ingestion.source.confluent_schema_registry:63} - Failed to get subjects from schema registry: Unknown Schema Registry Error: Network Error (tcp_error) - A communication error occurred: "Operation timed out". The Web Server may be down, too busy, or experiencing other problems preventing it from responding to requests. (Error page returned by proxy na975cr01blbc01; HTTP status code 503, SR code -1)
[2024-05-02 01:06:05,208] INFO {datahub.ingestion.run.pipeline:255} - Source configured successfully.
[2024-05-02 01:06:05,212] INFO {datahub.cli.ingest_cli:133} - Starting metadata ingestion
[2024-05-02 01:06:05,743] INFO {datahub.ingestion.source.kafka:544} - Fetching config details for all topics
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic aabharti_orders fetched successfully
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic reecha_orders fetched successfully
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic _schemas fetched successfully
[2024-05-02 01:06:05,750] INFO {datahub.ingestion.source.kafka:575} - Config details for topic __consumer_offsets fetched successfully
Cli report:
{'cli_version': '0.12.1.4',
'cli_entry_location': '/opt/homebrew/lib/python3.9/site-packages/datahub/__init__.py',
'py_version': '3.9.18 (main, Aug 24 2023, 18:16:58) \n[Clang 15.0.0 (clang-1500.1.0.2.5)]',
'py_exec_path': '/opt/homebrew/opt/python@3.9/bin/python3.9',
'os_details': 'macOS-14.4.1-arm64-arm-64bit',
'mem_info': '89.77 MB',
'peak_memory_usage': '89.77 MB',
'disk_info': {'total': '494.38 GB', 'used': '471.61 GB', 'used_initally': '471.61 GB', 'free': '22.77 GB'},
'peak_disk_usage': '471.61 GB',
'thread_count': 2,
'peak_thread_count': 2}
Source (kafka) report:
{'events_produced': 1,
'events_produced_per_sec': 0,
'entities': {'dataset': ['urn:li:dataset:(urn:li:dataPlatform:kafka,local.aabharti_orders,PROD)']},
'aspects': {'dataset': {'status': 1, 'browsePaths': 1, 'datasetProperties': 1, 'dataPlatformInstance': 1}},
'warnings': {'aabharti_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.'],
'reecha_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.']},
'failures': {},
'soft_deleted_stale_entities': [],
'topics_scanned': 2,
'filtered': [],
'start_time': '2024-05-02 01:04:49.951838 (1 minute and 15.8 seconds ago)',
'running_time': '1 minute and 15.8 seconds'}
Sink (datahub-rest) report:
{'total_records_written': 0,
'records_written_per_second': 0,
'warnings': [],
'failures': [],
'start_time': '2024-05-02 01:04:48.661682 (1 minute and 17.09 seconds ago)',
'current_time': '2024-05-02 01:06:05.752596 (now)',
'total_duration_in_seconds': 77.09,
'gms_version': 'v0.12.1',
'pending_requests': 0}
⏳ Pipeline running with at least 2 warnings so far; produced 1 events in 1 minute and 15.8 seconds.
[2024-05-02 01:06:08,429] INFO {datahub.cli.ingest_cli:146} - Finished metadata ingestion
Cli report:
{'cli_version': '0.12.1.4',
'cli_entry_location': '/opt/homebrew/lib/python3.9/site-packages/datahub/__init__.py',
'py_version': '3.9.18 (main, Aug 24 2023, 18:16:58) \n[Clang 15.0.0 (clang-1500.1.0.2.5)]',
'py_exec_path': '/opt/homebrew/opt/python@3.9/bin/python3.9',
'os_details': 'macOS-14.4.1-arm64-arm-64bit',
'mem_info': '90.1 MB',
'peak_memory_usage': '90.1 MB',
'disk_info': {'total': '494.38 GB', 'used': '471.61 GB', 'used_initally': '471.61 GB', 'free': '22.77 GB'},
'peak_disk_usage': '471.61 GB',
'thread_count': 2,
'peak_thread_count': 2}
Source (kafka) report:
{'events_produced': 8,
'events_produced_per_sec': 0,
'entities': {'dataset': ['urn:li:dataset:(urn:li:dataPlatform:kafka,local.aabharti_orders,PROD)',
'urn:li:dataset:(urn:li:dataPlatform:kafka,local.reecha_orders,PROD)']},
'aspects': {'dataset': {'status': 2, 'browsePaths': 2, 'datasetProperties': 2, 'dataPlatformInstance': 2, 'subTypes': 2, 'browsePathsV2': 4}},
'warnings': {'aabharti_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.'],
'reecha_orders': ['The schema registry subject for the value schema is not found. The topic is either schema-less, or no messages have been written to the topic yet.']},
'failures': {},
'soft_deleted_stale_entities': [],
'topics_scanned': 4,
'filtered': ['_schemas', '__consumer_offsets'],
'start_time': '2024-05-02 01:04:49.951838 (1 minute and 19.12 seconds ago)',
'running_time': '1 minute and 19.12 seconds'}
Sink (datahub-rest) report:
{'total_records_written': 8,
'records_written_per_second': 0,
'warnings': [],
'failures': [],
'start_time': '2024-05-02 01:04:48.661682 (1 minute and 20.41 seconds ago)',
'current_time': '2024-05-02 01:06:09.067328 (now)',
'total_duration_in_seconds': 80.41,
'gms_version': 'v0.12.1',
'pending_requests': 0}
Pipeline finished with at least 2 warnings; produced 8 events in 1 minute and 19.12 seconds.
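One more data point: the 503 body in the warning above mentions a proxy (na975cr01blbc01), so I suspect the request may not be reaching localhost:8081 at all. To rule that out I have been hitting the registry directly with a quick sketch like the one below (requests-based; bypassing the proxy environment variables is my assumption based on that error page, and /subjects is the standard Confluent-compatible REST endpoint):

# Quick sanity check that the schema registry is reachable from the machine
# running ingestion, without going through any HTTP(S) proxy.
import requests

session = requests.Session()
session.trust_env = False  # ignore HTTP_PROXY / HTTPS_PROXY environment variables

resp = session.get("http://localhost:8081/subjects", timeout=5)
resp.raise_for_status()
print(resp.json())  # expecting something like ["aabharti_orders-value", "reecha_orders-value"]

If that call also times out or returns the proxy error page, then the warning is a connectivity issue rather than anything related to the schema_registry_class setting.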