In this post , we will see How To Fix Kafka Python Error - Producer Unable To Send Messages To Kafka . If you are using Kafka package in Python and trying to send messages to Kafka Broker (Writing to Topics from Producer), sometimes it throws the below error - "Kafka Producer Closed" and the Producer will not be able to deliver the messages.
INFO:kafka.producer.kafka:Kafka producer closed
First thing first , the version used in Kafka as well as in Python needs to be checked. I had found issues due to version compatibility issue between the tech stacks in the similar context. So I had reverted with trial & error for earlier releases and worked fine for me . Always try to stick to the Last STABLE Release . The code used in this article is used for the below versions -
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from time import sleep
from json import dumps
from kafka import KafkaProducer
from datetime import datetime
from kafka import KafkaClient
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
filepath = 'input.txt'
with open(filepath) as fp:
line = fp.readline()
cnt = 1
while line:
msg = str(datetime.now())+str(',')+line
producer.send(topic='test',value=msg)
producer.flush()
print("Msg Sent - "+msg)
sleep(6)
line = fp.readline()
cnt += 1
Note the major pointers - You need to add producer.send() & producer.flush() in sequence to ensure the messages are delivered from Producer to Kafka Broker. Hope this helps to solve the issue.
Kafka Producer, kafka error, kafka producer error handling