Is my code thread-unsafe?

2019-04-19 11:20发布

问题:

I have wrote code to understand CyclicBarrier.

My application emulates election. Each rounds selects candidate with small votes and this candidate eliminates from the race for victory.

source:

class ElectoralCommission {
    public volatile boolean hasWinner;
    public volatile String winner;
    private List<String> candidates;
    private Map<String, Integer> results = new ConcurrentHashMap<>();

    ElectoralCommission(List<String> candidates) {
        this.candidates = candidates;
    }

    public void acceptVote(int index) {
        Integer currentResult = results.get(candidates.get(index));
        results.put(candidates.get(index), currentResult == null ? 1 : currentResult + 1);
    }

    public void clearResults() {
        results.clear();
    }

    public synchronized List<String> getCandidates() {
        return candidates;
    }

    public String getWinner() {
        return winner;
    }

    public boolean isHasWinner() {
        return hasWinner;
    }

    public void printResults() {
        System.out.println("result:");
        for (Map.Entry<String, Integer> entry : results.entrySet()) {
            System.out.println("key=" + entry.getKey() + ", value=" + entry.getValue());
        }
    }

    public void removeWeakCandidates() {
        int minVoteValue = Collections.min(results.values());
        int maxVoteValue = Collections.max(results.values());
        if (maxVoteValue == minVoteValue) {
            System.out.println("all candidates got same votes count");
        } else {
            List<String> loosers = results.entrySet().stream().filter(i -> i.getValue() == minVoteValue).map(i -> i.getKey()).collect(Collectors.toList());
            candidates.removeAll(loosers);
            if (candidates.size() == 1) {
                hasWinner = true;
                winner = candidates.get(0);
            }
        }
        clearResults();
    }
}

class Voter implements Runnable {
    ElectoralCommission electoralCommission;
    CyclicBarrier cyclicBarrier;

    Voter(CyclicBarrier cyclicBarrier, ElectoralCommission electoralCommission) {
        this.cyclicBarrier = cyclicBarrier;
        this.electoralCommission = electoralCommission;
    }

