How to make sure that I am not sharing same socket

2020-02-05 11:31发布

问题:

I have code that deals with sockets, and I need to make sure that I don't share the same socket between two threads. In the code below, a background thread runs every 60 seconds and calls the updateLiveSockets() method. In updateLiveSockets(), I iterate over all the sockets I have and ping them one by one by calling the send method of the SendToQueue class; based on the response, I mark each one as live or dead.

Now all the reader threads will call the getNextSocket() method concurrently to get the next available live socket, so it has to be thread safe, and I need to make sure that all the reader threads see the same consistent state of SocketHolder and Socket.

Below is my SocketManager class:

/**
 * Keeps the list of ZeroMQ sockets opened for each datacenter and hands out live ones.
 *
 * <p>A single-threaded scheduler calls {@link #updateLiveSockets()} every 60 seconds. Each run
 * pings every known socket and replaces the datacenter's list with a new unmodifiable list of
 * {@code SocketHolder}s carrying the refreshed liveness flag. Readers therefore always iterate
 * over a list that is never mutated after publication.
 *
 * <p>NOTE: nothing here checks a socket out exclusively. {@link #getNextSocket()} can return the
 * same socket to several callers, and the pinger can use it at the same time.
 */
public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
  }

  /** Connects to every configured endpoint once at startup and records the resulting holders. */
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      // Publish a read-only view, exactly as updateLiveSockets() does, so readers never
      // see a list that can still be modified.
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  /**
   * Returns a live socket from the first datacenter, in {@code getOrderedDatacenters()} order,
   * that has one. Called concurrently by multiple threads.
   *
   * @return a live socket holder, or an absent {@code Optional} if every socket is marked dead
   */
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  /**
   * Picks one live holder at random from the given list.
   *
   * @param listOfEndPoints holders for one datacenter; may be null or empty
   * @return a randomly chosen live holder, or an absent {@code Optional} if there is none
   */
  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      // Collect only the holders currently flagged live.
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // No shuffle needed: a single random index gives the same uniform choice.
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size())));
      }
    }
    return Optional.absent();
  }

  /** Runs every 60 seconds: pings every socket and republishes each datacenter's list. */
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      if (liveSockets == null) {
        // No list was ever published for this datacenter. Iterating null would throw, and an
        // exception escaping a scheduleAtFixedRate task suppresses every later run.
        continue;
      }
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>(liveSockets.size());
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // The result of the ping is the new liveness flag.
        boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);

        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

And here is my SendToQueue class:

  // this method will be called by multiple threads concurrently to send the data
  /**
   * Sends the records on the next live socket without waiting for an acknowledgement.
   *
   * @param address key under which the pending message is cached
   * @param encodedRecords payload to send
   * @return the result of the underlying send, or {@code false} when no live socket is available
   */
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // Every socket is currently marked dead; calling get() on an absent Optional would throw.
      return false;
    }
    // Resolve the socket once so the pending message and the send use the same instance.
    Socket socket = liveSocket.get().getSocket();
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);
  }

  /**
   * Builds a {@code ZMsg} from the pending message's encoded records and sends it on the socket.
   *
   * @param pendingMessage message whose encoded records form the payload
   * @param socket socket to send on
   * @return whatever the {@code ZMsg} send call reports
   */
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    final ZMsg frames = new ZMsg();
    frames.add(pendingMessage.getEncodedRecords());
    boolean sent;
    try {
      // send data on a socket LINE A
      sent = frames.send(socket);
    } finally {
      // the message is destroyed whether or not the send succeeded
      frames.destroy();
    }
    return sent;
  }

  /**
   * Sends the records on the given socket and waits for the acknowledgement.
   *
   * <p>The pending message is cached under {@code address} for the duration of the call and
   * evicted again before returning, whatever the outcome.
   *
   * @param address key under which the pending message is cached
   * @param encodedRecords payload to send
   * @param socket socket to send on
   * @return {@code true} only if the send succeeded and {@code waitForAck()} returned true
   */
  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    final PendingMessage pending = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, pending);
    try {
      // waitForAck() is only reached when the send itself succeeded
      return doSendAsync(pending, socket) && pending.waitForAck();
    } finally {
      // Alternatively (checks that address still points to this pending message):
      // cache.asMap().remove(address, pending);
      cache.invalidate(address);
    }
  }

Problem Statement

As you can see, I am sharing the same socket between two threads. getNextSocket() could return a 0MQ socket to thread A while, concurrently, the timer thread accesses the same 0MQ socket to ping it. In that case thread A and the timer thread are mutating the same 0MQ socket, which can lead to problems. So I am trying to find a way to prevent different threads from sending data on the same socket at the same time and corrupting my data.

