How To Fix Kafka Error - Consumer Unable To Send Messages To Kafka



In this post, we will see how to fix a Kafka Python error - Consumer Unable To Send Messages To Kafka. If you are trying to send messages or write to a topic from a consumer, it might not throw any exception even though the consumer is not actually writing to the topic. Consider the below code snippet as the consumer logic -


<SOME_LOOP_CONDITION>:
  all_messages = consumer.poll(timeout_ms=10000, max_records=2)
  for partition, messages in all_messages.items():
    for msg in messages:
      try:
        # process msg .....
        outmsg1, outmsg2 = <YOUR_FUNCTION_TO_DO_SOME_OPERATION>(msg.value)
        # send the processed output, with callbacks so success / failure gets reported
        producer.send(Topic1, key=msg.key, value=outmsg1).add_callback(<SEND_SUCCESS_FUNCTION>).add_errback(<SEND_ERROR_FUNCTION>)
        producer.flush()
        producer.send(Topic2, key=msg.key, value=outmsg2).add_callback(<SEND_SUCCESS_FUNCTION>).add_errback(<SEND_ERROR_FUNCTION>)
        producer.flush()
        # commit the consumed offset only after both sends are flushed
        consumer.commit()
      except Exception as e:
        log.error('Flagged error', exc_info=True)
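
For reference, here is a minimal sketch of the kind of setup the snippet above assumes, using the kafka-python client. The topic name, group id, bootstrap server and the two callback functions are illustrative placeholders, not values from the original snippet -

from kafka import KafkaConsumer, KafkaProducer
import logging

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
  # invoked once the broker acknowledges the write
  log.info('Sent to %s partition %s offset %s',
           record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(exc):
  # invoked when the send fails - without an errback these failures stay silent
  log.error('Send failed', exc_info=exc)

consumer = KafkaConsumer('input_topic',                    # placeholder topic
                         bootstrap_servers='localhost:9092',
                         group_id='my_consumer_group',     # placeholder group id
                         enable_auto_commit=False)         # commit manually after sending

producer = KafkaProducer(bootstrap_servers='localhost:9092')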

If the consumer is unable to write to the topic (without throwing any error), you could try the below -

  • Make sure all the Producers are running in the system
 

  • Make sure all the Consumers in the Consumer Group are running
 

  • There might be some issue with the Consumer Offset reset behaviour - auto.offset.reset. Try setting the auto.offset.reset property of the topic's consumers to 'earliest' (see the consumer configuration sketch after this checklist).
To understand this - one thing that affects how the earliest and latest configs resolve is the log retention policy. Let's say you have a topic with retention configured to 1 hour.
You produce 3 messages, and then an hour later you produce 3 more. The latest offset is unaffected - it keeps advancing as usual, so it will be 6.
But the earliest one can no longer be '0' (ZERO), because Kafka will have already removed the first 3 messages, and thus the earliest available offset will be 3.
To be more precise, you need to know the possible options for the auto.offset.reset value -

    • earliest: automatically reset the offset to the earliest offset
    • latest: automatically reset the offset to the latest offset
    • none: throw exception to the consumer if no previous offset is found for the consumer's group
    • anything else: throw exception to the consumer.
 

  • You could also check offsets.retention.minutes along with auto.offset.reset. When the time passed since the last commit (or since the consumer group went inactive) exceeds the offsets.retention.minutes value, the broker discards the committed offsets and auto.offset.reset comes into the picture. So it is a good idea to cross-check that as well - it is noted in the sketch below.
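
To put the last two points together, here is a minimal sketch of a consumer configured to reset to the earliest offset, again using the kafka-python client. The topic name, group id and bootstrap server are illustrative placeholders -

from kafka import KafkaConsumer

# auto_offset_reset only takes effect when the group has no valid committed offset,
# e.g. a brand new group, or committed offsets that have expired as per the
# broker-side offsets.retention.minutes setting (configured on the broker, not here).
consumer = KafkaConsumer(
  'input_topic',                        # placeholder topic name
  bootstrap_servers='localhost:9092',   # placeholder broker address
  group_id='my_consumer_group',         # placeholder group id
  auto_offset_reset='earliest',         # reset to the earliest available offset
  enable_auto_commit=False)             # commit manually after a successful send
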
Hope this helps to solve the issue.    
