Dealing with Errors and Batch Size in Kafka Data Ingestion

Original Slack Thread

<@U06TM7M40RK>

Getting below error while running kafka ingestion

[2025-01-02 06:23:05,794] INFO     {datahub.cli.ingest_cli:143} - Finished metadata ingestion

The error message you’re encountering indicates a timeout issue when trying to emit metadata to the DataHub Global Metadata Service (GMS). Specifically, the error is a ReadTimeoutError, which suggests that the request to the GMS is taking longer than the allowed 30 seconds to complete.

Here are some potential causes and steps to troubleshoot this issue:

  1. Network Connectivity: Ensure that there is stable network connectivity between the Kafka ingestion process and the DataHub GMS. Network issues can cause delays and timeouts.

  2. GMS Performance: Check the performance and resource allocation of the GMS. If the GMS is under heavy load or not allocated sufficient resources, it may not be able to process requests in a timely manner. Consider scaling up the resources or instances of the GMS.

  3. Request Size: Large requests can take longer to process. If the ingestion involves large datasets or numerous records, consider breaking them into smaller batches to reduce processing time.

  4. Timeout Configuration: If the above steps do not resolve the issue, you may need to increase the client-side timeout for the HTTP connection to GMS. This can often be configured in the ingestion pipeline settings or in the client making the request (see the sketch after this list).

  5. Logs and Monitoring: Check the logs of the GMS for any errors or warnings that might indicate what is causing the delay. Monitoring tools can also help identify bottlenecks in the system.
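
As a rough illustration of point 4, the Python REST emitter exposes client-side timeout settings. This is a minimal sketch; the parameter names (connect_timeout_sec, read_timeout_sec) and the GMS address are assumptions to verify against your DataHub version:

    from datahub.emitter.rest_emitter import DataHubRestEmitter

    emitter = DataHubRestEmitter(
        gms_server="http://localhost:8080",  # your GMS endpoint
        connect_timeout_sec=10,              # time allowed to establish the connection
        read_timeout_sec=120,                # allow responses slower than the 30 s default
    )

    # Raises if GMS is unreachable, so you can verify connectivity and the new timeouts.
    emitter.test_connection()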

For more detailed guidance, you can refer to the <Ingestion | DataHub UI Ingestion Debugging Guide>, which provides insights into common issues and troubleshooting steps for ingestion failures.

Would you like more detailed steps on any of these points, or do you have additional information about your current setup that might help narrow down the issue?

Sources:

  • <Ingestion | DataHub UI Ingestion Debugging Guide>

<@U06TM7M40RK>
How to reduce batch size or request size?

To reduce the batch size or request size during ingestion in DataHub, you can adjust the configuration settings related to batch processing. Here are some steps you can take:

  1. Adjust Batch Size in DataHub Rest Emitter:
    The DataHubRestEmitter class in DataHub has a configuration for the maximum payload size and batch length. You can adjust these settings to reduce the batch size.

    • Max Payload Size: The INGEST_MAX_PAYLOAD_BYTES is set to 15 MB by default. You can reduce this value to limit the size of each request payload.
    • Batch Payload Length: The BATCH_INGEST_MAX_PAYLOAD_LENGTH is set to 200 by default. This limits the number of Metadata Change Proposals (MCPs) in a batch.

    You can set this environment variable (before the ingestion process starts) to adjust the batch size:

    import os

    os.environ["DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH"] = "100"  # Example value: cap MCPs per batch
    
  2. Configure DataHub Rest Sink:
    The DatahubRestSinkConfig class allows you to configure the maximum number of MCPs per batch using the max_per_batch parameter. This is particularly useful in async batch mode.

    from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig
    
    config = DatahubRestSinkConfig(
        max_per_batch=50  # Example value
    )
    
  3. Modify Ingestion Pipeline Configuration:
    If you are using a configuration file for your ingestion pipeline, you can specify the batch size directly in the configuration. Look for parameters like batch_size or max_per_batch in your configuration file and adjust them accordingly (a programmatic equivalent is sketched after this list).

  4. Check Documentation and Examples:
    Refer to the <https://datahubproject.io/docs|DataHub documentation> for specific examples and additional configuration options that might be available for your version of DataHub.
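
If you drive ingestion from Python rather than the CLI, the same settings can be combined programmatically. This is a minimal sketch, assuming the DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH variable and the max_per_batch sink option described above are supported by your DataHub version; the broker, schema registry, and GMS addresses are placeholders:

    import os

    # Cap the number of MCPs per REST batch; set this before DataHub modules are
    # imported so the emitter picks it up (variable name as quoted in this thread).
    os.environ["DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH"] = "100"

    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "source": {
                "type": "kafka",
                "config": {
                    "connection": {
                        "bootstrap": "broker:9092",
                        "schema_registry_url": "http://localhost:8081",
                    },
                },
            },
            "sink": {
                "type": "datahub-rest",
                "config": {
                    "server": "http://localhost:8080",
                    "max_per_batch": 50,  # assumed option: fewer MCPs per request
                },
            },
        }
    )
    pipeline.run()
    pipeline.raise_from_status()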

If you need more detailed guidance on implementing these changes, please let me know!

<@U06TM7M40RK>
How to ingest just 1 schema in Kafka recipe?

To ingest metadata for just one specific schema in a Kafka ingestion recipe for DataHub, you can use the topic_patterns configuration to filter the topics associated with that schema. Here’s how you can set it up:

  1. Identify the Topic: Determine the Kafka topic that is associated with the schema you want to ingest.

  2. Configure the Ingestion Recipe: Use the topic_patterns configuration to specify the exact topic you want to include. This will ensure that only the metadata for that specific topic (and its associated schema) is ingested.

Here is an example configuration:

source:
  type: kafka
  config:
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: "http://localhost:8081"
    topic_patterns:
      allow:
        - "^your_specific_topic$"  # Replace 'your_specific_topic' with the actual topic name
    stateful_ingestion:
      enabled: true
      remove_stale_metadata: true

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"

Explanation:

  • topic_patterns.allow: This field uses a regex pattern to specify which topics to include. By setting it to the exact name of your topic, you ensure that only this topic’s metadata is ingested.
  • schema_registry_url: Ensure that this points to your schema registry where the schema for the topic is stored.
  3. Run the Ingestion: Execute the ingestion command with your configured recipe file.

    datahub ingest -c path/to/your_kafka_recipe.yml

This setup will ingest metadata only for the specified topic and its associated schema. A quick way to sanity-check the allow regex before running is sketched below.
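
To confirm that the allow pattern matches only the intended topic, you can test the regex with the same helper DataHub uses for topic_patterns. A small sketch, assuming AllowDenyPattern is importable from datahub.configuration.common in your version:

    from datahub.configuration.common import AllowDenyPattern

    # Same semantics as topic_patterns in the recipe: allow is a list of regexes.
    pattern = AllowDenyPattern(allow=["^your_specific_topic$"])

    for topic in ["your_specific_topic", "your_specific_topic_v2", "another_topic"]:
        print(topic, "->", "ingested" if pattern.allowed(topic) else "skipped")
    # The ^ and $ anchors ensure only the exact topic name is allowed.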

Would you like more details on any specific part of this process?

<@U06TM7M40RK>
How to reduce request size?

To reduce the request size during ingestion in DataHub, you can adjust several configuration settings related to batch processing and payload limits. Here are some steps you can take:

  1. Adjust Batch Size in DataHub Rest Emitter:
    The DataHubRestEmitter class in DataHub has configurations for the maximum payload size and batch length. You can adjust these settings to reduce the request size.

    • Max Payload Size: The INGEST_MAX_PAYLOAD_BYTES is set to 15 MB by default. You can reduce this value to limit the size of each request payload.
    • Batch Payload Length: The BATCH_INGEST_MAX_PAYLOAD_LENGTH is set to 200 by default. This limits the number of Metadata Change Proposals (MCPs) in a batch.

    You can set this environment variable (before the ingestion process starts) to adjust the batch size; an emitter-level alternative that keeps every request small is sketched after this list:

    import os

    os.environ["DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH"] = "100"  # Example value: cap MCPs per batch
    
  2. Configure DataHub Rest Sink:
    The DatahubRestSinkConfig class allows you to configure the maximum number of MCPs per batch using the max_per_batch parameter. This is particularly useful in async batch mode.

    from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig
    
    config = DatahubRestSinkConfig(
        max_per_batch=50  # Example value
    )
    
  3. Modify Ingestion Pipeline Configuration:
    If you are using a configuration file for your ingestion pipeline, you can specify the batch size directly in the configuration. Look for parameters like batch_size or max_per_batch in your configuration file and adjust them accordingly.

  4. Check Documentation and Examples:
    Refer to the <https://datahubproject.io/docs|DataHub documentation> for specific examples and additional configuration options that might be available for your version of DataHub.
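
If tuning these settings is not possible in your environment, you can also keep each request small at the emitter level by sending proposals one at a time. A minimal sketch using the Python emitter; the GMS address, dataset URNs, and Status aspect are placeholders for illustration:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.emitter.rest_emitter import DataHubRestEmitter
    from datahub.metadata.schema_classes import StatusClass

    emitter = DataHubRestEmitter(gms_server="http://localhost:8080")  # your GMS endpoint

    # Placeholder proposals; in practice these come from your ingestion logic.
    mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:kafka,topic_{i},PROD)",
            aspect=StatusClass(removed=False),
        )
        for i in range(5)
    ]

    # One proposal per HTTP request keeps every payload far below the 15 MB limit.
    for mcp in mcps:
        emitter.emit_mcp(mcp)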

If you need more detailed guidance on implementing these changes, please let me know!

<@U06TM7M40RK>

Getting below error

2025-01-02 07:23:14,520 [qtp1685232414-7475] INFO  i.d.metadata.context.RequestContext:52 - RequestContext{actorUrn='urn:li:corpuser:svc_am_cicd@visa.com', sourceIP='10.160.48.148', requestAPI=RESTLI, requestID='ingestProposal([dataset])', userAgent='python-requests/2.32.3'}
2025-01-02 07:23:14,525 [qtp1685232414-7407] ERROR c.l.m.filter.RestliLoggingFilter:38 - Rest.li error:
com.linkedin.restli.server.RestLiServiceException: java.lang.OutOfMemoryError: Java heap space
	at com.linkedin.metadata.resources.restli.RestliUtils.toTask(RestliUtils.java:46)
	at com.linkedin.metadata.resources.restli.RestliUtils.toTask(RestliUtils.java:57)
	at com.linkedin.metadata.resources.entity.AspectResource.ingestProposals(AspectResource.java:301)
	at com.linkedin.metadata.resources.entity.AspectResource.ingestProposalBatch(AspectResource.java:268)
	at jdk.internal.reflect.GeneratedMethodAccessor417.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:177)
	at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:333)
	at com.linkedin.restli.internal.server.filter.FilterChainDispatcherImpl.onRequestSuccess(FilterChainDispatcherImpl.java:47)
	at com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:86)
	at com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.lambda$onRequest$0(RestLiFilterChainIterator.java:73)
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757)
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735)
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182)
	at com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:72)
	at com.linkedin.restli.internal.server.filter.RestLiFilterChain.onRequest(RestLiFilterChain.java:55)
	at com.linkedin.restli.server.BaseRestLiServer.handleResourceRequest(BaseRestLiServer.java:270)
	at com.linkedin.restli.server.RestRestLiServer.handleResourceRequestWithRestLiResponse(RestRestLiServer.java:294)
	at com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:262)
	at com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:232)
	at com.linkedin.restli.server.RestRestLiServer.doHandleRequest(RestRestLiServer.java:215)
	at com.linkedin.restli.server.RestRestLiServer.handleRequest(RestRestLiServer.java:171)
	at com.linkedin.restli.server.RestLiServer.handleRequest(RestLiServer.java:132)
	at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:70)
	at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:70)
	at com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:76)
	at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)
	at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)
	at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)
	at com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)
	at com.linkedin.r2.filter.transport.ServerQueryTunnelFilter.onRestRequest(ServerQueryTunnelFilter.java:58)
	at com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:76)
	at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)
	at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)
	at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)
	at com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)
	at com.linkedin.r2.filter.message.rest.RestFilter.onRestRequest(RestFilter.java:50)
	at com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:76)
	at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)
	at com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)
	at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)
	at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:106)
	at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:75)
	at com.linkedin.r2.util.finalizer.RequestFinalizerDispatcher.handleRestRequest(RequestFinalizerDispatcher.java:61)
	at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:101)
	at com.linkedin.r2.transport.http.server.AbstractJakartaR2Servlet.service(AbstractJakartaR2Servlet.java:96)
	at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:587)
	at com.linkedin.restli.server.RestliHandlerServlet.service(RestliHandlerServlet.java:30)
	at com.linkedin.restli.server.RestliHandlerServlet.handleRequest(RestliHandlerServlet.java:36)
	at org.springframework.web.context.support.HttpRequestHandlerServlet.service(HttpRequestHandlerServlet.java:67)
	at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:587)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:764)
	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1665)
	at com.datahub.auth.authentication.filter.AuthenticationFilter.doFilter(AuthenticationFilter.java:107)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:202)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1635)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:527)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:131)
	at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:598)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:223)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1580)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:221)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1381)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:176)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:484)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1553)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:174)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1303)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:129)
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:149)
	at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:51)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
	at org.eclipse.jetty.server.Server.handle(Server.java:563)
	at org.eclipse.jetty.server.HttpChannel$RequestDispatchable.dispatch(HttpChannel.java:1598)
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:753)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:501)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:287)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:314)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100)
	at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:199)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.OutOfMemoryError: Java heap space

The error message you’re encountering indicates a java.lang.OutOfMemoryError: Java heap space, which means that the Java Virtual Machine (JVM) running the DataHub Global Metadata Service (GMS) has run out of memory. This can happen when the memory allocated to the JVM is insufficient to handle the workload, especially during large batch processing or when handling large datasets.

Here are some steps you can take to address this issue:

  1. Increase JVM Heap Size:

    • You can increase the heap size allocated to the JVM by setting the -Xmx (maximum heap size) and -Xms (initial heap size) parameters. This can be done in the environment where the GMS is running, such as in the Kubernetes deployment configuration or the startup script.
    • For example, to set the initial heap to 2 GB and the maximum heap to 4 GB, add the following JVM options:
      -Xms2g -Xmx4g
      
  2. Optimize Batch Size:

    • As previously discussed, reducing the batch size can help manage memory usage more effectively. Adjust the batch size in your ingestion configuration to ensure that the GMS is not overwhelmed by large requests.
  3. Monitor and Profile Memory Usage:

    • Use monitoring tools to track memory usage and identify any memory leaks or inefficient memory usage patterns. Tools like JVisualVM or JProfiler can help you analyze heap dumps and optimize memory usage.
  4. Review and Optimize Ingestion Pipeline:

    • Ensure that your ingestion pipeline is optimized for performance. This includes reviewing the configuration settings, such as batch size and payload limits, to ensure they are appropriate for your workload.
  5. Check for Memory Leaks:

    • Investigate the code and configuration for potential memory leaks. This might involve reviewing custom code or third-party libraries used in the ingestion process.
  6. Upgrade Resources:

    • If the above steps do not resolve the issue, consider upgrading the resources allocated to the GMS, such as CPU and memory, to handle larger workloads.

If you need more detailed guidance on implementing these changes, please let me know!

<@U06TM7M40RK>
How to reduce kafka ingestion batch size

To reduce the Kafka ingestion batch size in DataHub, you can adjust the configuration settings related to batch processing. Here’s how you can do it:

  1. Modify Kafka Ingestion Recipe:
    In your Kafka ingestion recipe, you can limit how many Metadata Change Proposals (MCPs) are sent to GMS in a single request by configuring the max_per_batch parameter on the datahub-rest sink (this is particularly relevant in async batch mode).

    Here is an example configuration:

    source:
      type: kafka
      config:
        connection:
          bootstrap: "broker:9092"
          schema_registry_url: "http://localhost:8081"
        topic_patterns:
          allow:
            - "your_topic_name"  # Replace with your actual topic name

    sink:
      type: datahub-rest
      config:
        server: "http://localhost:8080"
        max_per_batch: 50  # Example value to reduce batch size


    Explanation:

    • max_per_batch: This sink parameter caps the number of MCPs sent to GMS in a single request. Reducing this value decreases the batch size, which helps keep request payloads small and memory usage manageable.
  2. Adjust Environment Variables:
    You can also set environment variables on the DataHub services to limit the size of each Kafka request. For example, SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE caps the maximum request size of DataHub's own Kafka producer.

    export SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE=1048576  # Example value in bytes
    
  3. Review and Optimize Configuration:
    Ensure that your Kafka and DataHub configurations are optimized for your workload. This includes reviewing other relevant consumer settings such as max.poll.records (Java clients) and fetch.max.bytes in your Kafka consumer configuration (a way to pass such settings through the DataHub kafka source is sketched after this list).
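
As a rough sketch of point 3, consumer-level settings can usually be passed straight through to the underlying confluent-kafka consumer via the kafka source's consumer_config, alongside the sink-side max_per_batch shown above. Both options are assumptions to verify for your DataHub version; note that max.poll.records is a Java-consumer setting, so only fetch.max.bytes is shown here:

    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "source": {
                "type": "kafka",
                "config": {
                    "connection": {
                        "bootstrap": "broker:9092",
                        "schema_registry_url": "http://localhost:8081",
                        # Assumed passthrough to the confluent-kafka consumer.
                        "consumer_config": {
                            "fetch.max.bytes": 1048576,  # 1 MB per fetch request
                        },
                    },
                    "topic_patterns": {"allow": ["your_topic_name"]},
                },
            },
            "sink": {
                "type": "datahub-rest",
                "config": {
                    "server": "http://localhost:8080",
                    "max_per_batch": 50,  # assumed option: smaller REST batches to GMS
                },
            },
        }
    )
    pipeline.run()
    pipeline.raise_from_status()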

If you need more detailed guidance on implementing these changes, please let me know!

Hi :slight_smile: ! Did this interaction resolve your issue or do you still require assistance ?

Please tag @RunLLM if you still need further assistance!

Or tag @Datahub Community Support if you require a person to take a look :slight_smile:

Hope this helps ! ><