So I decided to synchronize the socket so that no two threads can access the same socket at the same time. Below is the change I made in updateLiveSockets method. I synchronized on the socket in below method:

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  /**
   * Pings every known socket while holding that socket's monitor, then republishes each
   * datacenter's list with the refreshed liveness flags.
   */
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      if (liveSockets == null) {
        // No list published for this datacenter. Iterating null would throw, and an exception
        // escaping a scheduleAtFixedRate task suppresses every later run.
        continue;
      }
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>(liveSockets.size());
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean isLive;
        // using the socket as its own lock; only the ping itself needs to hold it
        synchronized (socket) {
          // pinging to see whether a socket is live or not
          // NOTE(review): the unsynchronized version of this method calls send(...) here;
          // confirm that execute(...) exists on SendToQueue.
          isLive = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
        }

        // Building the holder touches no shared socket state, so it happens outside the lock.
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }

And below is the change I made in doSendAsync method. In this also I synchronized on socket before sending on it.

  /**
   * Builds a {@code ZMsg} from the pending message's encoded records and sends it while holding
   * the socket's monitor.
   *
   * @param pendingMessage message whose encoded records form the payload
   * @param socket socket to send on; also used as the lock
   * @return whatever the {@code ZMsg} send call reports
   */
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    final ZMsg frames = new ZMsg();
    frames.add(pendingMessage.getEncodedRecords());
    boolean sent;
    try {
      // send data on a socket LINE A by synchronizing on it
      synchronized (socket) {
        sent = frames.send(socket);
      }
    } finally {
      // the message is destroyed whether or not the send succeeded
      frames.destroy();
    }
    return sent;
  }

What is the best way by which I can make sure that I am not sharing same sockets between two threads? In general I have around 60 sockets and 20 threads accessing those sockets.

If many threads use the same socket, resources aren't well utilized. Moreover, if msg.send(socket) blocks (technically it shouldn't), all threads waiting for that socket are blocked too. So I guess there might be a better way to ensure that every thread uses a different live socket at any given time, instead of synchronizing on a particular socket. Also, is there any corner case or edge case I have missed that could lead to a bug?

回答1:

First of all you need a way for clients to notify you that they're done using a Socket. You could add a method that allows them to signal this. That's legit and it will work, but you'll have to rely on your clients to be well behaved. Or rather that the programmer using your socket doesn't forget to return it. There's a pattern that helps address this : the execute around pattern. Rather than giving out a Socket, you make a method that accepts a Consumer<Socket>, and then executes the consumer, and does the returning of the Socket itself.

/**
 * Execute-around helper: borrows a socket, lets the caller use it, and always gives it back.
 *
 * @param socketUser action to run with the borrowed socket
 */
public void useSocket(Consumer<Socket> socketUser) {
    final Socket borrowed = getSocket();
    try {
        socketUser.accept(borrowed);
    } finally {
        // runs even when socketUser throws, so the socket is always handed back
        returnSocket(borrowed);
    }
}

Now let's look at how we're going to implement getSocket() and returnSocket(). Clearly it involves getting them from some sort of collection, and returning them back to that collection. A Queue is a good choice here (as others have also noted). It allows getting it from one side, and returning on the other, plus there are plenty of efficient thread safe implementations, and takers and adders are typically not in contention with one another. Since you know the number of sockets beforehand, I'd opt for an ArrayBlockingQueue.

An additional concern here is that your implementation returns an Optional. I'm not sure what your clients will do if there is no available Socket, but if they wait and retry, I'd suggest you simply make getSocket() block on the queue. As it is, I'll respect this aspect of your approach and take into account that there may be no Socket available. For the execute-around approach, this translates into the useSocket() method returning false if no Socket was available.

/** Sockets that are currently available for borrowing. */
private final BlockingQueue<Socket> queue;

/**
 * Creates a pool holding exactly the given sockets.
 *
 * @param sockets the sockets to pool; may be empty
 */
public SocketPool(Set<Socket> sockets) {
    // ArrayBlockingQueue rejects a capacity below 1, so an empty set still gets one slot.
    queue = new ArrayBlockingQueue<>(Math.max(1, sockets.size()));
    queue.addAll(sockets);
}

/**
 * Runs the given action with a borrowed socket, if one is available, and returns the socket
 * afterwards.
 *
 * @param socketUser action to run with the borrowed socket
 * @return {@code true} if a socket was available and the action ran; {@code false} otherwise
 */
