I am trying to use Apache Qpid from a Spring Boot application using the Qpid JMS client. I am able to configure it, but when I try to receive a message from the queue, the logger prints:
Dispatcher(918480905)Received a message(878303980)[1] from queue 1 )without a handler - rejecting(requeue)...
Here is my code:
JmsConfiguration.java
@Configuration
public class JmsConfiguration {

    @Primary
    @Bean
    public Context createContext()
    {
        Properties properties=new Properties();
        System.setProperty("IMMEDIATE_PREFETCH", "true");
        Context context=null;
        try {
            properties.load(this.getClass().getResourceAsStream("application.properties"));
            context = new InitialContext(properties);
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return context;
    }

    @Primary
    @Bean
    public ConnectionFactory createConnectionFactory(Context context)
    {
        ConnectionFactory connectionFactory=null;
        try {
            connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionFactory");
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return connectionFactory;
    }

    @Primary
    @Bean
    public Connection jmsConnection(ConnectionFactory connectionFactory) throws Exception
    {
        Connection connection = connectionFactory.createConnection();
        connection.start();
        return connection;
    }

    @Primary
    @Bean
    public Queue jmsQueue(Context context) throws Exception
    {
        Queue queue = (Queue) context.lookup("myqueue");
        return queue;
    }
}
application.properties
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionFactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672?maxprefetch='0''
queue.myqueue = queue1
ScheduledTasks.java (it just sends and receives messages at intervals)
@Component
public class ScheduledTasks
{
    Connection connection;
    Queue queue;

    @Autowired
    public ScheduledTasks(Connection connection, Queue queue) {
        this.connection=connection;
        this.queue=queue;
    }

    MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            System.out.println("Received id is------>");
            System.out.println(message);
        }
    };

    @Scheduled(fixedDelay = 2000)
    public void sendMessage() throws Exception
    {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Message message=session.createTextMessage();
        MessageProducer messageProducer=session.createProducer(queue);
        message.setStringProperty("value", "BOOM");
        messageProducer.send(message);
        session.commit();
        messageProducer.close();
        //connection.close();
        System.out.println("---------------Message Sent");
    }

    //@JmsListener(destination="queue1")
    @Scheduled(initialDelay=5000, fixedDelay = 5000)
    public void receiveMessage() throws Exception
    {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        // if(messageConsumer.getMessageListener()==null)
        // messageConsumer.setMessageListener(messageListener);
        Message message = messageConsumer.receive(3000);
        if(message!=null)
            System.out.println("----------------->"+message.getStringProperty("value"));
        session.commit();
        messageConsumer.close();
        //connection.close();
        System.out.println("--------------->Got Message");
    }
}
You create an instance implementing MessageListener but you don't do anything with it. In Spring you should use DefaultMessageListenerContainer or SimpleMessageListenerContainer from spring-jms and create it as a Spring bean in the JmsConfiguration class. After setting the connection details (ConnectionFactory, Queue, sessionTransacted, etc.) you also need to set the class implementing the JMS MessageListener.
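A minimal sketch of what that container bean could look like, assuming the javax.jms API (as used by the legacy Qpid client in the question) and that the ConnectionFactory and Queue beans from JmsConfiguration are available for injection. The class name JmsListenerConfiguration and the bean names queueListener and listenerContainer are made up for illustration; the beans could equally be added to JmsConfiguration itself.

```java
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical configuration class; name chosen for this example only.
@Configuration
public class JmsListenerConfiguration {

    // The listener from the question, exposed as a bean so the container can use it.
    @Bean
    public MessageListener queueListener() {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("Received: " + message.getStringProperty("value"));
                } catch (JMSException e) {
                    // Rethrow so the transacted session rolls back and the message is redelivered.
                    throw new RuntimeException(e);
                }
            }
        };
    }

    // The container creates its own connection, session and consumer from the
    // factory and registers the listener, so the listener is no longer unused.
    @Bean
    public DefaultMessageListenerContainer listenerContainer(
            ConnectionFactory connectionFactory,
            Queue queue,
            MessageListener queueListener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestination(queue);
        container.setSessionTransacted(true);
        container.setMessageListener(queueListener);
        return container;
    }
}
```

With a container like this in place, the scheduled receiveMessage() method would probably be removed, since it would otherwise compete with the container for messages on the same queue.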