How can I Send/Receive a message from Azure Servic

Posted 2019-05-28 03:00

Question:

I am currently researching how to connect to Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar).

I have created a demo Java application, SimpleSenderReceiver, which connects to an already configured Azure Service Bus using the following guide (#link1). This code seems to work with a "very" old version of the Qpid JMS client (version 0.32). I am now trying to get it to work with the latest stable version of Qpid JMS (qpid-jms-client-0.11.1.jar), and so far I have not been successful. Going through the documentation (#link2) of Qpid JMS 0.11.1, you can see that the connectionfactory property in the properties file is specified differently than in version 0.32.

  • How can I set up a correct AMQP connection string in the properties file?
  • How can I set up the Qpid JMS - Azure Service Bus demo to work with the latest stable Qpid version?

I keep running into the following problem:

731 [AmqpProvider:(1):[amqps://example-bus.servicebus.windows.net?transport.connectTimeout=60000]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
javax.jms.JMSException: Idle timeout value specified in connection OPEN ('30000 ms') is not supported. Minimum idle timeout is '60000' ms. TrackingId:238849ced1em4cd3a093261372f4fc1e_G21, SystemTracker:gateway6, Timestamp:10/27/2016 8:16:23 AM [condition = amqp:internal-error]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:150)
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:147)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:251)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:771)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I have the following properties file servicebus.properties:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]

connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSsuiLI%3D&transport.connectTimeout=6000

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]

queue.myQueueLookup = queue1

I have the following class SimpleSenderReceiver.java:

package com.demo.AzureTest;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;

public class SimpleSenderReceiver implements MessageListener {

    private static boolean runReceiver = false;
    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;
    private static Random randomGenerator = new Random();

    public SimpleSenderReceiver() throws Exception {
        // Configure JNDI environment
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, 
                   "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put(Context.PROVIDER_URL, "C://PATH//servicebus.properties");
        Context context = new InitialContext(env);

        // Look up ConnectionFactory and Queue
        ConnectionFactory cf = (ConnectionFactory) context.lookup("myFactoryLookup");
        System.out.println("lookup: " + context.lookup("myFactoryLookup"));
        System.out.println("cf:"+cf);
        Destination queue = (Destination) context.lookup("myQueueLookup");

        System.out.println("queue:");

        // Create Connection
        connection = cf.createConnection();
        System.out.println("connection :"+connection);

        // Create sender-side Session and MessageProducer
        sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        System.out.println("Session open.");

        sender = sendSession.createProducer(queue);
        System.out.println(sender.getDestination());
        System.out.println("sender:"+sender);

        if (runReceiver) {
            // Create receiver-side Session, MessageConsumer,and MessageListener
            receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            receiver = receiveSession.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
        }
    }

    public static void main(String[] args) {
        try {

            if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
                runReceiver = false;
            }

            SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
            System.out.println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
            BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));

            while (true) {
                String s = commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) {
                    simpleSenderReceiver.close();
                    System.exit(0);
                } else {
                    simpleSenderReceiver.sendMessage();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendMessage() throws JMSException {
        TextMessage message = sendSession.createTextMessage();
        message.setText("Hello from SIS Test AMQP message from Java JMSaaa");
        long randomMessageID = randomGenerator.nextLong() >>>1;
        message.setStringProperty("TenantId", "klant");
        message.setStringProperty("EventType", "bericht");
        message.setStringProperty("EventTypeVersion", "1.0");
        message.setStringProperty("MessageType", "DocumentMessage");
        message.setStringProperty("OperationType", "Create");
        message.setStringProperty("SourceSystem", "sis_sender");
        message.setStringProperty("EnterpriseKey", "sis_sender-klant-bericht");
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());
        System.out.println("Sent message with Text = " + message.getText());
    }

    public void close() throws JMSException {
        connection.close();
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
            TextMessage txtmessage = (TextMessage) message;
            System.out.println("Received message with Text = " + txtmessage.getText());
            message.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}  

Maven dependencies:

    <dependencies>
        <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-jms-client</artifactId>
          <version>0.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.2</version>
        </dependency>
    </dependencies>

--- Update ---

I have since gotten a little further but am still a bit stuck. Update to the connectionfactory property:

connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?amqp.idleTimeout=150000&jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSkuiLI%3D

I am now getting the following stack trace:

842 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
1014 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:543efe98-3ecc-485e-9f7f-3046c40db0cb:1 connected to remote Broker: amqps://example-open-bus-bus.servicebus.windows.net
1301 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] WARN org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder - Open of resource:(JmsProducerInfo { ID:546efe78-3ecc-485d-9f6f-3065c40db1ce:1:1:1, destination = klant }) failed: Attempted to perform an unauthorized operation. TrackingId:2950b1ed7a0d4e0a97b0k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
Caught exception, exiting.
javax.jms.JMSSecurityException: Attempted to perform an unauthorized operation. TrackingId:2890b0ed9a0d4e0a97b1k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:129)
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Answer 1:

The newer client enables AMQP heartbeating/idle-timeout by default, while the older client did not. The client sets a default 60-second timeout, which in turn means it requests a 30-second (30000 ms) idle-timeout value in its AMQP Open frame when connecting to the server, in accordance with the specification's defined behaviour (where peers advertise half their actual timeout to help avoid spurious timeouts).

ServiceBus is refusing the 30000 ms Open frame value and indicating that it needs a value of at least 60000 ms (or presumably also 0, which means it is disabled). To achieve this you will need to configure the client to have its timeout set to at least 120000 ms, which will result in the required minimum 60000 ms Open frame idle-timeout value that ServiceBus is mandating (or again, perhaps disable the client's timeout handling by setting it to 0).

You can do this using the "amqp.idleTimeout" URI option, as per http://qpid.apache.org/releases/qpid-jms-0.11.1/docs/index.html#amqp-configuration-options
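The arithmetic above can be sketched in a few lines of Java. This is only an illustration of the halving rule described in this answer, not code from the Qpid client: the class name, the helper methods, and the example host are made up, and the 60000 ms minimum is taken from the ServiceBus error message in the question.

```java
public class IdleTimeoutSketch {

    // Minimum Open-frame idle timeout quoted in the ServiceBus error message.
    static final long SERVICE_BUS_MIN_ADVERTISED_MS = 60000;

    // As described above, the client advertises half of its configured timeout.
    static long advertisedIdleTimeout(long configuredMs) {
        return configuredMs / 2;
    }

    // True if the advertised value meets the minimum ServiceBus asks for.
    // (A value of 0, meaning "disabled", is deliberately not modelled here,
    // since the answer only presumes that ServiceBus accepts it.)
    static boolean acceptedByServiceBus(long configuredMs) {
        return advertisedIdleTimeout(configuredMs) >= SERVICE_BUS_MIN_ADVERTISED_MS;
    }

    // Builds a connection URI carrying the amqp.idleTimeout option.
    static String connectionUri(String host, long configuredMs) {
        return "amqps://" + host + "?amqp.idleTimeout=" + configuredMs;
    }

    public static void main(String[] args) {
        // Client default of 60000 ms advertises 30000 ms, which is refused.
        System.out.println(acceptedByServiceBus(60000));
        // 120000 ms advertises exactly the 60000 ms minimum.
        System.out.println(acceptedByServiceBus(120000));
        // Hypothetical host name, for illustration only.
        System.out.println(connectionUri("example.servicebus.windows.net", 120000));
    }
}
```

The resulting URI is what would go on the right-hand side of the connectionfactory.[jndi_name] entry in the properties file, with the username and password options appended.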

EDIT: I see you figured that out at the same time I was typing my answer.

The new exception is from ServiceBus saying you aren't authorized to do something you are attempting. It should be easy enough to catch the exception at its source and determine which operation is being refused.

Your URI seems fine (though I assume your username isn't actually 'somePolicy' and the double connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = at the start is a copy-and-paste error). I haven't used the client with ServiceBus personally, but I've seen questions from various folks who have, so I'm not aware of a particular issue outright stopping them working together.



Answer 2:

I ran into the same security issue referred to above and spent a while tracking it down, so for anyone else: my issue was caused by the key value specified in the password query parameter (jms.password in the question's URI) containing the + character.

There is usually an = at the end of the value, which I encoded as %3D in the string, and I encoded the + as %2B. However, if you put a breakpoint at the point where the ConnectionFactory is instantiated and look at the password attribute, you will see that the = is correctly decoded but the + has been stripped and replaced by a space, hence the unauthorized access issues.

My workaround was just to regenerate the primary key in Azure so that it didn't contain a + (yuck), but it worked. Possibly a bug in the Qpid libraries.
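For anyone who wants to reproduce the symptom outside the client, here is a minimal sketch using only java.net.URLEncoder and java.net.URLDecoder. The key is made up, and the class and helper names are hypothetical. The "decoded twice" step is only one possible way a + can end up as a space; whether the client actually does this internally is an assumption, not something confirmed here.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

public class SasKeyEncodingSketch {

    // Percent-encodes a raw key so it can be placed in a URI query string.
    static String encode(String rawKey) {
        try {
            return URLEncoder.encode(rawKey, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Form-style decoding: %XX escapes are resolved and '+' becomes a space.
    static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String rawKey = "abc/def+ghi=";          // made-up key, not a real secret
        String encoded = encode(rawKey);
        System.out.println(encoded);             // abc%2Fdef%2Bghi%3D

        String decodedOnce = decode(encoded);
        System.out.println(decodedOnce);         // abc/def+ghi=

        // A second form-style decode turns the literal '+' into a space,
        // while '=' survives, matching the symptom described above.
        String decodedTwice = decode(decodedOnce);
        System.out.println(decodedTwice);        // abc/def ghi=
    }
}
```

An alternative that avoids putting the key in the URI at all is to drop the username and password options and pass the credentials through the standard JMS call cf.createConnection(userName, password), so that no URI decoding is applied to them.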