public boolean useSocket(Consumer<Socket> socketUser) throws InterruptedException {
    final Optional<Socket> borrowed = getSocket();
    if (!borrowed.isPresent()) {
        return false;
    }
    final Socket socket = borrowed.get();
    try {
        socketUser.accept(socket);
        return true;
    } finally {
        // hand the socket back even if the action threw
        returnSocket(socket);
    }
}

/** Puts a borrowed socket back at the tail of the queue. */
private void returnSocket(Socket socket) {
    queue.add(socket);
}

/**
 * Takes the socket at the head of the queue without waiting.
 *
 * @return the borrowed socket, or an empty {@code Optional} if the queue is currently empty
 */
private Optional<Socket> getSocket() throws InterruptedException {
    // poll() returns null instead of blocking when nothing is available
    final Socket head = queue.poll();
    return Optional.ofNullable(head);
}

There, that's it, that's your SocketPool.

Ah, but then the tricky bit: the checking for liveness. It's tricky because your liveness check actually competes with your regular clients.

In order to address this, I suggest the following : let your clients report whether the Socket they got was live or not. Since checking for liveness comes down to using the Socket, this should be straightforward for your clients.

So instead of a Consumer<Socket>, we'll take a Function<Socket, Boolean>. And if the function returns false, we'll consider the Socket to be no longer live. In that case, rather than adding it back to the regular queue, we add it to a collection of dead Sockets, and we'll have a scheduled task, that rechecks the dead sockets intermittently. As this happens on a separate collection, the scheduled checking does not compete with regular clients any more.

Now you can make a SocketManager with a Map that maps data centers to SocketPool instances. This map doesn't need to change, so you can make it final and initialize it in the SocketManager's constructor.

This is my preliminary code for SocketPool (untested) :

/**
 * A pool of sockets that lends each socket to at most one caller at a time.
 *
 * <p>Callers borrow a socket through {@link #useSocket(Function)} and report, via the function's
 * return value, whether the socket was still live. Live sockets go back to the available queue.
 * Dead ones are parked in a separate queue that a scheduled task rechecks every 60 seconds, so
 * liveness checking never competes with regular callers for the available sockets.
 */
class SocketPool implements AutoCloseable {

    /** Sockets believed live and not currently lent out. */
    private final BlockingQueue<Socket> queue;
    /** Sockets reported dead by a caller, waiting for the next recheck. */
    private final Queue<Socket> deadSockets = new ConcurrentLinkedQueue<>();
    /** Handle of the periodic recheck task, cancelled by {@link #close()}. */
    private final ScheduledFuture<?> scheduledFuture;

    /**
     * Creates a pool over the given sockets and schedules the periodic dead-socket recheck.
     *
     * @param sockets the sockets to pool; may be empty
     * @param scheduledExecutorService executor that runs the recheck every 60 seconds
     */
    public SocketPool(Set<Socket> sockets, ScheduledExecutorService scheduledExecutorService) {
        // ArrayBlockingQueue rejects a capacity below 1, so an empty set still gets one slot.
        queue = new ArrayBlockingQueue<>(Math.max(1, sockets.size()));
        queue.addAll(sockets);
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS);
    }

    /**
     * Lends a socket to {@code socketUser}, if one is available.
     *
     * @param socketUser uses the socket and returns whether it was still live
     * @return {@code true} if a socket was available and {@code socketUser} reported it live
     */
    public boolean useSocket(Function<Socket, Boolean> socketUser) throws InterruptedException {
        Optional<Socket> maybeSocket = getSocket();
        // Stays true if socketUser throws, so an exception alone does not retire the socket.
        boolean wasLive = true;
        try {
            // orElse(false) also covers "no socket available", so no separate isPresent() check.
            wasLive = maybeSocket.map(socketUser).orElse(false);
            return wasLive;
        } finally {
            boolean isLive = wasLive;
            maybeSocket.ifPresent(socket -> {
                if (isLive) {
                    returnSocket(socket);
                } else {
                    reportDead(socket);
                }
            });
        }
    }

    private void reportDead(Socket socket) {
        deadSockets.add(socket);
    }

    private void returnSocket(Socket socket) {
        queue.add(socket);
    }

    /** Takes the head of the available queue without waiting; empty if nothing is available. */
    private Optional<Socket> getSocket() throws InterruptedException {
        return Optional.ofNullable(queue.poll());
    }

    /** Rechecks each parked socket once and moves the ones that are live again back to the pool. */
    private void recheckDeadSockets() {
        // Snapshot the count up front. The queue shrinks as sockets are revived, so comparing
        // the loop index against the current size would stop after about half of them.
        int pending = deadSockets.size();
        for (int i = 0; i < pending; i++) {
            Socket socket = deadSockets.poll();
            if (socket == null) {
                break;
            }
            if (checkAlive(socket)) {
                queue.add(socket);
            } else {
                // Re-queued at the tail, beyond the snapshot, so it is not checked twice per run.
                deadSockets.add(socket);
            }
        }
    }

    private boolean checkAlive(Socket socket) {
        // do actual live check with SendSocket class, or implement directly in this one
        return true;
    }

    /** Cancels the periodic recheck; the pooled sockets themselves are left untouched. */
    @Override
    public void close() throws Exception {
        scheduledFuture.cancel(true);
    }
}


