




How To Fix Kafka Error - "org.apache.kafka.common.errors.SerializationException"



In this post, we will see how to fix the Kafka error "org.apache.kafka.common.errors.SerializationException", which can be caused by various reasons.


org.apache.kafka.common.errors.SerializationException:
Caused by: org.apache.kafka.common.errors.SerializationException:

This exception surfaces in several possible forms, depending on the underlying cause -

  • Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
  • Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data
  • Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message
  • Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
  • Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
  • Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing json to Avro
To understand the issue: whenever we produce a message to Kafka or consume one from it, the system must be able to parse the message against some schema structure (messages are serialized before reaching Kafka and deserialized after). Avro, JSON, Protobuf, and delimited strings (e.g., CSV) are popularly used formats. So the schema structure must be understood by both the producer and the consumer, and the two must stay in tandem.

Generic Solutions:

  • First things first: the producer and consumer have to agree on the serialization format used. E.g., if the producer sends JSON messages, the consumer should be configured to consume JSON-formatted messages. Missing this simple aspect causes the system to run into issues.
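As a sketch of what "agreeing on the format" looks like in config (the class names are the standard ones shipped with kafka-clients; the broker address and group id are placeholders), note how the producer and consumer properties mirror each other:

```java
import java.util.Properties;

public class MatchingSerdeConfig {
    // Producer side: keys and values are serialized as Strings.
    public static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Consumer side must mirror the producer: String went in, String comes out.
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", "demo-group"); // hypothetical group id
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }
}
```

If the producer is later switched to, say, an Avro serializer, the consumer's deserializer entries have to change in lockstep.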
 

  • The issue is closely tied to the deserializer used (custom or default) and its inherent capabilities and limits. So if the key or value in the message is a String (or any custom structure) and you use, say, IntegerDeserializer to parse it, it will throw this error: IntegerDeserializer expects the data to be exactly 4 bytes long, so it cannot parse a String.
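To see why the byte count matters: Kafka's IntegerSerializer always emits exactly 4 big-endian bytes and LongSerializer exactly 8, while a String's UTF-8 encoding can be any length. A quick stdlib sketch of the equivalent encodings:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireSizes {
    // An int on the wire, as IntegerSerializer writes it: 4 big-endian bytes.
    public static byte[] intBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // A long on the wire, as LongSerializer writes it: 8 big-endian bytes.
    public static byte[] longBytes(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    // A String serializes to however many bytes its UTF-8 form needs,
    // which is why IntegerDeserializer rejects it unless it happens to be 4 bytes.
    public static byte[] stringBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

So "Size of data received by IntegerDeserializer is not 4" literally means the record's bytes were not a 4-byte payload.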
 

  • Check whether you are consuming a message with a null key or value and trying to deserialize it. The system might not be able to handle null during deserialization. Verify against the business logic whether such data/messages can be skipped or redirected (to a log, etc.).
 

  • Check and verify which deserializers are set for the consumer. These are set by the flags below; make sure you are using the correct ones. E.g.

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Possible deserializer options are -

    • ByteArrayDeserializer,
    • ByteBufferDeserializer,
    • BytesDeserializer,
    • DoubleDeserializer,
    • ExtendedDeserializer.Wrapper,
    • FloatDeserializer,
    • IntegerDeserializer,
    • LongDeserializer,
    • SessionWindowedDeserializer,
    • ShortDeserializer,
    • StringDeserializer,
    • TimeWindowedDeserializer,
    • UUIDDeserializer
 

  • Depending on the use case, you might also need to create your own custom deserializer if the out-of-the-box options do not meet the need.
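A custom deserializer is a class implementing `org.apache.kafka.common.serialization.Deserializer<T>`, referenced by its fully qualified name in `value.deserializer`. The core logic can be sketched without the Kafka jar on the classpath (in a real project the class would additionally declare the `implements` clause); note the explicit null guard, which also addresses the null-message problem above:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the body of a custom deserializer. In a real project this class
// would declare `implements org.apache.kafka.common.serialization.Deserializer<String>`.
public class NullSafeStringDeserializer {
    // Kafka passes the topic name and the raw bytes; data may be null
    // (e.g. a tombstone record), so guard before decoding.
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // let downstream logic skip or log the record
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

The same shape works for any payload: replace the UTF-8 decoding with whatever parsing your business-specific structure needs, and package the class in a jar available to the consumer.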
 

  • If you are using Kafka Streams, you can filter out nulls when you convert a KTable to a KStream.

ktable.toStream().filter((key, value) -> value != null)

  • Choose the serialization format based on -
    • Schema - the schema of your data. The schema is the eyeglass through which your data is read, so you need to ensure it is intact.
    • Message size - JSON is text-based and relies on the compression offered by Kafka itself, whereas Avro and Protobuf are binary and hence smaller in size.
    • Language - if the consumer is coded in Java, Avro is a popular choice, while non-Java languages might have to depend on formats like Protobuf.
 

  • Check the settings in the config and verify what types of values are expected by the flags below. Based on the ingested data, you might have to change them.

value.converter 
key.converter
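For example, a Kafka Connect worker that should read plain JSON payloads without embedded schemas might set something like this (a sketch; adjust the converter classes to your data):

```
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

If the topic instead carries Avro produced through a schema registry, the converters would point at the Avro converter classes, and a mismatch here produces exactly the deserialization errors discussed in this post.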

Apart from the generic solutions, you can check the error-specific pointers below -

SerializationException: Size of data received by IntegerDeserializer is not 4

  • Possibly you are using IntegerDeserializer, which expects the data to be exactly 4 bytes long, but the messages being parsed are longer than that (length > 4 bytes) - perhaps a String. Set the appropriate deserializer.
   

SerializationException: Can't deserialize data

  • Maybe you are not using the right deserializer given the length or structure of the data or message.
  • Maybe you have a business-use-case-specific data structure that the out-of-the-box deserializers cannot parse. In that scenario, once you identify the structure of the message, you would have to code a custom deserializer and use the respective class and jar while running the Kafka consumer.
   

SerializationException: Error deserializing Avro message

  • This occurs if the consumer is not able to load the class definition of the type generated from the Avro schema.
  • Check whether the deserializer used is suitable for Avro messages.
  • Maybe you have a business-use-case-specific data structure that the out-of-the-box deserializers cannot parse. In that scenario, once you identify the structure of the message, you would have to code a custom deserializer (extending KafkaAvroDeserializer) and use the respective class and jar while running the Kafka consumer.
   

SerializationException: Size of data received by LongDeserializer is not 8

  • Possibly you are using LongDeserializer, which expects the data to be exactly 8 bytes long, but the messages being parsed are longer than that (length > 8 bytes) - perhaps a String. Set the appropriate deserializer.
   

SerializationException: Unknown magic byte!

  • It often denotes that the topic data/message is not a valid Avro structure and hence could not be deserialized.
  • If it is possible to drop or skip such invalid data or messages from the topic, you can use the config settings below in the case of Kafka Connect.

"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true
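The "magic byte" refers to the first byte of the Confluent Schema Registry wire format: byte 0 must be 0x0, followed by a 4-byte schema ID, then the Avro payload. A stdlib sketch of the check the Avro deserializer effectively performs:

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    // Confluent wire format: [magic byte 0x0][4-byte schema id][payload].
    public static boolean hasMagicByte(byte[] msg) {
        return msg != null && msg.length >= 5 && msg[0] == 0x0;
    }

    // Extract the schema registry id from a well-formed message.
    public static int schemaId(byte[] msg) {
        return ByteBuffer.wrap(msg, 1, 4).getInt();
    }
}
```

A plain-JSON or plain-String record fails this check (its first byte is whatever character the payload starts with), which is why producing non-registry data onto a topic consumed with KafkaAvroDeserializer raises "Unknown magic byte!".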

SerializationException: Error deserializing json to Avro

  • Check whether you are consuming a message with a null key or value and trying to deserialize it. The configured deserializer might not be able to perform the conversion from JSON to Avro if the data is null.
  • If possible, drop or skip such invalid data or messages from the topic, or use a custom deserializer to handle the null.
  • If you are using Kafka Connect, use the configs below to skip invalid null data -

"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true

Hope this helps.
