How do I get Websphere MQ connection status and ho

Posted 2019-03-26 13:26

Question:

1) From a .NET client, how do I test whether the client is connected to the server (i.e., can send and receive)? Yes, I could send a message inside a try block and catch the ensuing exception, but I'm hoping for a more elegant solution.

2) How do I open, close, and re-open connections? In my attempts to resolve question 1 above, I discovered that if I open a connection and then call connection.Close(), I am not able to obtain another connection from the connection factory (see the code fragment below). I receive error message XMSCC0008.

I am using a very standard, vanilla MQ configuration. Here is how my client connects:

ISession session = MQAccess.GetSession(MQAccess.Connection);
IDestination destination = session.CreateTopic(SubTopicName);
Consumer = MQAccess.GetConsumer(session, destination);
Consumer.MessageListener = new MessageListener(HandleMQSubEvent);
MQAccess.Connection.Start();

where MQAccess is a small utility class.

Edited the question to add MQAccess code:

public static class MQAccess
{
    public static readonly MQConfigurationSectionHandler ConfigSettings;
    public static readonly IConnectionFactory ConnectionFactory;

    private static readonly IConnection connection;
    public static IConnection Connection
    {
        get { return connection; }
    }

    static MQAccess()
    {
        ConfigSettings = (MQConfigurationSectionHandler)
            ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        }
        else
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);
        }

        connection = GetConnection();
    }

    public static IConnection GetConnection()
    {
        return ConnectionFactory.CreateConnection();
    }

    public static ISession GetSession(IConnection connection)
    {
        return connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
    }

    public static IMessageProducer GetProducer(ISession session, IDestination destination)
    {
        return session.CreateProducer(destination);
    }

    public static IMessageConsumer GetConsumer(ISession session, IDestination destination)
    {
        return session.CreateConsumer(destination);
    }

    public static void MQPub(string TopicURI, string message)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    producer.Send(session.CreateTextMessage(message));
                }
            }
        }
    }

    public static void MQPub(string TopicURI, IEnumerable<string> messages)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    foreach (var message in messages)
                    {
                        producer.Send(session.CreateTextMessage(message));
                    }
                }
            }
        }
    }
}

Edit: Renamed the MQAccess class to MQClient and made it an instance class, per T.Rob's suggestion. The Disconnect method still crashes with the error messages listed above.

public class MQClient : IDisposable
{
    public MQConfigurationSectionHandler ConfigSettings { get; private set; }
    public IConnectionFactory ConnectionFactory { get; private set; }

    public IConnection Connection { get; private set;  }

    public IMessageConsumer Consumer { get; private set; }
    public IMessageProducer Producer { get; private set; }
    // Save sessions as fields for disposing and future subscription functionality
    private ISession ProducerSession;
    private ISession ConsumerSession;
    public string SubTopicName { get; private set; }
    public string PubTopicName { get; private set; }
    public bool IsConnected { get; private set; }
    public event Action<Exception> ConnectionError;
    private Action<IMessage> IncomingMessageHandler;

    public MQClient(string subTopicName, string pubTopicName, Action<IMessage> incomingMessageHandler)
    {
        // Don't put connect logic in the constructor. If we lose the connection we may need to connect again.
        SubTopicName = subTopicName;
        PubTopicName = pubTopicName;
        IncomingMessageHandler = incomingMessageHandler;
    }

    public string Connect()
    {
        IsConnected = false;
        string errorMsg = string.Empty;

        ConfigSettings = (MQConfigurationSectionHandler)
                ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        else
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);

        Connection = ConnectionFactory.CreateConnection();


        if (!string.IsNullOrEmpty(PubTopicName))
        {
            ProducerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Producer = ProducerSession.CreateProducer(ProducerSession.CreateTopic(PubTopicName));
        }

        if (!string.IsNullOrEmpty(SubTopicName) && IncomingMessageHandler != null)
        {
            ConsumerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Consumer = ConsumerSession.CreateConsumer(ConsumerSession.CreateTopic(SubTopicName));
            Consumer.MessageListener = new MessageListener(IncomingMessageHandler);
        }

        try
        {
            Connection.Start();
            Connection.ExceptionListener = new ExceptionListener(ConnectionExceptionHandler);
            IsConnected = true;
        }
        catch (TypeInitializationException ex)
        {
            errorMsg = "A TypeInitializationException error occurred while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message;
        }
        catch (IllegalStateException ex)
        {
            errorMsg = "An IllegalStateException error occurred while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message;
        }

        return errorMsg;
    }

    public void Disconnect()
    {
        if (Producer != null)
        {
            Producer.Close();
            Producer.Dispose();
            Producer = null;
        }

        if (ProducerSession != null)
        {
            // Call Unsubscribe here if subscription is durable

            ProducerSession.Close();
            ProducerSession.Dispose();
            ProducerSession = null;
        }

        if (Connection != null)
        {
            Connection.Stop();

            //if (Connection.ExceptionListener != null)
            //    Connection.ExceptionListener = null;

            // Per Shashi............
            //if (Consumer.MessageListener != null)
            //    Consumer.MessageListener = null;

            Connection.Close();
            Connection.Dispose();
            Connection = null;
        }

        if (Consumer != null)
        {

            if (Consumer.MessageListener != null)
                Consumer.MessageListener = null;

            Consumer.Close();
            Consumer.Dispose();
            Consumer = null;
        }


        if (ConsumerSession != null)
        {
            // Call Unsubscribe here if subscription is durable
            ConsumerSession.Close();
            ConsumerSession.Dispose();
            ConsumerSession = null;
        }

        IsConnected = false;
    }


    public void Publish(string message)
    {
        Producer.Send(ProducerSession.CreateTextMessage(message));
    }


    public void Publish(string[] messages)
    {
        foreach (string msg in messages)
            Publish(msg);
    }

    public void ConnectionExceptionHandler(Exception ex)
    {
        Disconnect(); // Clean up

        if (ConnectionError != null)
            ConnectionError(ex);
    }

    #region IDisposable Members
    private bool disposed;

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
                Disconnect();

            disposed = true;
        }
    }
    #endregion

}

Answer 1:

The problem is here --> where MQAccess is a small utility class.

The first part of the question asks how to tell whether the connection is active. The XMS classes for WebSphere MQ are an implementation of the JMS specification for non-Java platforms. They follow the JMS spec fairly closely, and the JMS spec does not have a method on the connection or session equivalent to isConnected; therefore, neither does XMS. However, all GET and PUT activity should occur within a try/catch block in order to catch the JMS exceptions. (From which you always print the linkedException, right?) When a JMS exception is thrown, the app either treats it as fatal and dies, or else it closes all JMS objects except for the connection factory, waits a few seconds, and then re-drives the connection sequence.
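The "wait a few seconds and re-drive the connection sequence" step can be sketched roughly as follows. This is a minimal, library-agnostic sketch: ConnectRetry and Run are hypothetical names, and the connect delegate stands in for whatever code creates the connection, sessions, and consumers. With XMS the caught exception would normally be an XMSException.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of XMS: retries a connect delegate with a fixed delay.
public static class ConnectRetry
{
    public static T Run<T>(Func<T> connect, int maxAttempts, TimeSpan delay)
    {
        if (connect == null) throw new ArgumentNullException(nameof(connect));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                // Re-drive the whole connection sequence on every attempt.
                return connect();
            }
            catch (Exception ex) when (attempt < maxAttempts)
            {
                // On the last attempt the filter is false, so the exception propagates.
                Console.Error.WriteLine("Attempt " + attempt + " failed: " + ex.Message);
                Thread.Sleep(delay);
            }
        }
    }
}
```

A caller might wrap its own connect routine, for example ConnectRetry.Run(() => CreateEverything(), 5, TimeSpan.FromSeconds(5)), where CreateEverything is a placeholder for code that rebuilds the connection from the existing connection factory.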

UPDATE based on new info in the question:
Thanks for posting the MQAccess class. This provides considerable insight into what's happening, although there still isn't any code showing where the connection is closed and reopened as per Part #2 of the question.

However, the code shows that the MQAccess class creates a private IConnection field, connection, in its static constructor, which is then exposed publicly as MQAccess.Connection. The MQAccess class as currently posted has no public or private method that would ever replace the connection handle held by connection (the field is even declared readonly), so if MQAccess.Connection.Close() is ever called, that IConnection object within the MQAccess class will forever after hold an invalid connection handle. Once the connection is closed, MQAccess is effectively dead. Because it is a static class whose constructor runs only once, you would have to restart the application, or turn MQAccess into an instance class that you can dispose of and recreate, to get a new connection.

The MQAccess class does expose the connection factory publicly, so in theory it would be possible to call MQAccess.GetConnection from outside the class and obtain a valid new IConnection object, even after closing the original one. However, that object would exist outside the scope of the MQAccess class, so any subsequent call to MQAccess (MQPub, for example) would still refer to its defunct connection field rather than the new connection created outside the class.

If you need to close and recreate connections, you might consider managing that from inside MQAccess. A low-tech approach might be to write an MQAccess.Close() method that closes the existing connection and then immediately calls connection = GetConnection(); so that the private connection field always holds a valid connection handle. (The field would have to lose its readonly modifier for this to compile.)
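The "close, then immediately recreate" idea might look something like the sketch below. It is deliberately library-agnostic so it runs on its own: ConnectionHolder, Reset, and the two delegates are hypothetical names rather than XMS APIs. The assumption is that the create delegate would wrap ConnectionFactory.CreateConnection() and the close delegate would call Close() on the old connection.

```csharp
using System;

// Hypothetical holder that owns one connection and can swap it for a fresh one.
public sealed class ConnectionHolder<T> where T : class
{
    private readonly Func<T> create;
    private readonly Action<T> close;
    private readonly object sync = new object();
    private T current;

    public ConnectionHolder(Func<T> create, Action<T> close)
    {
        this.create = create ?? throw new ArgumentNullException(nameof(create));
        this.close = close ?? throw new ArgumentNullException(nameof(close));
        current = create();
    }

    // Always returns the most recently created connection
    // (null only if the last Reset failed while creating a new one).
    public T Connection
    {
        get { lock (sync) { return current; } }
    }

    // Closes the old connection and replaces it, so callers never keep a dead handle.
    public T Reset()
    {
        lock (sync)
        {
            T old = current;
            current = null;
            if (old != null)
            {
                try { close(old); }
                catch (Exception) { /* the old handle may already be broken; ignore */ }
            }
            current = create();
            return current;
        }
    }
}
```

The point of the design is that all callers go through holder.Connection instead of caching the IConnection themselves, so a Reset is visible everywhere. Sessions, producers, and consumers created from the old connection still have to be recreated after a Reset.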

If this doesn't resolve the problem, please post the code that is closing and recreating the connections.

By the way, a non-transacted session over a network connection opens up the possibility of losing or duplicating messages with any JMS provider, including WMQ. Was this what you intended? I've explained why this is so in another SO post.



Answer 2:

Adding to comments from T.Rob.

Question 1:
I hope you have access to the source code of MQAccess. If so, you could expose a property in MQAccess that indicates whether the connection is active. If you do not have access, you may have to ask the author of that class to add this property. You can set and reset the property as follows:

1) Set the property after the CreateConnection method returns successfully.
2) Set an exception listener for the connection.
3) Reset the property in the exception handler. Check the reason code and reset the property if it is a connection-broken error (XMSWMQ1107; the linked exception can have MQRC 2009).

Question 2:
It would help if you could show us how you are closing and reopening connections. My recommendation for closing a connection is:
1) First call connection.Stop().
2) Remove any message listeners, i.e. set consumer.MessageListener = null.
3) Then call connection.Close().
4) Set connection = null.
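
The steps above can be sketched as a small helper. This is only an illustration: MqTeardown and CloseAll are hypothetical names, the code assumes the IBM.XMS assembly is referenced, and it uses only members that already appear in this thread (Stop, MessageListener, Close). It also closes the consumer and session before the connection, so nothing is touched after its parent connection has gone away.

```csharp
using IBM.XMS;

// Hypothetical helper (not part of XMS): tears objects down in child-to-parent order.
public static class MqTeardown
{
    public static void CloseAll(IConnection connection, ISession session, IMessageConsumer consumer)
    {
        // 1) Stop message delivery first.
        if (connection != null)
            connection.Stop();

        // 2) Detach the listener and close the consumer while its connection is still open.
        if (consumer != null)
        {
            consumer.MessageListener = null;
            consumer.Close();
        }

        // Close the session before its parent connection.
        if (session != null)
            session.Close();

        // 3) Close the connection last.
        if (connection != null)
            connection.Close();

        // 4) The caller should now set its own references to null.
    }
}
```

If the connection is already broken, any of these calls may throw, so a caller would probably wrap CloseAll in a try/catch before setting its fields to null and reconnecting.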

Additional information: here is the sample I used to test.

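    private void OnException(Exception ex)
    {
        Console.WriteLine("Got exception");
        // The listener receives a plain Exception, so use a safe cast
        // rather than a direct cast that could itself throw.
        XMSException xmsex = ex as XMSException;
        // Check the error code: XMSWMQ1107 is the connection-broken error.
        if (xmsex != null && xmsex.ErrorCode == "XMSWMQ1107")
        {
            Console.WriteLine("This is a connection broken error");
            stopProcessing = true; // This is a class member variable
        }
    }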

In the method where the connection is created, set the exception listener.

        // Create connection.
        connectionWMQ = cf.CreateConnection();
        connectionWMQ.ExceptionListener = new ExceptionListener(OnException);

Whenever there is a connection error, the exception listener is invoked and the stopProcessing flag is set to true.

It is good practice to dispose of objects when they are no longer required. There is a parent-child relationship: consumers, producers, etc. are children of a session, which in turn is a child of a connection. So the order of disposal can be child first and parent next. But if a parent is disposed, its children are also disposed automatically.