    @Override
    public void run() {
        while (!electoralCommission.hasWinner) {
            List<String> candidates = electoralCommission.getCandidates();
            int voteIndex = new Random().nextInt(candidates.size());
            electoralCommission.acceptVote(voteIndex);
            try {
                cyclicBarrier.await(); //wait while all voters vote
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (BrokenBarrierException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }
}

class Main {
    public static final int VOTE_COUNT = 10000;

    public static void main(String[] args) {
        List<String> candidates = new ArrayList<>();
        candidates.add("candidate_1");
        candidates.add("candidate_2");
        candidates.add("candidate_3");
        candidates.add("candidate_4");
        candidates.add("candidate_5");
        candidates.add("candidate_6");
        candidates.add("candidate_7");
        candidates.add("candidate_8");
        candidates.add("candidate_9");
        candidates.add("candidate_10");
        ElectoralCommission electoralCommission = new ElectoralCommission(candidates);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(VOTE_COUNT, new Summarizer(electoralCommission));
        for (int i = 0; i < VOTE_COUNT; i++) {
            new Thread(new Voter(cyclicBarrier, electoralCommission)).start();
        }

    }
}

class Summarizer implements Runnable {
    ElectoralCommission electoralCommission;

    Summarizer(ElectoralCommission electoralCommission) {
        this.electoralCommission = electoralCommission;
    }

    @Override
    public void run() {
        System.out.println("summarizer");
        electoralCommission.printResults();
        electoralCommission.removeWeakCandidates();
        if (electoralCommission.hasWinner) {
            String winnerName = electoralCommission.getWinner();
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            System.out.println("candidate " + winnerName + " won!");
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        }
    }
}

sample output:

summarizer
result:
key=candidate_2, value=984
key=candidate_3, value=1004
key=candidate_1, value=1018
key=candidate_6, value=1002
key=candidate_7, value=969
key=candidate_4, value=1031
key=candidate_5, value=1038
key=candidate_10, value=915
key=candidate_8, value=1003
key=candidate_9, value=1034
summarizer
result:
key=candidate_2, value=1096
key=candidate_3, value=1133
key=candidate_1, value=1088
key=candidate_6, value=1144
key=candidate_7, value=1078
key=candidate_4, value=1136
key=candidate_5, value=1100
key=candidate_8, value=1147
key=candidate_9, value=1078
summarizer
result:
key=candidate_2, value=1429
key=candidate_3, value=1488
key=candidate_1, value=1430
key=candidate_6, value=1445
key=candidate_4, value=1410
key=candidate_5, value=1397
key=candidate_8, value=1399
summarizer
result:
key=candidate_2, value=1668
key=candidate_3, value=1697
key=candidate_1, value=1684
key=candidate_6, value=1661
key=candidate_4, value=1679
key=candidate_8, value=1610
summarizer
result:
key=candidate_2, value=2079
key=candidate_3, value=1954
key=candidate_1, value=2003
key=candidate_6, value=2022
key=candidate_4, value=1941
summarizer
result:
key=candidate_2, value=2504
key=candidate_3, value=2481
key=candidate_1, value=2500
key=candidate_6, value=2514
summarizer
result:
key=candidate_2, value=3299
key=candidate_1, value=3284
key=candidate_6, value=3417
summarizer
result:
key=candidate_2, value=4961
key=candidate_6, value=5036
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
candidate candidate_6 won!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

As you can see my last summarizer invocation output following:

summarizer
result:
key=candidate_2, value=4961
key=candidate_6, value=5036
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
candidate candidate_6 won!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

I have 10000 voters but 4961 + 5036 = 9997.

Which lost 3 votes ?

P.S.

I tryed solution with locks:

class ElectoralCommission {
    public volatile boolean hasWinner;
    public volatile String winner;
    private List<String> candidates;
    private Map<String, AtomicInteger> results = new ConcurrentHashMap<>();

    ElectoralCommission(List<String> candidates) {
        this.candidates = candidates;
    }

    public void acceptVote(int index) {
        AtomicInteger currentResult = results.get(candidates.get(index));
       if(currentResult!=null){
           currentResult.incrementAndGet();
       }
    }

    public void clearResults() {
        results.clear();
    }

    public synchronized List<String> getCandidates() {
        return candidates;
    }

    public String getWinner() {
        return winner;
    }

    public boolean isHasWinner() {
        return hasWinner;
    }

    public void printResults() {
        System.out.println("result:");
        for (Map.Entry<String, AtomicInteger> entry : results.entrySet()) {
            System.out.println("key=" + entry.getKey() + ", value=" + entry.getValue());
        }
    }

    public void removeWeakCandidates() {
        int minVoteValue = Collections.min(results.values().stream().map(i->i.intValue()).collect(Collectors.toList()));
        int maxVoteValue = Collections.max(results.values().stream().map(i->i.intValue()).collect(Collectors.toList()));
        if (maxVoteValue == minVoteValue) {
            System.out.println("all candidates got same votes count");
        } else {
            List<String> loosers = results.entrySet().stream().filter(i -> i.getValue().intValue() == minVoteValue).map(i -> i.getKey()).collect(Collectors.toList());
            candidates.removeAll(loosers);
            if (candidates.size() == 1) {
                hasWinner = true;
                winner = candidates.get(0);
            }
        }
        clearResults();
    }
}

class Voter implements Runnable {
    ElectoralCommission electoralCommission;
    CyclicBarrier cyclicBarrier;

    Voter(CyclicBarrier cyclicBarrier, ElectoralCommission electoralCommission) {
        this.cyclicBarrier = cyclicBarrier;
        this.electoralCommission = electoralCommission;
    }

    @Override
    public void run() {
        while (!electoralCommission.hasWinner) {
            List<String> candidates = electoralCommission.getCandidates();
            int voteIndex = new Random().nextInt(candidates.size());
            electoralCommission.acceptVote(voteIndex);
            try {
                cyclicBarrier.await(); //wait while all voters vote
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (BrokenBarrierException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }
}

class Main {
    public static final int VOTE_COUNT = 10000;

    public static void main(String[] args) {
        List<String> candidates = new ArrayList<>();
        candidates.add("candidate_1");
        candidates.add("candidate_2");
        candidates.add("candidate_3");
        candidates.add("candidate_4");
        candidates.add("candidate_5");
        candidates.add("candidate_6");
        candidates.add("candidate_7");
        candidates.add("candidate_8");
        candidates.add("candidate_9");
        candidates.add("candidate_10");
        ElectoralCommission electoralCommission = new ElectoralCommission(candidates);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(VOTE_COUNT, new Summarizer(electoralCommission));
        for (int i = 0; i < VOTE_COUNT; i++) {
            new Thread(new Voter(cyclicBarrier, electoralCommission)).start();
        }

    }
}

class Summarizer implements Runnable {
    ElectoralCommission electoralCommission;

    Summarizer(ElectoralCommission electoralCommission) {
        this.electoralCommission = electoralCommission;
    }

    @Override
    public void run() {
        System.out.println("summarizer");
        electoralCommission.printResults();
        electoralCommission.removeWeakCandidates();
        if (electoralCommission.hasWinner) {
            String winnerName = electoralCommission.getWinner();
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            System.out.println("candidate " + winnerName + " won!");
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        }
    }
}

but I see

java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at lection2.task3.Voter.run(Main.java:88)

line 88: cyclicBarrier.await();

回答1:

It seems error in method acceptVote(int index)

Try to rewrite it use method ConcurrentHashMap.putIfAbsent and AtomicInteger:

private final ConcurrentHashMap<String, AtomicInteger> results = new ConcurrentHashMap<>();

public void acceptVote(int index) {
    AtomicInteger currentResult = results.get(candidates.get(index));
    if (currentResult == null) {
        currentResult = results.putIfAbsent(candidates.get(index), new AtomicInteger(1));
        if (currentResult != null) {
            currentResult.getAndIncrement();
        }
    } else {
        currentResult.getAndIncrement();
    }
}

something like this...



回答2:

I tried several times with a synchronized list for candidates, at the results are consistent and as expected:

class Main {
    public static final int VOTE_COUNT = 10000;

    public static void main(String[] args) {
        List<String> candidates = Collections.synchronizedList(new ArrayList<>());
      .....
}

Not the best solution though.

A better solution would be to synchronize just the access to accepted voters. This can be done:

class ElectoralCommission {
  //....

    public synchronized void acceptVote(int index) {
        Integer currentResult = results.get(candidates.get(index));
        results.put(candidates.get(index), currentResult == null ? 1 : currentResult + 1);
}

but this is not the best way, as it will add lock the whole object. A better approach would be using a lock:

    class ElectoralCommission {
  //....
        private ReentrantLock lock = new ReentrantLock(); 

        public void acceptVote(int index) {
            try {
               lock.lock();
               Integer currentResult = results.get(candidates.get(index));
               results.put(candidates.get(index), currentResult == null ? 1 : currentResult + 1);
           }finally {
               lock.unlock();
           }
        }

Output 1:

result:
key=candidate_6, value=3315
key=candidate_7, value=3315
key=candidate_10, value=3370
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
candidate candidate_10 won!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


回答3:

Building a bit on Sergey's answer, if you seed the results collection in the ElectoralCommission constructor:

ElectoralCommission(List<String> candidates) {
    this.candidates = candidates;
    for (String candidate : candidates) {
        results.put(candidate, new AtomicInteger());
    }
}

Then acceptVote becomes:

public void acceptVote(int index) {
    results.get(candidates.get(index)).getAndIncrement();
}


回答4:

The following variants works for atomics:

class ElectoralCommission {
    public volatile boolean hasWinner;
    public volatile String winner;
    private List<String> candidates;
    private Map<String, AtomicInteger> results = new ConcurrentHashMap<>();

    private Map<String, AtomicInteger> initializeMap() {
        candidates.stream().forEach(i -> results.put(i, new AtomicInteger()));
        return results;
    }

    ElectoralCommission(List<String> candidates) {
        this.candidates = candidates;
        results = initializeMap();
    }

    public void acceptVote(int index) {
        results.get(candidates.get(index)).incrementAndGet();
    }

    public void clearResults() {
        results.clear();
        initializeMap();
    }

    public List<String> getCandidates() {
        return candidates;
    }

    public String getWinner() {
        return winner;
    }

    public void printResults() {
        System.out.println("result:");
        for (Map.Entry<String, AtomicInteger> entry : results.entrySet()) {
            System.out.println("key=" + entry.getKey() + ", value=" + entry.getValue());
        }
    }

    public void removeWeakCandidates() {
        int minVoteValue = Collections.min(results.values().stream().map(i->i.intValue()).collect(Collectors.toList()));
        int maxVoteValue = Collections.max(results.values().stream().map(i->i.intValue()).collect(Collectors.toList()));
        if (maxVoteValue == minVoteValue) {
            System.out.println("all candidates got same votes count");
        } else {
            List<String> loosers = results.entrySet().stream().filter(i -> i.getValue().intValue() == minVoteValue).map(i -> i.getKey()).collect(Collectors.toList());
            candidates.removeAll(loosers);
            if (candidates.size() == 1) {
                hasWinner = true;
                winner = candidates.get(0);
            }
        }
        clearResults();
    }
}

class Voter implements Runnable {
    ElectoralCommission electoralCommission;
    CyclicBarrier cyclicBarrier;

    Voter(CyclicBarrier cyclicBarrier, ElectoralCommission electoralCommission) {
        this.cyclicBarrier = cyclicBarrier;
        this.electoralCommission = electoralCommission;
    }

    @Override
    public void run() {
        while (!electoralCommission.hasWinner) {
            List<String> candidates = electoralCommission.getCandidates();
            int voteIndex = new Random().nextInt(candidates.size());
            electoralCommission.acceptVote(voteIndex);
            try {
                cyclicBarrier.await(); //wait while all voters vote
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

class Main {
    public static final int VOTE_COUNT = 10000;

    public static void main(String[] args) {
        List<String> candidates = new ArrayList<>();
        candidates.add("candidate_1");
        candidates.add("candidate_2");
        candidates.add("candidate_3");
        candidates.add("candidate_4");
        candidates.add("candidate_5");
        candidates.add("candidate_6");
        candidates.add("candidate_7");
        candidates.add("candidate_8");
        candidates.add("candidate_9");
        candidates.add("candidate_10");
        ElectoralCommission electoralCommission = new ElectoralCommission(candidates);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(VOTE_COUNT, new Summarizer(electoralCommission));
        for (int i = 0; i < VOTE_COUNT; i++) {
            new Thread(new Voter(cyclicBarrier, electoralCommission)).start();
        }

    }
}

class Summarizer implements Runnable {
    static int roundNumber = 1;
    ElectoralCommission electoralCommission;

    Summarizer(ElectoralCommission electoralCommission) {
        this.electoralCommission = electoralCommission;
    }

    @Override
    public void run() {
        System.out.println("Round " + roundNumber++ );
        electoralCommission.printResults();
        electoralCommission.removeWeakCandidates();
        if (electoralCommission.hasWinner) {
            String winnerName = electoralCommission.getWinner();
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            System.out.println("candidate " + winnerName + " won!");
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        }
    }
}