




How To Fix Kafka Error - "org.apache.kafka.connect.errors.DataException"



In this post, we will see how to fix the Kafka error "org.apache.kafka.connect.errors.DataException". The exception appears in several variants, such as:


Caused by: org.apache.kafka.connect.errors.DataException

org.apache.kafka.connect.errors.DataException: Invalid JSON for record default value: null
org.apache.kafka.connect.errors.DataException: Struct schemas do not match. at
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic Test to Avro:
org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field
org.apache.kafka.connect.errors.DataException: Invalid JSON for array default value: "null"
org.apache.kafka.connect.errors.DataException: Schema required for [updating schema metadata]
org.apache.kafka.connect.errors.DataException: Record with a null key was encountered.

This issue is almost always caused by a mismatch between the format the data is actually in and the format your configuration tells Kafka Connect to expect - in other words, a serialization mismatch. First, read our earlier post on SerializationException, which might help - Fix Kafka Error – "org.apache.kafka.common.errors.SerializationException". If that post did not fix the issue, proceed with this one. Let us explore the steps we can take to check and fix this error. Read the pointers below and try them -

  • Firstly - are you using the right converter? For example, check whether you are using an Avro converter to read data from a topic whose data is not actually in Avro format, or a JSON converter to read Avro data. You have to use the converter that matches the format of the data you are reading. Some available converter classes to choose for the key and value -


key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

key.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter

key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
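
Note that the Avro, Protobuf, and JSON Schema converters also need to know where the Schema Registry lives. A minimal sketch, assuming the registry runs at the placeholder URL below:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081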

  • If you are using Schema Registry, check the value used for each field's type. If you are using the Avro data format, check the available data types (listed below) and use the right one for your use case. Declaring a field as a union sometimes fixes the issue, since a union lets the field take more than one type (see the schema sketch after this list).
    • int, long, float, double, boolean
    • string
    • bytes, fixed
    • enum
    • array, map
    • union (allows a field to take one of several types - this is how optional/nullable fields are declared)
    • record (with a name and namespace)
    • logical types such as date, timestamp-millis, and decimal
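
A minimal Avro schema sketch declaring a nullable field via a union (the record and field names are made up for illustration). Note that when the default is null, "null" must be the first branch of the union - getting this wrong is one cause of the "Invalid JSON for record default value: null" variant above:

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "email", "type": ["null", "string"], "default": null }
  ]
}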
 

  • KafkaAvroSerializer is a popularly used serializer with the Confluent Schema Registry. It sorts out a lot of issues if you are dealing with Avro data (a producer sketch follows).
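
A minimal producer configuration sketch using it - the bootstrap servers and registry URL are placeholders for your own endpoints:

bootstrap.servers=localhost:9092
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081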
 

  • Check whether "null" values are allowed and handled for the message key, the value, and the schema. It is good practice to handle nulls explicitly - the union schema sketch above shows one way to declare a nullable field.
 

  • Check what type of data the topic is expected to hold - Avro or something else.
 

  • If you are not using Schema Registry, then each producer has to define the data schema (if it is not a standard schema) when it sends to the topic, or at the very least the consumer must be able to comprehend and parse the schema on its own.
 

  • On the consumer side, if you are using Kafka Connect, check the converter used for the sink (see the list given earlier in this post), and check what values are set for the fields below. For example, if you're consuming JSON data from a Kafka topic into a Kafka Connect sink:

value.converter=<USE_APPROPRIATE_CONVERTER>
value.converter.schemas.enable=true/false
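
As a concrete sketch for plain JSON data that carries no embedded schema (a common case):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false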

On the producer side, if you are using Kafka Connect, check the converter used for the data source (again, see the list given earlier in this post), and check what values are set for the fields below. This applies to the source connector.


value.converter 
value.converter.schema.registry.url
connector.class

Some available connector classes in Kafka Connect -


"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"io.confluent.connect.hdfs.HdfsSinkConnector",
"io.confluent.connect.hdfs.tools.SchemaSourceConnector",
"io.confluent.connect.jdbc.JdbcSinkConnector", 
"io.confluent.connect.jdbc.JdbcSourceConnector",
"io.confluent.connect.s3.S3SinkConnector",
"io.confluent.connect.storage.tools.SchemaSourceConnector",
"org.apache.kafka.connect.file.FileStreamSinkConnector",
"org.apache.kafka.connect.file.FileStreamSourceConnector",

  • Check whether you have set the field below as shown (it is used by connectors such as the Elasticsearch sink). If you set it to false, the connector will try to create the mapping based on the schema of the message data (the message schema is created by the converter and modified by transforms), and if no schema is defined it will throw an exception. However, if you set it to true, the connector ignores the record schema and will not use it to build the mapping.


schema.ignore=true/false
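
For example, a hedged Elasticsearch sink snippet for schema-less JSON data (the connection URL is a placeholder):

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=http://localhost:9200
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true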

  • Check whether you have set the fields below to true. If so, you have to provide a schema, and the data must be in the correct structure (the schema-plus-payload envelope shown after the snippet), otherwise a DataException is thrown. You have to set them to false if the key, the value, or both do not carry a schema.


value.converter.schemas.enable=true/false
key.converter.schemas.enable=true/false
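
When schemas.enable=true for the JsonConverter, each JSON message must be an envelope with "schema" and "payload" fields. A minimal sketch (the field name is illustrative):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "name", "type": "string", "optional": false }
    ],
    "optional": false
  },
  "payload": { "name": "alice" }
}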

Try all these steps and check whether they solve the error - org.apache.kafka.connect.errors.DataException

