I am trying to get a barebones application same up and running leveraging ActiveMQ's AMQP with the JMS transformer. My Client library is Spring Integration, however, I cannot get a basic sample up and running in this configuration.
details on ActiveMQ's JMS transformer over AMQP: http://activemq.apache.org/amqp.html
main test app
@IntegrationComponentScan
@SpringBootApplication
public class SpringCloudStreamJmsActivemqSenderExampleApplication implements CommandLineRunner {
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
@Bean
public ConnectionFactory connectionFactoryAMQP() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:5672");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamJmsActivemqSenderExampleApplication.class, args);
}
@Autowired
JmsGateway gateway;
@Override
public void run(String... strings) throws Exception {
gateway.sendMessage("Hi");
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1, TimeUnit.SECONDS).get();
}
@Bean(name = "outboundChannel")
MessageChannel myOutBoundChannel() {
return new QueueChannel();
}
@Bean(name = "inboundChannel")
MessageChannel myInboundChannel() {
return new QueueChannel();
}
@Bean(name = "errorChannel")
MessageChannel myErrorChannel() {
return new DirectChannel();
}
@Bean
IntegrationFlow jmsInboundFlow() {
return IntegrationFlows.from(Jms
.inboundGateway(connectionFactoryAMQP())
.destination("myCoolQueue")
.errorChannel(myErrorChannel()))
.handle(this::print)
.get();
}
@Bean
IntegrationFlow jmsOutboundFlow() {
return IntegrationFlows.from(myOutBoundChannel())
.handle(Jms.outboundAdapter(connectionFactory())
.destination("myCoolQueue"))
.get();
}
@Bean
IntegrationFlow customErrorFlow() {
return IntegrationFlows.from(myErrorChannel())
.handle(this::printStackTrace)
.get();
}
private void print(Message message) {
System.out.println("Message payload: " + message.getPayload());
//throw new RuntimeException("broke it");
}
private void printStackTrace(Message errorMessage) {
((ErrorMessage)errorMessage).getPayload().printStackTrace();
}
}
messaging gateway
@MessagingGateway
interface JmsGateway {
@Gateway(requestChannel = "outboundChannel")
void sendMessage(String message);
}
ActiveMQ.xml
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.transformer=jms"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
Log output
2017-01-09 08:42:26.158 INFO 24332 --- [ restartedMain] treamJmsActivemqSenderExampleApplication : Started SpringCloudStreamJmsActivemqSenderExampleApplication in 2.676 seconds (JVM running for 3.041)
2017-01-09 08:42:31.143 WARN 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'myCoolQueue' - trying to recover. Cause: Disposed due to prior exception
2017-01-09 08:42:31.150 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:36.155 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:41.163 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
The ActiveMQ client only speaks the ActiveMQ native protocol OpenWire so trying to connect it to the AMQP port won't work, the connection attempt will fail. You need to use an AMQP client to connect to the AMQP port on the broker to send and receive messages over AMQP. The Apache Qpid project has a number of AMQP v1.0 client to choose from. If you want to stick to JMS type client APIs then the Qpid JMS client is the one for you.
you have to change your Bean definition by 2 ways :
JNDI:
OR
FACTORY:
Add this dependency
add port in activemq.xml
transport.transformer=jms only to convert JMS messages from/to AMQP messages on the broker side between AMQP transport & ActiveMQ, when broker receives an AMQP message through AMQP transport it is converted from AMQP message to JMS message and when a message is dispatched to consumer through AMQP transport it is converted from JMS to AMQP message.