Question:
kafka-python (1.0.0) throws an error while connecting to the broker.
At the same time /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine.
The Python application used to work well too, but after a ZooKeeper restart it can no longer connect.
I am using the bare-bones example from the docs:
from kafka import KafkaProducer
from kafka.common import KafkaError
producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
I am getting this error:
Traceback (most recent call last):
  File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
  File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id)
kafka.common.NodeNotReadyError: 0
Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored
When stepping through /usr/lib/python2.6/site-packages/kafka/client_async.py I noticed that the condition on line 270 evaluates to False:
270 if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271 if self._can_send_request(node_id):
272 return True
273 return False
In my case self._metadata_refresh_in_progress is False, but ttl() returns 0, so the whole condition evaluates to False;
At the same time kafka-console-* are happily pushing messages around:
/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2
Any advice?
Answer 1:
I had the same problem, and none of the solutions above worked. Then I read the exception message, and it seems it is mandatory to specify api_version, so
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
works fine (at least it completes without exceptions; now I have to convince it to accept messages ;) )
Answer 2:
I had a similar problem. In my case, the broker hostname was unresolvable on the client side. Try explicitly setting advertised.host.name in the broker's configuration file.
Answer 3:
A host can have multiple DNS aliases. Any of them will work for an ssh or ping test. However, a Kafka connection must use the alias that matches advertised.host.name in the broker's server.properties file.
I was using a different alias in the bootstrap_servers parameter, hence the error. Once I changed the call to use the advertised host name, the problem was solved.
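If you suspect this class of problem, a quick sanity check is to resolve, from the client machine, the exact name you pass in bootstrap_servers. This is a small diagnostic sketch of my own (the hostname is a placeholder; substitute the one from your bootstrap_servers and compare against the broker's advertised.host.name):

```python
import socket

def resolve(host):
    """Return the sorted IP addresses a hostname resolves to, or [] if it doesn't resolve."""
    try:
        # getaddrinfo covers both IPv4 and IPv6 entries.
        infos = socket.getaddrinfo(host, None)
        return sorted({info[4][0] for info in infos})
    except socket.gaierror:
        return []

# The name checked here must be the one the Kafka client actually uses.
print(resolve('localhost'))  # e.g. ['127.0.0.1', '::1']
```

If this returns an empty list on the client but the console tools run on the broker host itself, you have found the mismatch.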
Answer 4:
I had the same problem.
I solved it with the hint from user3503929.
The Kafka server was installed on Windows.
server.properties:
...
host.name = 0.0.0.0
...
producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092',
value_serializer=str.encode)
producer.send('test', value='aaa')
producer.close()
print("DONE.")
There was no problem with this code in the Windows Kafka client.
However, when I sent a message to the topic using kafka-python on Ubuntu, a NoBrokersAvailable exception was raised.
Add the following setting to server.properties:
...
advertised.host.name = 192.168.1.3
...
The same code then runs successfully.
I spent three hours because of this.
Thanks
Answer 5:
Install kafka-python using pip install kafka-python
Steps to create a Kafka data pipeline:
1. Run ZooKeeper using the shell command below, or install zookeeperd using
sudo apt-get install zookeeperd
This runs ZooKeeper as a daemon, listening on port 2181 by default.
2. Run the Kafka server.
3. Run producer.py and consumer.py in separate consoles to see the live data.
Here are the commands to run:
cd kafka-directory
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
Now that ZooKeeper and the Kafka server are running, run the producer.py and consumer.py scripts.
Producer.py:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'simple-text'

# Implement any file-reading functionality or log reader here and put the
# result in `lines`; reading a local file is just one example.
with open('input.log') as f:
    lines = f.readlines()

for line in lines:
    lst = line.split(" ")
    try:
        # Keep only the first 14 whitespace-separated fields.
        final_list = [lst[x] for x in range(14)]
        # send() expects bytes, so encode the string value.
        producer.send(topic, final_list[0].encode('utf-8')).get(timeout=10)
    except IndexError as e:
        print(e)
        continue
Consumer.py:
from kafka import KafkaConsumer

topic = 'simple-text'
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    # print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    #                                      message.offset, message.key,
    #                                      message.value))
    print(message)
Now run producer.py and consumer.py in separate terminals to see the live data!
Note: the producer.py script above runs only once; to run it forever, use a while loop and the time module.
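The "run it forever" note can be sketched with a generator that tails a file using a while loop and time.sleep. The file name and polling interval here are my own illustrative choices, not part of the original answer:

```python
import time

def follow(f, interval=1.0):
    """Yield lines appended to an open file, polling forever like `tail -f`."""
    while True:
        line = f.readline()
        if line:
            yield line
        else:
            # No new data yet; wait before polling again.
            time.sleep(interval)

# Usage sketch: feed each new line to producer.send(topic, ...) instead of print.
# with open('input.log') as f:
#     for line in follow(f):
#         print(line, end='')
```

Each line the generator yields can then be split and sent exactly as in the loop of producer.py above.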
Answer 6:
I had a similar problem, and removing the port from bootstrap_servers helped.
consumer = KafkaConsumer('my_topic',
                         #group_id='x',
                         bootstrap_servers='kafka.com')
Answer 7:
In your server.properties file, make sure the listener IP is set to your box's IP address and is reachable from the remote machine. By default it is localhost.
Update this line in your server.properties:
listeners=PLAINTEXT://<Your-IP-address>:9092
Also make sure you don't have a firewall blocking other IP addresses from reaching you. If you have sudo privileges, try disabling the firewall:
sudo systemctl stop firewalld
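Before (or instead of) disabling the firewall, you can check from the client machine whether the broker port accepts TCP connections at all. This is purely an illustrative sketch; the host and port below are placeholders for your broker's advertised address:

```python
import socket

def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Replace with your broker's advertised host and port.
print(port_open('127.0.0.1', 9092))
```

If this returns False while the console tools work on the broker host itself, the problem is network reachability (firewall or listener binding), not the Python client.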