I am looking for a java code for MQTT client that subscribes to a given topic, every message published on that topic should reach the client only once.I have written many codes and in all the cases messages are delivered properly to the client when it is connected to the broker but if the subscribed client disconnects from the broker for some time and then again connects back, it does not receive the messages that were sent during the time that it was not connected and I have set the clean session flag also as false but still its not working, the code that I used is given below
import org.fusesource.hawtbuf.*;
import org.fusesource.mqtt.client.*;
/**
* Uses an callback based interface to MQTT. Callback based interfaces
* are harder to use but are slightly more efficient.
*/
class Listener {
public static void main(String []args) throws Exception {
String user = env("APOLLO_USER", "admin");
String password = env("APOLLO_PASSWORD", "password");
String host = env("APOLLO_HOST", "localhost");
int port = Integer.parseInt(env("APOLLO_PORT", "61613"));
final String destination = arg(args, 1, "subject");
MQTT mqtt = new MQTT();
mqtt.setHost(host, port);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setCleanSession(false);
mqtt.setClientId("newclient");
final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new org.fusesource.mqtt.client.Listener() {
long count = 0;
long start = System.currentTimeMillis();
public void onConnected() {
}
public void onDisconnected() {
}
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
System.out.println("Nisha Messages : " + msg);
System.out.println("Nisha topic" + topic);
System.out.println("Nisha Receive acknowledgement : " + ack);
String body = msg.utf8().toString();
if("SHUTDOWN".equals(body)) {
long diff = System.currentTimeMillis() - start;
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
connection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
System.exit(0);
}
@Override
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
});
} else {
if( count == 0 ) {
start = System.currentTimeMillis();
}
if( count % 1000 == 0 ) {
System.out.println(String.format("Received %d messages.", count));
}
count ++;
}
}
});
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
System.out.println("connected in :::: ");
Topic[] topics = {new Topic(destination, QoS.AT_MOST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
}
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
});
}
@Override
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
});
// Wait forever..
synchronized (Listener.class) {
while(true)
Listener.class.wait();
}
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
return defaultValue;
return rc;
}
private static String arg(String []args, int index, String defaultValue) {
if( index < args.length )
return args[index];
else
return defaultValue;
}
}
Am I doing something wrong here?