Synchronise concurrent requests to share results o

Posted 2019-05-23 01:07

Question:

I have a Java UI service that has an API method that invokes an operation that's relatively slow (say ~30secs). The operation is parameterless, but it operates on external data that does change (relatively slowly) over time. It's not critical for the method to return the most up-to-date results - if they're 30secs old that's acceptable.

Ultimately I need to optimise the implementation of the slow operation, but as a short-term fix, I'd like to make the operation mutually exclusive, such that if a second incoming request (on a separate thread) attempts to invoke the operation while another is already in progress, then the second one blocks until the first one completes. The second thread then uses the results of the first invocation of the operation - i.e. it doesn't attempt to run the operation again.

E.g.:

class MyService {
    String serviceApiMmethod() {
       // If a second thread attempts to call this method while another is in progress
       // then block here until the first returns and then use those results
       // (allowing it to return immediately without a second call to callSlowOperation).
       return callSlowOperation();
    }
}

What's the preferred general approach for this in Java (8)? I'm guessing I could use a CountDownLatch, but it's not clear how best to share the result across the threads. Is there an existing concurrency primitive that facilitates this?

EDIT: I need to clear any references to the result once all threads have consumed it (i.e. returned it to the caller), as it's a relatively large object, which needs to be GC'ed as soon as possible.

Answer 1:

Simple idea

Version 1:

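import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

class Foo {
    private int counter;            // callers currently inside foo(); guarded by this
    private Future<String> future;  // operation in flight; guarded by this

    public String foo() throws Exception {
        Future<String> current;
        synchronized (this) {
            if (counter++ == 0) {
                // The first caller starts the (simulated) slow operation.
                future = CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(1000 * (ThreadLocalRandom.current().nextInt(3) + 1));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "ok" + ThreadLocalRandom.current().nextInt();
                });
            }
            // Keep a local copy so that a later reset of the field cannot cause an NPE.
            current = future;
        }
        try {
            return current.get();
        } finally {
            synchronized (this) {
                if (--counter == 0) {
                    future = null; // the last caller drops the reference so it can be GC'ed
                }
            }
        }
    }
}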

Version 2: together with @AleksandrSemyannikov

public class MyService {
    private AtomicInteger counter = new AtomicInteger();
    private volatile String result;

    public String serviceApiMethod() {
        counter.incrementAndGet();
        try {
            synchronized (this) {
                if (result == null) {
                    result = callSlowOperation();
                }
            }
            return result;
        } finally {
            if (counter.decrementAndGet() == 0) {
                synchronized (this) {
                    if (counter.get() == 0) {
                        result = null;
                    }
                }
            }
        }
    }

    private String callSlowOperation() {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Thread.currentThread().getName();
    }
}
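Both versions above count the waiting callers so that the shared reference can be dropped once everyone has the result. As a rough sketch of a related approach (not taken from this answer), an AtomicReference holding a CompletableFuture can play the same role: the first caller publishes a future and runs the operation, later callers join that future, and the reference is cleared as soon as the operation finishes. SingleFlight and its Supplier parameter are hypothetical names introduced here for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: lets concurrent callers share one in-flight computation.
class SingleFlight<T> {
    // Holds the future of the operation currently in progress, or null if none.
    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();
    private final Supplier<T> slowOperation; // stand-in for callSlowOperation()

    SingleFlight(Supplier<T> slowOperation) {
        this.slowOperation = slowOperation;
    }

    T get() {
        while (true) {
            CompletableFuture<T> existing = inFlight.get();
            if (existing != null) {
                // Another thread is already running the operation: wait for its result.
                return existing.join();
            }
            CompletableFuture<T> mine = new CompletableFuture<>();
            if (inFlight.compareAndSet(null, mine)) {
                // This thread won the race, so it runs the operation itself.
                try {
                    mine.complete(slowOperation.get());
                } catch (Throwable t) {
                    mine.completeExceptionally(t);
                } finally {
                    // Drop the shared reference; waiters still hold their own local copy.
                    inFlight.set(null);
                }
                return mine.join();
            }
            // Lost the race to another starter: loop and join its future instead.
        }
    }
}
```

A service could hold one instance, for example new SingleFlight<>(this::callSlowOperation), and return flight.get() from its API method. If the operation throws, join() rethrows the failure wrapped in a CompletionException for every waiting caller.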


Answer 2:

ReentrantReadWriteLock will be easier to use:

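import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MyService {

  String result; // guarded by lock
  ReadWriteLock lock = new ReentrantReadWriteLock();

  // staleResult() and callSlowOperation() are not shown in this answer.
  String serviceApiMmethod() {
    lock.readLock().lock();
    try {
      if (result == null || staleResult()) {
        // A read lock cannot be upgraded, so release it before taking the write lock.
        lock.readLock().unlock();
        lock.writeLock().lock();
        try {
          // Re-check: another thread may have refreshed the result in the meantime.
          if (result == null || staleResult()) {
            result = callSlowOperation();
          }
        } finally {
          // Downgrade: re-acquire the read lock before releasing the write lock.
          lock.readLock().lock();
          lock.writeLock().unlock();
        }
      }
      return result;
    } finally {
       lock.readLock().unlock();
    }
  }
}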

Here, the read lock protects against reading stale state, and the write lock prevents multiple threads from performing the slow operation simultaneously.
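The answer leaves staleResult() undefined. A minimal sketch of one way to fill it in, assuming a timestamp recorded next to the result and a 30-second age limit (the figure the question calls acceptable), might look like this. TimedCache, MAX_AGE_NANOS and the Supplier parameter are hypothetical names, and this variant releases the read lock before taking the write lock rather than re-acquiring it afterwards.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical cache: recomputes the value only when it is missing or too old.
class TimedCache {
    // Assumed age limit; the question says 30-second-old results are acceptable.
    private static final long MAX_AGE_NANOS = TimeUnit.SECONDS.toNanos(30);

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Supplier<String> slowOperation; // stand-in for callSlowOperation()
    private String result;        // guarded by lock
    private long computedAtNanos; // guarded by lock

    TimedCache(Supplier<String> slowOperation) {
        this.slowOperation = slowOperation;
    }

    // True when the cached value is older than the assumed limit.
    private boolean staleResult() {
        return System.nanoTime() - computedAtNanos > MAX_AGE_NANOS;
    }

    String get() {
        // Fast path: many readers may check the cached value at the same time.
        lock.readLock().lock();
        try {
            if (result != null && !staleResult()) {
                return result;
            }
        } finally {
            lock.readLock().unlock();
        }
        // Slow path: only one thread at a time may recompute.
        lock.writeLock().lock();
        try {
            // Re-check, because another thread may have refreshed the value already.
            if (result == null || staleResult()) {
                result = slowOperation.get();
                computedAtNanos = System.nanoTime();
            }
            return result;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Note that this keeps the last result referenced until it is replaced, so on its own it does not address the question's requirement to release the object once all callers have it.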



Answer 3:

Another approach (which I think might be even better) is to register every caller that asks for the result in a shared collection and, when the result arrives, send it to each of them. You can use the Java 8 functional interface Consumer to make this neater. Callers then do not have to poll or sleep while they wait for the result. It does require each caller to supply a callback method that accepts the result, but that might even make your code easier to read:

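import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

class MyService {
    private static final Object lock = new Object();
    private static boolean isProcessing;                             // guarded by lock
    private static Set<Consumer<String>> callers = new HashSet<>();  // guarded by lock

    void serviceApiMmethod(Consumer<String> callBack) {
        synchronized (lock) {
            callers.add(callBack);
            if (isProcessing) {
                return; // a run is already in progress and will deliver to this callback
            }
            isProcessing = true;
        }
        callSlowOperation();
    }

    private static void callSlowOperation() {
        try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // Simulate slow operation
        final String result = "slow result";
        final Set<Consumer<String>> toNotify;
        synchronized (lock) {
            // Swap the set out so callbacks registered from now on start a new run.
            toNotify = callers;
            callers = new HashSet<>();
            isProcessing = false;
        }
        toNotify.forEach(consumer -> consumer.accept(result));
    }
}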

And the calling threads:

class ConsumerThread implements Runnable {
    final int threadNumber;

    public ConsumerThread(int num) {
        this.threadNumber = num;
    }

    public void processResponse(String response) {
        System.out.println("Thread [" + threadNumber + "] got response:" + response + " at:" + System.currentTimeMillis());
    }

    @Override
    public void run() {
        new MyService().serviceApiMmethod(this::processResponse);
    }
}

This way the result object can be garbage collected, because all the consumers get it right away and release it.

And to test the results:

public class Test{
    public static void main(String[] args) {
        System.out.println(System.currentTimeMillis());
        for(int i=0;i<5;i++) {
            new Thread(new ConsumerThread(i)).start();
        }
    }
}

And the result:

1542201686784
Thread [1] got response:slow result at:1542201687827
Thread [2] got response:slow result at:1542201687827
Thread [3] got response:slow result at:1542201687827
Thread [0] got response:slow result at:1542201687827
Thread [4] got response:slow result at:1542201687827

All threads got their result after 1 second. Kind of reactive programming ;) It does make the API more asynchronous, but a caller that wants to block until the result arrives can still do so. The service is basically self-explanatory about what it does. It's like saying: "My operation is slow, so instead of running it for each of you callers, I will send you the result once I am ready. Give me a consumer method."
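As a sketch of how a caller could still block on the callback-style API, one option is to hand the service a callback that completes a CompletableFuture and then wait on that future. BlockingAdapter and callAndWait are hypothetical names introduced here; the only assumption about the service is that it exposes a method taking a Consumer<String>, like serviceApiMmethod above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical adapter: turns a callback-style call into a blocking one.
class BlockingAdapter {
    // asyncCall is any method that accepts a result callback,
    // for example new MyService()::serviceApiMmethod.
    static String callAndWait(Consumer<Consumer<String>> asyncCall) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // The callback simply completes the future with the delivered result.
        asyncCall.accept(future::complete);
        // Block the calling thread until the callback has been invoked.
        return future.join();
    }
}
```

With the service from this answer, the call would read String result = BlockingAdapter.callAndWait(new MyService()::serviceApiMmethod);. If the callback is never invoked, join() waits indefinitely, so a real caller would probably prefer get with a timeout.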



Answer 4:

As a solution you can use something like this:

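import java.time.LocalDateTime;

public class MyService {

    private volatile ResultHolder holder;

    // isTimedOut(...) and callSlowOperation() are not shown in this answer.
    public String serviceApiMethod() {
        ResultHolder current = holder; // read the volatile field once
        if (current != null && !isTimedOut(current.calculated)) {
            return current.result;
        }
        synchronized (this) {
            // Re-check: another thread may have refreshed the holder while we waited.
            current = holder;
            if (current != null && !isTimedOut(current.calculated)) {
                return current.result;
            }
            String result = callSlowOperation();
            holder = new ResultHolder(result, LocalDateTime.now());
            return result;
        }
    }

    // Immutable: both fields are final and set only in the constructor.
    private static class ResultHolder {
        private final String result;
        private final LocalDateTime calculated;

        public ResultHolder(String result, LocalDateTime calculated) {
            this.result = result;
            this.calculated = calculated;
        }
    }
}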

Note that MyService must be a singleton and ResultHolder must be immutable.
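The answer does not show isTimedOut. A minimal sketch, assuming a 30-second limit (the age the question describes as acceptable), could compare the stored timestamp with the current time using java.time.Duration. Freshness and MAX_AGE are hypothetical names, and the two-argument overload exists only so the check can be exercised with fixed timestamps.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper for the timeout check used by serviceApiMethod().
final class Freshness {
    // Assumed limit; adjust to whatever staleness the service can tolerate.
    static final Duration MAX_AGE = Duration.ofSeconds(30);

    private Freshness() {
    }

    // True when more than MAX_AGE has elapsed between the two timestamps.
    static boolean isTimedOut(LocalDateTime calculated, LocalDateTime now) {
        return Duration.between(calculated, now).compareTo(MAX_AGE) > 0;
    }

    // Convenience overload that compares against the current local time.
    static boolean isTimedOut(LocalDateTime calculated) {
        return isTimedOut(calculated, LocalDateTime.now());
    }
}
```

Because LocalDateTime carries no time zone, LocalDateTime.now() can jump when the system clock or daylight saving changes; Instant or System.nanoTime() may be a safer basis for measuring age.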