<@U06TM7M40RK>
Getting the below error while running Kafka ingestion:
[2025-01-02 06:23:05,794] INFO {datahub.cli.ingest_cli:143} - Finished metadata ingestion
The error message you’re encountering indicates a timeout issue when trying to emit metadata to the DataHub Global Metadata Service (GMS). Specifically, the error is a ReadTimeoutError, which suggests that the request to GMS is taking longer than the allowed 30 seconds to complete.
Here are some potential causes and steps to troubleshoot this issue:
Network Connectivity: Ensure that there is stable network connectivity between the Kafka ingestion process and the DataHub GMS. Network issues can cause delays and timeouts.
GMS Performance: Check the performance and resource allocation of the GMS. If the GMS is under heavy load or not allocated sufficient resources, it may not be able to process requests in a timely manner. Consider scaling up the resources or instances of the GMS.
Request Size: Large requests can take longer to process. If the ingestion involves large datasets or numerous records, consider breaking them into smaller batches to reduce processing time.
Timeout Configuration: If the above steps do not resolve the issue, you may need to increase the timeout setting for the HTTP connection. This can often be configured in the ingestion pipeline's sink settings or in the client making the request; a hedged recipe sketch follows this list.
Logs and Monitoring: Check the logs of the GMS for any errors or warnings that might indicate what is causing the delay. Monitoring tools can also help identify bottlenecks in the system.
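For example, if you are using the datahub-rest sink in your recipe, many DataHub versions expose a timeout_sec option on the sink config. A minimal sketch, assuming your CLI version supports this option (check your sink's documented settings):
# Hedged sketch: raise the client-side read timeout on the datahub-rest sink.
# `timeout_sec` is assumed to be supported by your DataHub CLI version.
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
    timeout_sec: 120  # raise the 30-second default mentioned in the error if GMS needs longer per request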
For more detailed guidance, you can refer to the <Ingestion | DataHub UI Ingestion Debugging Guide>, which provides insights into common issues and troubleshooting steps for ingestion failures.
Would you like more detailed steps on any of these points, or do you have additional information about your current setup that might help narrow down the issue?
<@U06TM7M40RK>
How to reduce batch size or request size?
To reduce the batch size or request size during ingestion in DataHub, you can adjust the configuration settings related to batch processing. Here are some steps you can take:
Adjust Batch Size in DataHub Rest Emitter:
The DataHubRestEmitter class in DataHub has configuration for the maximum payload size and batch length. You can adjust these settings to reduce the batch size.
INGEST_MAX_PAYLOAD_BYTES is set to 15 MB by default. You can reduce this value to limit the size of each request payload.
BATCH_INGEST_MAX_PAYLOAD_LENGTH is set to 200 by default. This limits the number of Metadata Change Proposals (MCPs) in a batch.
You can set these environment variables to adjust the batch size, for example (set before the emitter is created):
import os
os.environ["DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH"] = "100"  # Example value
Configure DataHub Rest Sink:
The DatahubRestSinkConfig class allows you to configure the maximum number of MCPs per batch using the max_per_batch parameter. This is particularly useful in async batch mode.
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig

config = DatahubRestSinkConfig(
    server="http://localhost:8080",  # your GMS endpoint
    max_per_batch=50,  # Example value
)
Modify Ingestion Pipeline Configuration:
If you are using a configuration file for your ingestion pipeline, you can specify the batch size directly in the configuration. Look for parameters like batch_size or max_per_batch in your configuration file and adjust them accordingly; a hedged recipe sketch appears below, after these steps.
Check Documentation and Examples:
Refer to the <https://datahubproject.io/docs|DataHub documentation> for specific examples and additional configuration options that might be available for your version of DataHub.
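As an illustration of the pipeline-configuration approach above, here is a minimal recipe sketch; mode and max_per_batch on the datahub-rest sink are assumptions to verify against your DataHub version's sink documentation:
# Hedged sketch: limiting batch size on the datahub-rest sink in a recipe.
# `mode` and `max_per_batch` are assumed options; confirm them for your CLI version.
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
    mode: ASYNC_BATCH  # assumed: the async batch mode in which max_per_batch applies
    max_per_batch: 50  # fewer MCPs per request means smaller payloads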
If you need more detailed guidance on implementing these changes, please let me know!
<@U06TM7M40RK>
How to ingest just 1 schema in Kafka recipe?
To ingest metadata for just one specific schema in a Kafka ingestion recipe for DataHub, you can use the topic_patterns configuration to filter the topics associated with that schema. Here’s how you can set it up:
Identify the Topic: Determine the Kafka topic that is associated with the schema you want to ingest.
Configure the Ingestion Recipe: Use the topic_patterns configuration to specify the exact topic you want to include. This will ensure that only the metadata for that specific topic (and its associated schema) is ingested.
Here is an example configuration:
source:
  type: kafka
  config:
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: "http://localhost:8081"
    topic_patterns:
      allow:
        - "^your_specific_topic$"  # Replace 'your_specific_topic' with the actual topic name
    stateful_ingestion:
      enabled: true
      remove_stale_metadata: true
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
Explanation:
topic_patterns.allow: This field uses a regex pattern to specify which topics to include. By setting it to the exact name of your topic, you ensure that only this topic’s metadata is ingested.
schema_registry_url: Ensure that this points to your schema registry, where the schema for the topic is stored.
Then run the recipe as usual, for example: datahub ingest -c path/to/your_kafka_recipe.yml
This setup will ingest metadata only for the specified topic and its associated schema.
Would you like more details on any specific part of this process?
<@U06TM7M40RK>
How to reduce request size?
To reduce the request size during ingestion in DataHub, you can adjust several configuration settings related to batch processing and payload limits. Here are some steps you can take:
Adjust Batch Size in DataHub Rest Emitter:
The DataHubRestEmitter class in DataHub has configurations for the maximum payload size and batch length. You can adjust these settings to reduce the request size.
INGEST_MAX_PAYLOAD_BYTES is set to 15 MB by default. You can reduce this value to limit the size of each request payload.
BATCH_INGEST_MAX_PAYLOAD_LENGTH is set to 200 by default. This limits the number of Metadata Change Proposals (MCPs) in a batch.
You can set these environment variables to adjust the batch size, for example (set before the emitter is created):
import os
os.environ["DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH"] = "100"  # Example value
Configure DataHub Rest Sink:
The DatahubRestSinkConfig class allows you to configure the maximum number of MCPs per batch using the max_per_batch parameter. This is particularly useful in async batch mode.
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig

config = DatahubRestSinkConfig(
    server="http://localhost:8080",  # your GMS endpoint
    max_per_batch=50,  # Example value
)
Modify Ingestion Pipeline Configuration:
If you are using a configuration file for your ingestion pipeline, you can specify the batch size directly in the configuration. Look for parameters like batch_size or max_per_batch in your configuration file and adjust them accordingly.
Check Documentation and Examples:
Refer to the <https://datahubproject.io/docs|DataHub documentation> for specific examples and additional configuration options that might be available for your version of DataHub.
If you need more detailed guidance on implementing these changes, please let me know!
<@U06TM7M40RK>
Getting the below error:
2025-01-02 07:23:14,520 [qtp1685232414-7475] INFO i.d.metadata.context.RequestContext:52 - RequestContext{actorUrn='urn:li:corpuser:svc_am_cicd@visa.com', sourceIP='10.160.48.148', requestAPI=RESTLI, requestID='ingestProposal([dataset])', userAgent='python-requests/2.32.3'}
2025-01-02 07:23:14,525 [qtp1685232414-7407] ERROR c.l.m.filter.RestliLoggingFilter:38 - Rest.li error:
com.linkedin.restli.server.RestLiServiceException: java.lang.OutOfMemoryError: Java heap space
at com.linkedin.metadata.resources.restli.RestliUtils.toTask(RestliUtils.java:46)
at com.linkedin.metadata.resources.restli.RestliUtils.toTask(RestliUtils.java:57)
at com.linkedin.metadata.resources.entity.AspectResource.ingestProposals(AspectResource.java:301)
at com.linkedin.metadata.resources.entity.AspectResource.ingestProposalBatch(AspectResource.java:268)
at jdk.internal.reflect.GeneratedMethodAccessor417.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:177)
at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:333)
at com.linkedin.restli.internal.server.filter.FilterChainDispatcherImpl.onRequestSuccess(FilterChainDispatcherImpl.java:47)
at com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:86)
at com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.lambda$onRequest$0(RestLiFilterChainIterator.java:73)
at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757)
at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735)
at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182)
at com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:72)
at com.linkedin.restli.internal.server.filter.RestLiFilterChain.onRequest(RestLiFilterChain.java:55)
at com.linkedin.restli.server.BaseRestLiServer.handleResourceRequest(BaseRestLiServer.java:270)
at com.linkedin.restli.server.RestRestLiServer.handleResourceRequestWithRestLiResponse(RestRestLiServer.java:294)
at com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:262)
at com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:232)
at com.linkedin.restli.server.RestRestLiServer.doHandleRequest(RestRestLiServer.java:215)
at com.linkedin.restli.server.RestRestLiServer.handleRequest(RestRestLiServer.java:171)
at com.linkedin.restli.server.RestLiServer.handleRequest(RestLiServer.java:132)
at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:70)
at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:70)
at com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:76)
at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)
at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)
at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)
at com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)
at com.linkedin.r2.filter.transport.ServerQueryTunnelFilter.onRestRequest(ServerQueryTunnelFilter.java:58)
at com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:76)
at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)
at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)
at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)
at com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)
at com.linkedin.r2.filter.message.rest.RestFilter.onRestRequest(RestFilter.java:50)
at com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:76)
at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)
at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)
at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)
at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:106)
at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:75)
at com.linkedin.r2.util.finalizer.RequestFinalizerDispatcher.handleRestRequest(RequestFinalizerDispatcher.java:61)
at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:101)
at com.linkedin.r2.transport.http.server.AbstractJakartaR2Servlet.service(AbstractJakartaR2Servlet.java:96)
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:587)
at com.linkedin.restli.server.RestliHandlerServlet.service(RestliHandlerServlet.java:30)
at com.linkedin.restli.server.RestliHandlerServlet.handleRequest(RestliHandlerServlet.java:36)
at org.springframework.web.context.support.HttpRequestHandlerServlet.service(HttpRequestHandlerServlet.java:67)
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:587)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:764)
at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1665)
at com.datahub.auth.authentication.filter.AuthenticationFilter.doFilter(AuthenticationFilter.java:107)
at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:202)
at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1635)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:527)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:131)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:598)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:223)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1580)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1381)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:176)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:484)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1553)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:174)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1303)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:129)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:149)
at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:51)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
at org.eclipse.jetty.server.Server.handle(Server.java:563)
at org.eclipse.jetty.server.HttpChannel$RequestDispatchable.dispatch(HttpChannel.java:1598)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:753)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:501)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:287)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:314)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100)
at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421)
at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390)
at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277)
at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:199)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.OutOfMemoryError: Java heap space
The error message you’re encountering indicates a java.lang.OutOfMemoryError: Java heap space, which means that the Java Virtual Machine (JVM) running the DataHub Global Metadata Service (GMS) has run out of memory. This can happen when the memory allocated to the JVM is insufficient to handle the workload, especially during large batch processing or when handling large datasets.
Here are some steps you can take to address this issue:
Increase JVM Heap Size:
Allocate more memory to the GMS JVM using the -Xmx (maximum heap size) and -Xms (initial heap size) parameters. This can be done in the environment where the GMS is running, such as in the Kubernetes deployment configuration or the startup script, for example -Xms2g -Xmx4g. A hedged deployment sketch follows this list.
Optimize Batch Size:
Reduce the ingestion batch size (see the suggestions above) so that each ingestProposal request carries a smaller payload.
Monitor and Profile Memory Usage:
Use JVM monitoring tools or heap dumps to identify what is consuming heap during ingestion.
Review and Optimize Ingestion Pipeline:
Check whether the pipeline is emitting unusually large aspects or very large batches and trim them where possible.
Check for Memory Leaks:
If heap usage grows steadily under a constant workload, investigate a possible memory leak.
Upgrade Resources:
Give the GMS container or host more memory, or scale out GMS instances.
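As a rough illustration of the first point, here is a minimal sketch of raising the GMS heap in a docker-compose style deployment. The JAVA_OPTS variable name is an assumption; the exact way to pass JVM flags depends on your image or Helm chart, so confirm it for your deployment:
# Hedged sketch: passing larger heap settings to the GMS container.
services:
  datahub-gms:
    environment:
      - JAVA_OPTS=-Xms2g -Xmx4g  # assumed variable name; sets initial and maximum heap size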
If you need more detailed guidance on implementing these changes, please let me know!
<@U06TM7M40RK>
How to reduce kafka ingestion batch size
To reduce the Kafka ingestion batch size in DataHub, you can adjust the configuration settings related to batch processing. Here’s how you can do it:
Modify Kafka Ingestion Recipe:
In your Kafka ingestion recipe, you can limit the batch size by configuring the max_per_batch parameter on the datahub-rest sink (see below). This parameter controls the maximum number of records (MCPs) sent to GMS in a single batch.
Here is an example configuration:
source:
  type: kafka
  config:
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: "http://localhost:8081"
    topic_patterns:
      allow:
        - "your_topic_name"  # Replace with your actual topic name
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
    max_per_batch: 50  # Example value to reduce batch size
Explanation:
max_per_batch: This parameter sets the maximum number of records (MCPs) processed in a single batch. Reducing this value will decrease the batch size, which can help manage memory usage and improve performance.
Adjust Environment Variables:
You can also set environment variables to control the request size for Kafka ingestion. For example, you can set SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE to limit the size of each request.
export SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE=1048576 # Example value in bytes
Review and Optimize Configuration:
Ensure that your Kafka and DataHub configurations are optimized for your workload. This includes reviewing other relevant parameters such as max.poll.records and fetch.max.bytes in your Kafka consumer configuration; a hedged sketch of passing consumer options in the recipe follows this list.
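If your DataHub version's kafka source accepts consumer options through connection.consumer_config (an assumption to verify against your version's source documentation), tuning the consumer might look like this; the options are passed through to the underlying Kafka consumer, so only include properties it actually recognizes:
# Hedged sketch: passing Kafka consumer options through the kafka source.
source:
  type: kafka
  config:
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: "http://localhost:8081"
      consumer_config:  # assumed pass-through block; check your version's docs
        "fetch.max.bytes": 10485760  # example: cap fetch size at ~10 MB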
If you need more detailed guidance on implementing these changes, please let me know!
Hi! Did this interaction resolve your issue, or do you still require assistance?
Please tag @RunLLM if you still need further assistance!
Or tag @Datahub Community Support if you need a person to take a look.
Hope this helps! ><