Troubleshooting SerializationException in DataHub GMS Kafka Consumer

Original Slack Thread

<@U06TM7M40RK>

  1. Are you using UI or CLI for ingestion? UI
  2. Which DataHub version are you using? 0.8.45
  3. What data source(s) are you integrating with DataHub? Redshift
    error:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeLog_Versioned_v1-0 at offset 11102. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.lang.ArrayIndexOutOfBoundsException: null

The stack trace shows a SerializationException in the Kafka consumer inside the DataHub GMS pod: GMS could not deserialize the Avro message at offset 11102 of MetadataChangeLog_Versioned_v1 (schema id 2), and the wrapping IllegalStateException recommends configuring an ErrorHandlingDeserializer so such failures can be handled instead of repeatedly crashing the listener.

Steps to Resolve

  1. Configure ErrorHandlingDeserializer:
    According to the KafkaEventConsumerFactory (https://github.com/datahub-project/datahub/blob/master/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java), you can configure the ErrorHandlingDeserializer for both the key and value deserializers. This lets the consumer handle deserialization errors gracefully.

  2. Check Schema Registry:
    Ensure that your schema registry is correctly configured and accessible from the GMS pod. If the schema registry is not reachable, it can cause deserialization errors (a quick reachability check is sketched after the configuration example below).

  3. Seek Past Faulty Record:
    You might need to seek past the faulty record causing the deserialization issue. This can be done manually with Kafka tooling or programmatically (a programmatic sketch follows the configuration example below).

Here is an example of how you can configure the ErrorHandlingDeserializer in your Kafka consumer properties:

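// Both the key and value deserializers are wrapped in ErrorHandlingDeserializer, so a record
// that fails deserialization is handed to the container's error handler instead of repeatedly
// crashing the consumer loop.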
Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties(null);
customizedProperties.put(
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
customizedProperties.put(
    ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
customizedProperties.put(
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
customizedProperties.put(
    ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, schemaRegistryConfig.getDeserializer());

// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
schemaRegistryConfig.getProperties().entrySet().stream()
    .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
    .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue()));

customizedProperties.put(
    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
    kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());

return customizedProperties;
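
For step 2 (Check Schema Registry), a quick way to confirm the registry is reachable from the GMS pod is to request the schema id mentioned in the stack trace (id 2 here). This is only a minimal sketch: it assumes the registry is at http://schema-registry:8081, so adjust the host to whatever your GMS consumer is actually configured with.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SchemaRegistryCheck {
  public static void main(String[] args) throws IOException {
    // Assumed registry address -- replace with the registry URL your GMS consumer uses.
    URL url = new URL("http://schema-registry:8081/schemas/ids/2");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");

    int status = conn.getResponseCode();
    System.out.println("HTTP " + status);
    if (status == 200) {
      // Prints the Avro schema registered under id 2; compare it with the schema your producers use.
      System.out.println(new String(conn.getInputStream().readAllBytes(), StandardCharsets.UTF_8));
    } else {
      // A 404 means id 2 does not exist in this registry, which usually indicates the consumer
      // is pointed at a different registry than the producer that wrote the record.
      System.out.println("Schema id 2 not found or registry error (HTTP " + status + ")");
    }
    conn.disconnect();
  }
}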

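For step 3 (Seek Past Faulty Record), the offset can also be advanced programmatically with a short-lived consumer that commits a position just past the bad record for the same consumer group. This is a minimal sketch, assuming the default MetadataChangeLog consumer group id generic-mae-consumer-job-client and the partition/offset from the stack trace; verify the group id in your deployment and stop (or scale down) the GMS consumers first, since Kafka will typically reject offset commits for a group that still has active members.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SkipBadRecord {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Assumed broker address and consumer group id -- adjust both to your deployment.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-mae-consumer-job-client");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Partition and offset taken from the stack trace: MetadataChangeLog_Versioned_v1-0, offset 11102.
    TopicPartition badPartition = new TopicPartition("MetadataChangeLog_Versioned_v1", 0);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(badPartition));
      // Commit offset 11103 so the group resumes one record past the message that cannot be deserialized.
      consumer.commitSync(Collections.singletonMap(badPartition, new OffsetAndMetadata(11103L)));
      System.out.println("Committed offset 11103 for " + badPartition);
    }
  }
}

Keep in mind that the skipped MetadataChangeLog record is never processed, so the affected entity may need to be re-ingested afterwards.
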
Further Steps

  • Verify Schema Compatibility: Ensure that the Avro schema used for serialization and deserialization is compatible.
  • Check Kafka Broker Logs: Sometimes, the issue might be related to the Kafka broker itself. Check the broker logs for any related errors.

Would you like more detailed steps on how to implement these changes or further troubleshooting tips?

<@U06TM7M40RK>
  1. Are you using UI or CLI for ingestion? UI-based ingestion
  2. Which DataHub version are you using? 0.8.45
  3. What data source(s) are you integrating with DataHub? Redshift

error:
Failed to create token. Unknown error occurred.

The error “Failed to create token. Unknown error occurred.” suggests that there might be an issue with the token generation process in DataHub. Here are some steps to troubleshoot and resolve this issue:

Steps to Troubleshoot

  1. Check Console and Network Logs:

    • Open the browser’s developer tools (usually by pressing F12 or right-clicking and selecting “Inspect”).
    • Go to the “Console” and “Network” tabs to check for any error messages or failed network requests when you attempt to create a token.
  2. Verify Metadata Service Authentication Configuration:

    • Token generation only works when Metadata Service Authentication is enabled. Confirm that METADATA_SERVICE_AUTH_ENABLED is set to true for both the GMS and frontend containers (or that the equivalent setting shown below is in place).
  3. Check Permissions:

    • Make sure the user creating the token has the “Generate Personal Access Tokens” privilege (or “Manage All Access Tokens” to create tokens for other users), granted through a DataHub access policy.
  4. Inspect DataHub Logs:

    • Check the logs of the DataHub services, especially the GMS (Generalized Metadata Service) and frontend services, for any errors or warnings related to token creation.
    • You can use docker logs <container-id> or kubectl logs <pod-name> to inspect the logs.

Example Configuration for Token Generation

Ensure that your application.yml or equivalent configuration file has the following settings:

metadataService:
  auth:
    enabled: true

Example of Using Personal Access Tokens

Once the token is generated, you can use it in your ingestion recipes as follows:

source:
  type: redshift
  config:
    host_port: 'your-redshift-cluster:5439'
    database: your_database
    username: your_username
    password: your_password
sink:
  type: datahub-rest
  config:
    server: 'http://datahub-gms:8080'
    token: <your-personal-access-token>
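
To confirm the token works before wiring it into a recipe, you can call GMS directly with an Authorization: Bearer header. The following is a minimal sketch, assuming GMS is reachable at http://datahub-gms:8080 and serves its GraphQL endpoint at /api/graphql; a 200 response naming your user means the token is accepted, while a 401 points back at the token or the metadata service authentication settings.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class VerifyToken {
  public static void main(String[] args) throws IOException {
    String token = "<your-personal-access-token>"; // paste the generated token here
    // Assumed GMS address -- matches the sink config above; adjust if GMS runs elsewhere.
    URL url = new URL("http://datahub-gms:8080/api/graphql");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setRequestProperty("Authorization", "Bearer " + token);

    // Minimal GraphQL query that simply echoes the authenticated user.
    String body = "{\"query\":\"{ me { corpUser { username } } }\"}";
    conn.getOutputStream().write(body.getBytes(StandardCharsets.UTF_8));

    int status = conn.getResponseCode();
    System.out.println("HTTP " + status);
    if (status == 200) {
      System.out.println(new String(conn.getInputStream().readAllBytes(), StandardCharsets.UTF_8));
    }
    conn.disconnect();
  }
}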

Would you like more detailed steps on any of these points or further assistance with specific configurations?
