Just to make things tricky, I'd like to consume messages from a RabbitMQ queue. Now, I know there is a plugin for MQTT on RabbitMQ (https://www.rabbitmq.com/mqtt.html).
However, I cannot seem to make an example work where Spark consumes a message that has been produced from pika.
For example, I am using the simple wordcount.py program from the Spark Streaming programming guide (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) to see whether I can see a message from a producer. I produce messages in the following way:
import sys
import pika
import json
import future
import pprofile

def sendJson(json):
    # open a connection to the local RabbitMQ server
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # declare a durable queue and bind it to the exchange
    channel.queue_declare(queue='analytics', durable=True)
    channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

    channel.basic_publish(exchange='analytics_exchange', routing_key='analytics', body=json)
    connection.close()

if __name__ == "__main__":
    with open(sys.argv[1], 'r') as json_file:
        sendJson(json_file.read())
The Spark Streaming consumer is the following:
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""
brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
However, unlike the simple wordcount example, I cannot get this to work, and I get the following error:
16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)
So my questions are: what should the settings be for MQTTUtils.createStream(ssc, brokerUrl, topic) in order to listen to the queue, are there any fuller examples, and how do brokerUrl and topic map onto RabbitMQ's concepts (exchange, queue, routing key)?
I am running my consumer code with: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py
I have updated the producer code as follows with TCP parameters as suggested by one comment:
url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
and the Spark Streaming consumer as:
brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
From the MqttAsyncClient Javadoc, the server URI must have one of the following schemes: tcp://, ssl://, or local://. You need to change your brokerUrl above to have one of these schemes.

For more information, here's a link to the source for MqttAsyncClient: https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272
It looks like you are using the wrong port number. Assuming that:

- you have enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt) and restarted the RabbitMQ server, and
- you have included spark-streaming-mqtt when executing spark-submit / pyspark (either with --packages, or with --jars / --driver-class-path),

you can connect using TCP with tcp://localhost:1883. You also have to remember that MQTT is using the amq.topic exchange.
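For example, the stream in the question would be created roughly like this (a sketch; 1883 is the MQTT plugin's default listener, and the topic has to match the routing key used on amq.topic, here taken from the question):

brokerUrl = "tcp://localhost:1883"   # MQTT listener, not the AMQP port 5672
topic = "analytics"                  # routing key on the amq.topic exchange
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)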
Quick start:

First, create a Dockerfile that enables the MQTT plugin, build the Docker image, then start it and wait until the server is ready.
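A minimal sketch of those steps, assuming the official rabbitmq:3-management base image and an arbitrary image tag (rabbit_mqtt); the Dockerfile only needs to enable the MQTT plugin:

FROM rabbitmq:3-management
RUN rabbitmq-plugins enable rabbitmq_mqtt

Then build and run the image, exposing the management UI (15672), AMQP (5672) and MQTT (1883) ports:

docker build -t rabbit_mqtt .
docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt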
Next, create producer.py along the lines of the sketch below, start the producer, and visit the management console at http://127.0.0.1:15672/#/exchanges/%2F/amq.topic to see that messages are received.
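A minimal producer sketch, assuming pika is installed; the routing key (hello) and the message body are assumptions, but the exchange has to be amq.topic for the MQTT plugin to pick the messages up:

import time
import pika

# connect over AMQP to the local RabbitMQ instance
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# publish to the built-in amq.topic exchange; the routing key becomes the MQTT topic
for i in range(1000):
    channel.basic_publish(exchange='amq.topic',
                          routing_key='hello',
                          body='Hello world {0}'.format(i))
    time.sleep(3)

connection.close()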
Then create consumer.py along the lines of the sketch below, and download the spark-streaming-mqtt dependency (adjusting the Scala version to the one used to build Spark, and the Spark version).
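A minimal consumer sketch; the application name, the batch interval, the tcp://localhost:1883 broker URL and the hello topic (matching the producer's routing key) are assumptions:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="MQTTConsumer")
ssc = StreamingContext(sc, 10)

# note the tcp:// scheme and the MQTT port 1883 (not the AMQP port 5672)
mqttStream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "hello")
mqttStream.count().pprint()

ssc.start()
ssc.awaitTermination()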
Finally, make sure SPARK_HOME and PYTHONPATH point to the correct directories, and submit consumer.py with a command along the lines of the sketch below (adjusting versions as before). If you followed all the steps, you should see the Hello world messages in the Spark log.
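A possible submit command, assuming a Spark 1.6.1 build against Scala 2.10 (both version numbers are assumptions and should match your installation); the --packages flag also takes care of downloading the spark-streaming-mqtt dependency:

./bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.10:1.6.1 consumer.py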