Is there a Python API for event-driven Kafka consumer?

Published 2019-07-25 10:26

Question:

I have been trying to build a Flask app that has Kafka as its only interface. For this reason, I want a Kafka consumer that is triggered whenever a new message arrives on the relevant topic and responds by pushing messages back to Kafka.

I have been looking for something like the Spring implementation:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Message in group mygroup: " + message);
}

I have looked at:

  1. kafka-python
  2. pykafka
  3. confluent-kafka

But I couldn't find anything related to an event-driven style of implementation in Python.

Answer 1:

Kafka consumers have to poll continuously to retrieve data from the brokers.

Spring gives you this fancy API, but under the covers it calls poll in a loop and only invokes your method when records have been retrieved.

You can easily build something similar with any of the Python clients you've mentioned. As in Java, this is not an API directly exposed by (most) Kafka clients but something provided by a layer on top, so it's something you need to build yourself.



Answer 2:

Here is an implementation of the idea from @MickaelMaison's answer, using kafka-python.

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
    # Poll Kafka in a background thread so the caller is not blocked
    def poll():
        # Initialize the consumer instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("Started polling for topic:", topic)
        # Iterating over the consumer polls the brokers internally and
        # blocks until new records arrive
        for msg in consumer:
            print("Key:", msg.key, "Value:", msg.value)
            # Hand each record to the listener that was registered
            listener(msg)

    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("Started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

Polling happens in a separate thread. Each time a message is received, the registered listener is called with the record retrieved from Kafka.
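
The question also asks for a consumer that responds by pushing messages back to Kafka. One way to do that with `register_kafka_listener` is to pass it a listener that produces a reply. Below is a minimal sketch, assuming kafka-python's `KafkaProducer`, whose `send(topic, value=..., key=...)` queues a record asynchronously. The names `make_reply_listener`, `handle` and the reply topic are hypothetical.

```python
from typing import Callable


def make_reply_listener(producer, reply_topic: str, handle: Callable[[bytes], bytes]):
    """Return a listener that answers each consumed record on reply_topic.

    `producer` is assumed to behave like kafka-python's KafkaProducer.
    `handle` (hypothetical) turns the incoming value into the reply value.
    """
    def listener(record):
        reply = handle(record.value)
        # Reuse the request key so replies can be matched to requests
        producer.send(reply_topic, key=record.key, value=reply)
    return listener


# Wiring with kafka-python (requires a running broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
# register_kafka_listener('topic1',
#                         make_reply_listener(producer, 'topic1-replies', bytes.upper))
```

Because `send()` is asynchronous, calling `producer.flush()` before shutting down ensures queued replies are actually delivered.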