A lot of our code is legacy, but we are moving to a "Big Data" back-end and I'm trying to evangelize the newer API calls, encourage the use of the latest Spring libraries, etc. One of our problems is application-layer ID generation. For reasons I don't understand, a higher authority wants sequential BigIntegers. I would have made them random, with re-generate and re-try on failed insertions, but I got vetoed.
Grumbling aside, I'm in a position where I need to increment and get a BigInteger across threads and do it in a safe and performant manner. I've never used AtomicReference before but it looks pretty close to perfect for this application. Right now we have a synchronized code block which hurts our performance pretty badly.
Is this the right way to go? Syntax examples?
I should mention that the way this module works, it hits the database using a stored procedure to grab a range of values to use, tens of thousands at a time, so that it only happens maybe once in 20 minutes. This keeps the various servers from stepping on each other, but it also adds the wrinkle of having to set the BigInteger to an arbitrary later value. Of course, that needs to be thread-safe too.
P.S. I still think my random generation idea is better than handling all this threading stuff. A BigInteger is a ridiculously large number and the odds of ever generating the same one twice have to be close to nil.
It is possible using AtomicReference. Here's a quick draft:
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicReference;

public final class AtomicBigInteger {

    private final AtomicReference<BigInteger> valueHolder = new AtomicReference<>();

    public AtomicBigInteger(BigInteger bigInteger) {
        valueHolder.set(bigInteger);
    }

    public BigInteger incrementAndGet() {
        // Retry until no other thread changed the value between get() and compareAndSet()
        for (; ; ) {
            BigInteger current = valueHolder.get();
            BigInteger next = current.add(BigInteger.ONE);
            if (valueHolder.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
It is basically a copy of the compare-and-set retry loop from the AtomicLong code for incrementAndGet().
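The question also asks about jumping the counter to the start of a freshly fetched range. The same retry loop can cover that. The following is only a sketch: the class name `ForwardOnlyBigInteger` and the method `advanceTo` are hypothetical, and it assumes a new range always starts above the current value, so a request to move backwards is ignored.

```java
import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class name; a sketch under the assumptions stated above.
final class ForwardOnlyBigInteger {

    private final AtomicReference<BigInteger> valueHolder;

    ForwardOnlyBigInteger(BigInteger initial) {
        this.valueHolder = new AtomicReference<>(Objects.requireNonNull(initial));
    }

    // Same compare-and-set retry loop as in the draft above.
    BigInteger incrementAndGet() {
        for (; ; ) {
            BigInteger current = valueHolder.get();
            BigInteger next = current.add(BigInteger.ONE);
            if (valueHolder.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Hypothetical helper: moves the counter to newValue only if that is
    // strictly greater than the current value. Returns true if it moved.
    boolean advanceTo(BigInteger newValue) {
        Objects.requireNonNull(newValue);
        for (; ; ) {
            BigInteger current = valueHolder.get();
            if (newValue.compareTo(current) <= 0) {
                return false; // already at or past newValue, leave it alone
            }
            if (valueHolder.compareAndSet(current, newValue)) {
                return true;
            }
        }
    }

    BigInteger get() {
        return valueHolder.get();
    }
}
```

Note that this only makes the jump itself atomic. Noticing that the current range is exhausted and calling the stored procedure exactly once would still need its own coordination, which this sketch does not attempt.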
This becomes more manageable and easier to understand using accumulateAndGet or getAndAccumulate, introduced in Java 8. These let you update the value atomically by supplying an accumulator function: the value is set to the function's result, and the method returns either the previous value or the newly calculated one, depending on which of the two you call. Here is an example of what that class might look like, followed by a simple example I wrote up that uses it:
import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public final class AtomicBigInteger {

    private final AtomicReference<BigInteger> bigInteger;

    public AtomicBigInteger(final BigInteger bigInteger) {
        this.bigInteger = new AtomicReference<>(Objects.requireNonNull(bigInteger));
    }

    // Method references left out for demonstration purposes
    public BigInteger incrementAndGet() {
        return bigInteger.accumulateAndGet(BigInteger.ONE, (previous, x) -> previous.add(x));
    }

    public BigInteger getAndIncrement() {
        return bigInteger.getAndAccumulate(BigInteger.ONE, (previous, x) -> previous.add(x));
    }

    public BigInteger get() {
        return bigInteger.get();
    }
}
An example using it:
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ABIExample {

    private static final int AVAILABLE_PROCS = Runtime.getRuntime().availableProcessors();
    private static final int INCREMENT_AMOUNT = 2_500_000;
    private static final int TASK_AMOUNT = AVAILABLE_PROCS * 2;
    // Every task increments INCREMENT_AMOUNT times, so this is the total we expect
    private static final BigInteger EXPECTED_VALUE = BigInteger.valueOf(INCREMENT_AMOUNT)
            .multiply(BigInteger.valueOf(TASK_AMOUNT));

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        System.out.println("Available processors: " + AVAILABLE_PROCS);

        final ExecutorService executorService = Executors.newFixedThreadPool(AVAILABLE_PROCS);
        final AtomicBigInteger atomicBigInteger = new AtomicBigInteger(BigInteger.ZERO);

        // Twice as many tasks as threads, all sharing one counter
        final List<Callable<Void>> incrementTasks = IntStream.rangeClosed(1, TASK_AMOUNT)
                .mapToObj(i -> incrementTask(i, atomicBigInteger))
                .collect(Collectors.toList());

        // invokeAll blocks until every task has finished; get() surfaces any task failure
        final List<Future<Void>> futures = executorService.invokeAll(incrementTasks);
        for (Future<Void> future : futures) {
            future.get();
        }

        executorService.shutdown();
        executorService.awaitTermination(30, TimeUnit.SECONDS);

        System.out.println("Final value: " + atomicBigInteger.get());
        final boolean areEqual = EXPECTED_VALUE.equals(atomicBigInteger.get());
        System.out.println("Does final value equal expected? - " + areEqual);
    }

    private static Callable<Void> incrementTask(
            final int taskNumber,
            final AtomicBigInteger atomicBigInteger
    ) {
        return () -> {
            for (int increment = 0; increment < INCREMENT_AMOUNT; increment++) {
                atomicBigInteger.incrementAndGet();
            }
            System.out.println("Task #" + taskNumber + " Completed");
            return null;
        };
    }
}
And an output from running the example on my machine:
Available processors: 8
Task #3 Completed
Task #8 Completed
Task #7 Completed
Task #6 Completed
Task #5 Completed
Task #2 Completed
Task #4 Completed
Task #1 Completed
Task #9 Completed
Task #10 Completed
Task #11 Completed
Task #13 Completed
Task #16 Completed
Task #12 Completed
Task #14 Completed
Task #15 Completed
Final value: 40000000
Does final value equal expected? - true
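For reference, this is roughly what the class looks like with the method references put back in, since `BigInteger::add` fits the `BinaryOperator<BigInteger>` that accumulateAndGet and getAndAccumulate expect. This is a sketch only: the class name `AtomicBigIntegerRefs` and the extra `addAndGet` method are hypothetical additions for illustration, not part of the class above.

```java
import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical variant of AtomicBigInteger using method references.
final class AtomicBigIntegerRefs {

    private final AtomicReference<BigInteger> value;

    AtomicBigIntegerRefs(final BigInteger initial) {
        this.value = new AtomicReference<>(Objects.requireNonNull(initial));
    }

    // BigInteger::add is applied as current.add(BigInteger.ONE)
    BigInteger incrementAndGet() {
        return value.accumulateAndGet(BigInteger.ONE, BigInteger::add);
    }

    BigInteger getAndIncrement() {
        return value.getAndAccumulate(BigInteger.ONE, BigInteger::add);
    }

    // Hypothetical extra: add an arbitrary delta atomically and return the result
    BigInteger addAndGet(final BigInteger delta) {
        return value.accumulateAndGet(Objects.requireNonNull(delta), BigInteger::add);
    }

    BigInteger get() {
        return value.get();
    }
}
```

The accumulator function may be re-applied when threads contend for the update, so it should be free of side effects. `BigInteger::add` qualifies because BigInteger is immutable.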