ActiveMQ AMQP with JMS transformer leveraging spri

Posted 2019-06-10 04:29

Question:

I am trying to get a barebones sample application up and running using ActiveMQ's AMQP transport with the JMS transformer. My client library is Spring Integration, but I cannot get a basic sample working in this configuration.

Details on ActiveMQ's JMS transformer over AMQP: http://activemq.apache.org/amqp.html

main test app

@IntegrationComponentScan
@SpringBootApplication
public class SpringCloudStreamJmsActivemqSenderExampleApplication implements CommandLineRunner {

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        return connectionFactory;
    }

    @Bean
    public ConnectionFactory connectionFactoryAMQP() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:5672");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        return connectionFactory;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamJmsActivemqSenderExampleApplication.class, args);
    }

    @Autowired
    JmsGateway gateway;

    @Override
    public void run(String... strings) throws Exception {
        gateway.sendMessage("Hi");
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1, TimeUnit.SECONDS).get();
    }

    @Bean(name = "outboundChannel")
    MessageChannel myOutBoundChannel() {
        return new QueueChannel();
    }

    @Bean(name = "inboundChannel")
    MessageChannel myInboundChannel() {
        return new QueueChannel();
    }

    @Bean(name = "errorChannel")
    MessageChannel myErrorChannel() {
        return new DirectChannel();
    }

    @Bean
    IntegrationFlow jmsInboundFlow() {
        return IntegrationFlows.from(Jms
                .inboundGateway(connectionFactoryAMQP())
                .destination("myCoolQueue")
                .errorChannel(myErrorChannel()))
                    .handle(this::print)
                .get();
    }

    @Bean
    IntegrationFlow jmsOutboundFlow() {
        return IntegrationFlows.from(myOutBoundChannel())
                .handle(Jms.outboundAdapter(connectionFactory())
                .destination("myCoolQueue"))
                .get();
    }

    @Bean
    IntegrationFlow customErrorFlow() {
        return IntegrationFlows.from(myErrorChannel())
                    .handle(this::printStackTrace)
                .get();
    }

    private void print(Message message) {
        System.out.println("Message payload: " + message.getPayload());
        //throw new RuntimeException("broke it");
    }

    private void printStackTrace(Message errorMessage) {
        ((ErrorMessage)errorMessage).getPayload().printStackTrace();
    }
}

messaging gateway

@MessagingGateway
interface JmsGateway {
    @Gateway(requestChannel = "outboundChannel")
    void sendMessage(String message);
}

ActiveMQ.xml

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

Log output

2017-01-09 08:42:26.158  INFO 24332 --- [  restartedMain] treamJmsActivemqSenderExampleApplication : Started SpringCloudStreamJmsActivemqSenderExampleApplication in 2.676 seconds (JVM running for 3.041)
2017-01-09 08:42:31.143  WARN 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'myCoolQueue' - trying to recover. Cause: Disposed due to prior exception
2017-01-09 08:42:31.150 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:36.155 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:41.163 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672

Answer 1:

You have to change your bean definition in one of two ways:

JNDI:

@Bean
public ConnectionFactory connectionFactoryAMQP() throws NamingException {
    // Name under which the factory is registered in, and looked up from, the JNDI context
    String factoryName = "myFactoryLookup";
    Properties props = new Properties();
    props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    // Note the amqp:// scheme: this factory talks AMQP, not OpenWire
    props.put("connectionfactory." + factoryName, "amqp://localhost:5672");
    props.put("property.connectionfactory." + factoryName + ".username", "admin");
    props.put("property.connectionfactory." + factoryName + ".password", "admin");
    // new InitialContext(...) and lookup(...) both throw NamingException
    InitialContext ic = new InitialContext(props);
    return (ConnectionFactory) ic.lookup(factoryName);
}

OR

FACTORY:

@Bean
public ConnectionFactory connectionFactoryAMQP() {
    org.apache.qpid.jms.JmsConnectionFactory connectionFactory = new org.apache.qpid.jms.JmsConnectionFactory();
    connectionFactory.setRemoteURI("amqp://localhost:5672");
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("admin");
    return connectionFactory;
}

Add this dependency:

<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-jms-client</artifactId>
    <version>0.9.0</version>
</dependency>

Set the AMQP port in activemq.xml:

 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>

The transport.transformer=jms option only controls how the broker converts messages between AMQP and JMS on the AMQP transport. When the broker receives an AMQP message through the AMQP transport, it converts it to a JMS message; when it dispatches a message to a consumer through the AMQP transport, it converts it from JMS back to AMQP. It does not change the protocol the client itself has to speak.
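As a quick sanity check on the connector settings discussed above, here is a minimal JDK-only sketch that parses a transportConnector URI and reports its port and transformer option. The class and method names (ConnectorUriCheck, options, transformer, hasFixedPort) are made up for illustration and are not part of ActiveMQ. The accepted transformer names are the ones listed in the ActiveMQ AMQP documentation. The URI must be passed with a plain & rather than the XML-escaped &amp;.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not an ActiveMQ API: inspects a connector URI string.
final class ConnectorUriCheck {

    // Transformer names documented for the ActiveMQ AMQP transport.
    private static final List<String> KNOWN_TRANSFORMERS = Arrays.asList("native", "raw", "jms");

    // Splits the query string into key/value pairs, preserving order.
    public static Map<String, String> options(String connectorUri) {
        Map<String, String> result = new LinkedHashMap<>();
        String query = URI.create(connectorUri).getQuery();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    // Returns the transport.transformer value, or null when the option is absent.
    public static String transformer(String connectorUri) {
        return options(connectorUri).get("transport.transformer");
    }

    public static boolean isKnownTransformer(String name) {
        return name != null && KNOWN_TRANSFORMERS.contains(name);
    }

    // Port 0 (or no port) means clients cannot rely on a fixed port such as 5672.
    public static boolean hasFixedPort(String connectorUri) {
        return URI.create(connectorUri).getPort() > 0;
    }

    public static void main(String[] args) {
        String fromQuestion = "amqp://0.0.0.0:0?maximumConnections=1000&transport.transformer=jms";
        String fromAnswer = "amqp://0.0.0.0:5672?transport.transformer=jms";
        for (String uri : new String[] {fromQuestion, fromAnswer}) {
            System.out.println(uri
                    + " fixedPort=" + hasFixedPort(uri)
                    + " transformer=" + transformer(uri));
        }
    }
}
```

Run against the two URIs in this thread, the check would flag that the connector in the question is bound to port 0, while the one in this answer pins the port to 5672, which is the port the client connects to.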



Answer 2:

The ActiveMQ client only speaks OpenWire, ActiveMQ's native protocol, so connecting it to the AMQP port won't work: the connection attempt will fail. You need an AMQP client to connect to the broker's AMQP port and send and receive messages over AMQP. The Apache Qpid project has a number of AMQP v1.0 clients to choose from. If you want to stick with a JMS-style client API, the Qpid JMS client is the one for you.
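To make the protocol mismatch concrete, here is a rough JDK-only sketch of what an AMQP 1.0 client sends first: an 8-byte protocol header consisting of the ASCII letters "AMQP", a protocol id (0 for plain AMQP, 3 for SASL), and the version 1.0.0. An OpenWire client opens with a different handshake, which the AMQP port does not accept. The class name AmqpHeaderProbe and its methods are invented for this example; the probe assumes that a peer speaking AMQP 1.0 answers with a header of its own, and the host and port you pass in are up to you.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper, not part of ActiveMQ or Qpid.
final class AmqpHeaderProbe {

    public static final int PROTOCOL_PLAIN = 0; // plain AMQP
    public static final int PROTOCOL_SASL = 3;  // SASL security layer

    // Builds the 8-byte AMQP 1.0 protocol header: "AMQP", protocol id, 1, 0, 0.
    public static byte[] header(int protocolId) {
        return new byte[] {'A', 'M', 'Q', 'P', (byte) protocolId, 1, 0, 0};
    }

    // True when the bytes begin with the "AMQP" magic.
    public static boolean startsWithAmqpMagic(byte[] bytes) {
        return bytes != null && bytes.length >= 4
                && bytes[0] == 'A' && bytes[1] == 'M' && bytes[2] == 'Q' && bytes[3] == 'P';
    }

    // Sends a SASL protocol header and checks whether the reply looks like AMQP.
    // Throws IOException if the connection fails or the peer closes early.
    public static boolean probe(String host, int port, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(header(PROTOCOL_SASL));
            out.flush();
            byte[] reply = new byte[8];
            new DataInputStream(socket.getInputStream()).readFully(reply);
            return startsWithAmqpMagic(reply);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 2) {
            // Usage: java AmqpHeaderProbe <host> <port>
            System.out.println("Looks like AMQP: " + probe(args[0], Integer.parseInt(args[1]), 3000));
        } else {
            System.out.println("Header length: " + header(PROTOCOL_PLAIN).length);
        }
    }
}
```

Pointing the probe at the broker's AMQP port is a way to confirm that the connector is actually listening before debugging the Spring configuration.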