回答2:

I would say this code has several issues:

  1. getLiveSocket() can return the same socket for multiple threads.
  2. java.util.Random doesn't work well with multiple threads.
  3. Snapshot of live sockets in getNextSocket() can be stale because of concurrent invocation of updateLiveSockets() method which modifies that snapshot.
  4. If connectToZMQSockets() doesn't check liveness of sockets there are no live sockets for 60 seconds because of delay in scheduleAtFixedRate method.

In addition there is no flag for checking whether socket is in use and it's unclear whether socket returns back to pool after a thread finishes its work with it.

Consider to simplify the code in the following way:

  1. Your classes have cyclic references to each other, for me it's a signal that there should be only single class.
  2. I don't think it makes sense to periodically check whether all sockets are alive, because that doesn't guarantee the state of a socket won't change between the check and the real send; a better strategy is to verify a particular socket only when a send to it has failed.
  3. It's better to confine socket management to a thread-safe data structure, for example a blocking queue, instead of using explicit locks. Such a strategy makes good use of all available sockets.

Here is a code sample:

/**
 * Sends records over a pool of sockets kept in a blocking queue, so that a socket is used by at
 * most one thread at a time.
 *
 * <p>A socket is only pinged after a send on it has failed. A socket that fails the ping is
 * dropped from the pool.
 */
public class SendToSocket {
  /** Sockets that are currently free to be used by a sender. */
  private final BlockingQueue<Socket> queue;

  public SendToSocket() {
    this.queue = new LinkedBlockingQueue<>();
    // collect all available sockets
    List<Socket> sockets = new ArrayList<>();
    for (Socket socket : sockets) {
      queue.add(socket);
    }
  }

  /**
   * Takes a socket from the pool, sends the records on it and puts the socket back.
   *
   * @param reco the encoded records to send
   * @return {@code true} if the send succeeded
   * @throws InterruptedException if interrupted while waiting for a free socket
   */
  public boolean send(final byte[] reco) throws InterruptedException {
    // can be replaced with poll() method
    Socket socket = queue.take();
    // Only a socket that failed both the send and the ping is kept out of the pool.
    boolean returnToPool = true;
    try {
      boolean status = sendInternal(socket, reco);
      if (!status && !ping(socket)) {
        // log error
        returnToPool = false;
      }
      return status;
    } finally {
      // Also runs when sendInternal/ping throws, so an exception cannot lose the socket.
      if (returnToPool) {
        queue.add(socket);
      }
    }
  }

  private boolean sendInternal(Socket socket, byte[] reco) {
    return true;
  }

  private boolean ping(Socket socket) {
    return true;
  }
}


回答3:

As I explained in your other question, the best solution for your problem is to use a concurrent queue such as ConcurrentLinkedQueue.

For example this is how would you remove sockets that are not alive and keep the ones that are alive.

    // Per-datacenter queue of socket holders; updateLiveSockets() polls from the head of each
    // queue and adds the refreshed holder back at the tail.
    private final Map<Datacenters, ConcurrentLinkedQueue<SocketHolder>> liveSocketsByDatacenter =
          new ConcurrentHashMap<>();

//fill up the Queue and the Map

// runs every 60 seconds and pings up to 70 sockets per datacenter to check whether they are alive (it does not matter if you ping more sockets than there are in the queue, because you are rotating the queue)
  /**
   * Rotates through each datacenter's queue: polls a holder from the head, pings its socket and
   * adds a new holder with the refreshed liveness flag at the tail.
   */
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      Queue<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      if (liveSockets == null) {
        // no queue was registered for this datacenter
        continue;
      }
      for (int i = 0; i < 70; i++) {
        SocketHolder s = liveSockets.poll();
        if (s == null) {
          // The queue is empty right now; dereferencing the result would throw.
          break;
        }
        Socket socket = s.getSocket();
        String endpoint = s.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean isLive = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);

        SocketHolder zmq = new SocketHolder(socket, s.getContext(), endpoint, isLive);
        liveSockets.add(zmq);
      }
    }
  }


