How do I create a tcp-inbound-gateway which detects connection loss and automatically reconnects?

2019-05-28 07:21发布

问题:

I'm trying to configure a set of spring-integration components to consume data from a TCP socket. The basic protocol is that upon opening the connection I am prompted for my username, then password, then if authentication succeeds data is streamed to me as it becomes available. A ping message is sent to me every 30 seconds so that I can verify that the connection is alive during quiet periods when no data is streaming.

Per the spring-integration docs I have set up a TCP Gateway. http://docs.spring.io/spring-integration/reference/html/ip.html#tcp-gateways

<!--
    Interceptor chain applied to the TCP connection factory below. It contains a single
    AuthConnectionInterceptorFactory whose constructor arguments are, in order:
    expected username prompt, username to send, expected password prompt, password to send.
-->
<bean id="authInterceptorFactory"
    class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <array>
           <!-- Prompts are literal strings; credentials come from property placeholders. -->
           <bean class="com.socketfetching.AuthConnectionInterceptorFactory">
               <constructor-arg value="Login Username:"/>
               <constructor-arg value="${socket.username}"/>
               <constructor-arg value="Password:"/>
               <constructor-arg value="${socket.password}"/>
           </bean>
        </array>
    </property>
</bean>

<!--
    ByteArrayLfSerializer used for both reading and writing on the connection.
    NOTE(review): the id is spelled "lfSeserializer"; the connection factory below refers to
    it by this exact spelling, so any rename must change all three places together.
-->
<bean id="lfSeserializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer"/>

<!--
    Client-side connection factory for the remote socket. single-use="false" keeps one
    connection open for reuse rather than opening a new one per message; the auth
    interceptor chain defined above is attached to it.
-->
<ip:tcp-connection-factory id="connectionFactory"
  type="client"
  host="${socket.url}"
  port="${socket.port}"
  single-use="false"
  so-keep-alive="true"
  interceptor-factory-chain="authInterceptorFactory"
  deserializer="lfSeserializer"
  serializer="lfSeserializer"
/>

<!-- Channel used as the gateway's reply-channel. -->
<int:channel id="socketInitChannel"/>

<!--
    Inbound gateway running in client mode: it opens the connection itself at startup and
    waits for the server's username prompt. retry-interval (ms) is the period at which the
    connection is checked and re-established if it is no longer open.
    Inbound messages go to clientBytes2StringChannel (declared elsewhere).
-->
<ip:tcp-inbound-gateway id="inGateway"
    request-channel="clientBytes2StringChannel"
    reply-channel="socketInitChannel"
    connection-factory="connectionFactory"
    reply-timeout="10000"
    retry-interval="5000"
    auto-startup="true"
    client-mode="true"/>

The InterceptorFactory handles the handshaking which occurs when the connection is opened, and takes as parameters the expected prompts and my desired responses. This handshaking works perfectly and my application is receiving its periodic ping from the server.

client-mode=true causes the gateway to open a connection immediately at startup and wait for the username prompt.

My issue is with recovery when my connection is lost. If I kill my network connection, obviously the pings stop coming and I would like my gateway to detect this and begin attempting to re-connect at regular intervals. When my network connection is restored, the gateway should successfully reconnect.

I thought that retry-interval might handle this, but it doesn't seem to have any effect. The docs suggest that I use a TaskScheduler for this purpose...but I'm not exactly sure how to integrate it with my ping message from the server.

Any advice?

EDIT: I found a solution that works, although I'm not sure it's ideal. The retry-interval on my gateway means that every 5 seconds the connection is checked for liveness and re-created if needed. The gateway does this by calling isOpen() on my AuthConnectionInterceptor, so I was able to override that method to check the time elapsed between now and the last message that passed through the interceptor. If the gap is too long, I manually close the connection, which triggers a re-connect.

Full source for those classes follows...

InterceptorFactory:

package com.socketfetching;

import org.apache.log4j.Logger;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;

/**
 * AuthConnectionInterceptorFactory
 * Created by: Seth Kelly
 * Date: 10/3/13
 *
 * Factory for {@link AuthConnectionInterceptor} instances. It holds the expected login
 * prompts and the credentials to answer them with, and hands a fresh interceptor
 * configured with those values to every caller of {@link #getInterceptor()}.
 *
 * <p>The configuration is fixed at construction time and never modified afterwards.
 */
