I'm trying to write a simple pyspark job, which would receive data from a kafka broker topic, did some transformation on that data, and put the transformed data on a different kafka broker topic.
I have the following code, which reads data from a kafka topic, but has no effect running sendkafka function:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
def sendkafka(messages):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
for message in messages:
yield producer.send_messages('spark.out', message)
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 5)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
parsed = kvs.map(lambda (key, value): json.loads(value))
parsed.pprint()
sentRDD = kvs.mapPartitions(sendkafka)
sentRDD.count()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
What should I change, in order to make my sendkafka function to actually send data to the spark.out kafka topic?