回答4:

You don't need to lock the refresh of the active state since updating a boolean is an atomic operation. Just refresh active periodically in a background thread after checking it out of the pool. You may want to add a timestamp on the Socket instance level for when a message gets sent, so you can wait an additional 30 seconds to ping. An unused socket would need to ping sooner than a used socket.

I guess the idea is that the synchronized block reads 2 booleans and sets a boolean, so it should return immediately. You don't want to send while synchronized because it will block other threads for a really long time.

Any type of synchronization I have ever seen really only requires the synchronization of two or three atomic operations.

// Claim/release sketch: the lock is held only for the check-and-set, never during the work.
boolean got = false;

// Test both flags and flip `available` in one step so only one thread can win the claim.
synchronized(obj) {
    if(alive && available) {
        available = false;
        got = true;
    }
}

if(got) {
    ...               // all thread safe because available must be false
    // NOTE(review): atomic does not imply visible. Unless `available` is volatile, or this
    // write is also made while holding the lock on obj, other threads are not guaranteed to
    // see the release - confirm against the field's declaration.
    available = true; // atomic, no need to synchronize when done
}

Usually you can figure out a way to avoid synchronization altogether by being careful about the order that you make atomic updates.

For example, you could probably get this to work with no synchronization at all by using a map instead of a list to store the sockets. I'd have to think about it, but you can probably make this thread safe with absolutely no synchronization and then it would be much faster.

I would never consider using a thread-safe Collection like Hashtable instead of HashMap.

/**
 * A socket that guards itself with an {@code available} flag so that only one thread at a time
 * sends on it or pings it.
 *
 * <p>The flag is claimed under the object's monitor in {@link #tryToGet()} and released with a
 * plain write. The fields are volatile so that the release, and updates to {@code alive} and
 * {@code last}, are visible to other threads without taking the lock.
 */
class Socket {
    private volatile boolean alive;
    private volatile boolean available;
    // volatile also makes reads and writes of this long atomic
    private volatile long last;

    Socket() {
        alive = true;
        available = true;
        last = System.currentTimeMillis();
    }

    /**
     * Claims the socket if it is alive and not in use.
     *
     * @return {@code true} if the caller now owns the socket and must set {@code available} again
     */
    private synchronized boolean tryToGet() {
        // should return pretty fast - only 3 atomic operations

        if(alive && available) {
            available = false;
            return true;
        }

        return false;
    }

    /**
     * Sends on this socket if it can be claimed.
     *
     * @return {@code true} if the socket was claimed and used; {@code false} if it was busy or dead
     */
    public boolean send() {
        if(!tryToGet()) {
            return false;
        }

        try {
            // do it
            last = System.currentTimeMillis();
            return true;
        } finally {
            // release even if the send throws, otherwise the socket would stay claimed forever
            available = true;
        }
    }

    private boolean ping() {
        // ... actual liveness probe goes here
        return true;
    }

    /** Pings the socket if it is alive, idle for more than 30 seconds, and can be claimed. */
    public void pingIfNecessary() {
        if(alive && (System.currentTimeMillis() - last) > 30000) {
            if(tryToGet()) {
                // other pingIfNecessary() calls have to wait
                try {
                    if(ping()) {
                        last = System.currentTimeMillis();
                    } else {
                        alive = false;
                    }
                } finally {
                    // release even if the ping throws
                    available = true;
                }
            }
        }
    }
}

/**
 * Tries every socket in turn until one accepts the message, sleeping briefly between rounds.
 *
 * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and the
 * method returns without having sent the message.
 *
 * @param s the message to send
 */
void sendUsingPool(String s) {
    while(true) {
        for(Socket socket : sockets) {
            if(socket.send(s)) {
                return;
            }
        }

        // no socket was free this round
        try {
            // increase this number if you want to be nicer
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Swallowing the interrupt would make this retry loop impossible to cancel.
            Thread.currentThread().interrupt();
            return;
        }
    }
}

/**
 * Background loop: asks every socket to ping itself if necessary, then sleeps for 100 ms.
 * Runs until the thread is interrupted.
 */
public void run() {
    while(!Thread.currentThread().isInterrupted()) {
        for(Socket socket : sockets) {
            socket.pingIfNecessary();
        }

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the flag so the loop condition sees it and the thread can shut down.
            Thread.currentThread().interrupt();
        }
    }
}