public class AuthConnectionInterceptorFactory implements TcpConnectionInterceptorFactory {
    private static final Logger logger = Logger.getLogger(AuthConnectionInterceptorFactory.class);

    private final String usernamePrompt;
    private final String username;
    private final String passwordPrompt;
    private final String password;

    /**
     * @param usernamePrompt exact text the server sends when asking for the username
     * @param username       username to send in response
     * @param passwordPrompt exact text the server sends when asking for the password
     * @param password       password to send in response
     */
    public AuthConnectionInterceptorFactory(String usernamePrompt, String username, String passwordPrompt, String password) {
        this.usernamePrompt = usernamePrompt;
        this.username = username;
        this.passwordPrompt = passwordPrompt;
        this.password = password;
    }

    /**
     * Creates a new interceptor carrying this factory's prompts and credentials.
     *
     * @return a new {@link AuthConnectionInterceptor}; a distinct instance on every call,
     *         so per-connection negotiation state is never shared
     */
    @Override
    public TcpConnectionInterceptor getInterceptor() {
        return new AuthConnectionInterceptor(usernamePrompt, username, passwordPrompt, password);
    }
}

Interceptor:

package com.socketfetching;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.ip.tcp.connection.AbstractTcpConnectionInterceptor;
import org.springframework.integration.support.MessageBuilder;

/**
 * AuthConnectionInterceptor
 * Created by: Seth Kelly
 * Date: 10/3/13
 *
 * Handles username/password authentication when opening a new TCP connection.
 */
public class AuthConnectionInterceptor extends AbstractTcpConnectionInterceptor {
    private static Logger logger = Logger.getLogger(AuthConnectionInterceptor.class);

    private String usernamePrompt;
    private String username;
    private String passwordPrompt;
    private String password;

    private Boolean usernameSent = false;
    private Boolean passwordSent = false;

    private static final String PING_PREFIX = "Ping";

    private DateTime lastMsgReceived;
    private Integer inactivityTimeout = 35000;

    public AuthConnectionInterceptor(String usernamePrompt, String username, String passwordPrompt, String password) {

        this.usernamePrompt = usernamePrompt;
        this.username = username;
        this.passwordPrompt = passwordPrompt;
        this.password = password;
    }

    @Override
    public boolean onMessage(Message<?> message) {
        lastMsgReceived = new DateTime();
        Boolean forwardMessage = true;

        if(!this.isServer()) {
            String payload = new String((byte[])message.getPayload());

            if(!usernameSent) {
                if(payload.equals(usernamePrompt))  {
                    try {
                        logger.debug("Sending username=" + username + "to authenticate socket.");
                        super.send(MessageBuilder.withPayload(username).build());
                        usernameSent = true;
                        forwardMessage = false;

                    } catch (Exception e) {
                        throw new MessagingException("Negotiation error", e);
                    }
                }
                else {
                    throw new MessagingException("Negotiation error.  expected message=" + usernamePrompt +
                            " actual message=" + payload);
                }
            }
            else if(!passwordSent) {
                if(payload.equals(passwordPrompt))  {
                    try {
                        logger.debug("Sending password to authenticate socket.");
                        super.send(MessageBuilder.withPayload(password).build());
                        passwordSent = true;
                        forwardMessage = false;

                    } catch (Exception e) {
                        throw new MessagingException("Negotiation error", e);
                    }
                }
                else {
                    throw new MessagingException("Negotiation error.  expected message=" + passwordPrompt +
                            " actual message=" + payload);
                }
            }
            else if(payload.startsWith(PING_PREFIX)) {
                //Just record that we received the ping.
                forwardMessage = false;
            }
        }

        if(forwardMessage)
            return super.onMessage(message);
        else
            return true;
    }

    @Override
    public boolean isOpen() {
        DateTime now = new DateTime();
        if((lastMsgReceived == null) ||
                ((now.getMillis() - lastMsgReceived.getMillis()) < inactivityTimeout)) {
            return super.isOpen();
        }
        else
        {
            if(super.isOpen()) {
                super.close();
            }
            return false;
        }
    }


    public Integer getInactivityTimeout() {
        return inactivityTimeout;
    }

    public void setInactivityTimeout(Integer inactivityTimeout) {
        this.inactivityTimeout = inactivityTimeout;
    }
}