I'm trying to implement some features in an MQTT client in Java with Eclipse Paho. The goal is to subscribe to a topic and, when a message is received, have the client send another message on another topic.
This looks very easy, but I have a weird problem I can't solve. Here is my code:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttOperations implements MqttCallback {

    MqttClient sampleClient;
    MqttConnectOptions connOpts;

    public MqttOperations() {
    }

    public static void main(String[] args) throws InterruptedException {
        new MqttOperations().launchMqttClient();
    }

    public void launchMqttClient() throws InterruptedException {
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            sampleClient = new MqttClient("tcp://broker.mqttdashboard.com:1883", "iamaclient", persistence);
            connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            sampleClient.connect(connOpts);
            sampleClient.subscribe("topic/example/ofmessage");
            sampleClient.setCallback(this);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // TODO Auto-generated method stub
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        System.out.println("Received: " + message.toString());
        try {
            System.out.println("Publishing message: i am the answer");
            MqttMessage ans = new MqttMessage("i am the answer".getBytes());
            ans.setQos(2);
            sampleClient.publish("topic/example/ofanswer", ans);
            System.out.println("Message published");
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}

The thing is, this program works only once. When the first message is received, the answer to it is sent, but "Message published" is never printed, and the client doesn't receive any further messages.
I have the impression that the line sampleClient.publish("topic/example/ofanswer", ans); never finishes executing.
Does anyone know why this happens and how to solve it?
I had a similar problem today. When I read another question that used two connections, I got it: you need two MqttClient instances, one for publishing and one for subscribing. Unfortunately, I found no documentation of that fact.
By the way, in my first implementation with two clients I gave them the same client ID (logically it should be the same connection), but the second connection then disconnected the first one. When I switched to two different IDs, it started to work.
Dominik Obermaier is right: the problem is that you block in messageArrived. Specifically, MqttClient.publish waits until a notice of delivery for the message has been received, but the MqttClient work thread never gets to retrieve that notice, because it is the very thread sitting in messageArrived waiting for it!
The two-clients solution works because the other client's work thread is free to retrieve the notice from the socket. The proper solution, however, is either to publish with QoS 0 from within messageArrived (QoS 0 messages need no confirmation of delivery) or to use an API that does not wait for the message to be delivered, such as MqttTopic.publish, for example sampleClient.getTopic("topic/example/ofanswer").publish(ans).
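
To make the mechanism concrete, here is a toy model of it using only the JDK, no Paho involved. Every name in it (CallbackDeadlockDemo, publishAsync, callbackThatBlocks, callbackThatReturns) is made up for illustration. A single-threaded executor stands in for the client's work thread, and a CompletableFuture stands in for the delivery notice. The blocking variant uses a timeout only so that the demo terminates; as I understand it, a blocking publish inside messageArrived would by default simply wait forever.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only: none of these names come from the Paho API.
class CallbackDeadlockDemo {

    // Stand-in for the single work thread that both runs the callback
    // and processes incoming delivery notices.
    private final ExecutorService workThread = Executors.newSingleThreadExecutor();

    // Stand-in for a non-blocking publish: the returned future completes
    // only once the work thread gets around to processing the notice.
    CompletableFuture<Void> publishAsync() {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        workThread.execute(() -> ack.complete(null)); // queued behind whatever is running
        return ack;
    }

    // Callback that waits for the notice ON the work thread (the problem case).
    // Returns true if the notice arrived within the timeout.
    boolean callbackThatBlocks(long timeoutMs) throws Exception {
        Callable<Boolean> callback = () -> {
            CompletableFuture<Void> ack = publishAsync();
            try {
                ack.get(timeoutMs, TimeUnit.MILLISECONDS); // work thread is stuck here
                return true;
            } catch (TimeoutException e) {
                return false; // the notice could not be processed in time
            }
        };
        return workThread.submit(callback).get();
    }

    // Callback that starts the publish and returns at once (the fix).
    // The caller's thread, not the work thread, waits for the notice.
    boolean callbackThatReturns(long timeoutMs) throws Exception {
        Callable<CompletableFuture<Void>> callback = this::publishAsync;
        CompletableFuture<Void> ack = workThread.submit(callback).get();
        try {
            ack.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    void shutdown() {
        workThread.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        CallbackDeadlockDemo demo = new CallbackDeadlockDemo();
        System.out.println("blocking callback got ack: " + demo.callbackThatBlocks(200));
        System.out.println("returning callback got ack: " + demo.callbackThatReturns(200));
        demo.shutdown();
    }
}
```

The first call reports false because the task that would complete the future is queued behind the callback that is waiting for it. The second reports true because the callback returns first and leaves the work thread free. In this analogy, callbackThatBlocks plays the role of calling MqttClient.publish inside messageArrived, and callbackThatReturns plays the role of a publish call that hands back a token without waiting.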