DevOps | Cloud | Analytics | Open Source | Programming





How To Fix Kafka Error - Producer Unable To Send Messages To Kafka



In this post , we will see How To Fix Kafka Python Error - Producer Unable To Send Messages To Kafka . If you are using Kafka package in Python and trying to send messages to Kafka Broker (Writing to Topics from Producer), sometimes it throws the below error - "Kafka Producer Closed" and the Producer will not be able to deliver the messages.


INFO:kafka.producer.kafka:Kafka producer closed

First thing first , the version used in Kafka as well as in Python needs to be checked. I had found issues due to version compatibility issue between the tech stacks in the similar context. So I had reverted with trial & error for earlier releases and worked fine for me . Always try to stick to the Last STABLE Release . The code used in this article is used for the below versions -

  • Kafka 2.11
  • kafka-python 2.0.1
  • Python 3.8
Refer the sample code below  -

  • It reads line by line from input.txt file
  • Appends Timestamp to each line (message)
  • And then sends to Kafka Broker in every 6 seconds .

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from time import sleep
from json import dumps
from kafka import KafkaProducer
from datetime import datetime
from kafka import KafkaClient
import json 


producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))


filepath = 'input.txt'
with open(filepath) as fp:
   line = fp.readline()
   cnt = 1
   while line:
     msg = str(datetime.now())+str(',')+line
     producer.send(topic='test',value=msg)
     producer.flush()
     print("Msg Sent - "+msg)
     sleep(6)
     line = fp.readline()
     cnt += 1


Note the major pointers - You need to add producer.send() & producer.flush()  in sequence to ensure the messages are delivered from Producer to Kafka Broker. Hope this helps to solve the issue.    

Other Reads -


Kafka Producer, kafka error, kafka producer error handling