Prove to me that PublishSubject in RxJava is not thread safe

Posted 2019-09-21 18:34

Question:

It is stated that PublishSubject in RxJava is not thread safe. OK.

I've been trying to find or construct an example that triggers a race condition and leads to unwanted results, but I can't :(

Can anyone provide an example proving that PublishSubject is not thread safe?

Answer 1:

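Usually, people ask why their setup behaves unexpectedly and/or crashes, and the answer is that they call the onXXX methods on the Subject concurrently. In the tests below the subscriber requests a single item, so the second onNext should end in a MissingBackpressureException after exactly one value; with the plain PublishSubject that assertion can fail when the two calls overlap, while the serialized variant passes consistently: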

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Scheduler.Worker;
import rx.exceptions.MissingBackpressureException;
import rx.observers.AssertableSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.*;

public class PublishSubjectRaceTest {

    @Test
    public void racy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < 1000; i++) {
                AtomicInteger wip = new AtomicInteger(2);

                PublishSubject<Integer> ps = PublishSubject.create();

                // Request only one item up front, so a second onNext has no demand left.
                AssertableSubscriber<Integer> as = ps.test(1);

                CountDownLatch cdl = new CountDownLatch(1);

                worker.schedule(() -> {
                    // Spin until both threads have arrived, so the two onNext calls overlap.
                    if (wip.decrementAndGet() != 0) {
                        while (wip.get() != 0) ;
                    }
                    ps.onNext(1);

                    cdl.countDown();
                });
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0) ;
                }
                ps.onNext(1);

                cdl.await();

                // Expect one value, then the error; overlapping onNext calls can break this.
                as.assertFailure(MissingBackpressureException.class, 1);
            }
        } finally {
            worker.unsubscribe();
        }
    }

    @Test
    public void nonRacy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < 1000; i++) {
                AtomicInteger wip = new AtomicInteger(2);

                // toSerialized() wraps the subject so that concurrent onXXX calls are safe.
                Subject<Integer, Integer> ps = PublishSubject.<Integer>create()
                    .toSerialized();

                AssertableSubscriber<Integer> as = ps.test(1);

                CountDownLatch cdl = new CountDownLatch(1);

                worker.schedule(() -> {
                    if (wip.decrementAndGet() != 0) {
                        while (wip.get() != 0) ;
                    }
                    ps.onNext(1);

                    cdl.countDown();
                });
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0) ;
                }
                ps.onNext(1);

                cdl.await();

                // Same assertion as in racy(); with serialized access it should hold every time.
                as.assertFailure(MissingBackpressureException.class, 1);
            }
        } finally {
            worker.unsubscribe();
        }
    }
}
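
One way to picture why the racy test can fail is a check-then-act on unsynchronized state. The sketch below uses only the JDK and is not RxJava's code: `UnsafeSlot`, its fields, and the `CyclicBarrier` hook are hypothetical names introduced for illustration. The barrier forces both threads to read the "produced" count before either writes it, which is the interleaving the spin loop in the tests above tries to provoke.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

final class CheckThenActRace {

    /** Hypothetical stand-in for a consumer slot that tracks demand in a plain field. */
    static final class UnsafeSlot {
        private final long requested;   // how many items the consumer asked for
        private long produced;          // plain field: no locking, no atomic update
        private final CyclicBarrier pause;
        final AtomicInteger delivered = new AtomicInteger();
        final AtomicInteger rejected = new AtomicInteger();

        UnsafeSlot(long requested, CyclicBarrier pause) {
            this.requested = requested;
            this.pause = pause;
        }

        void onNext(int value) {
            long p = produced;          // check: read the current count
            awaitQuietly(pause);        // illustration hook: both threads read before either writes
            if (p != requested) {
                produced = p + 1;       // act: based on a value that may already be stale
                delivered.incrementAndGet();
            } else {
                rejected.incrementAndGet();
            }
        }
    }

    static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Runs two overlapping onNext calls against a slot with demand 1; returns {delivered, rejected}. */
    static int[] run() {
        UnsafeSlot slot = new UnsafeSlot(1, new CyclicBarrier(2));
        Thread other = new Thread(() -> slot.onNext(1));
        other.start();
        slot.onNext(1);
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { slot.delivered.get(), slot.rejected.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("delivered=" + r[0] + ", rejected=" + r[1]);
    }
}
```

With the forced interleaving, both calls see a count of zero and both deliver, even though only one item was requested. In the real tests the overlap is left to chance, which is why they loop 1000 times.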


Answer 2:

I've found the proof. I think this example is more obvious than the one @akarnokd provided.

    AtomicInteger counter = new AtomicInteger();

    // Thread-safe
    // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();

    // Not Thread Safe
    PublishSubject<Object> subject = PublishSubject.create();

    Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);

    Consumer<Integer> sleep = (s) -> {
        try {
            Thread.sleep(s);
        } catch (InterruptedException e) {
            // Preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    };

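    // Each item increments and then immediately decrements the counter.
    // If onNext calls never overlap, the counter is always 0 at the filter;
    // a non-zero value means another thread was inside the chain at the same time.
    subject
            .doOnNext(i -> counter.incrementAndGet())
            .doOnNext(i -> counter.decrementAndGet())
            .doOnNext(print)
            .filter(i -> counter.get() != 0)
            .doOnNext(i -> {
                throw new NullPointerException("Concurrency detected");
            })
            .subscribe();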

    Runnable r = () -> {
        for (int i = 0; i < 100000; i++) {
            sleep.accept(1);
            subject.onNext(i);
        }
    };

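    // Two threads push into the same subject without any coordination.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(r);
    pool.execute(r);
    pool.shutdown(); // let the JVM exit once both producers finish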
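
The same counter idea can be tried without RxJava to see what serialization buys. The following is a minimal sketch under stated assumptions: `LockedEmitter` is a hypothetical wrapper that uses a plain `synchronized` method, which is simpler than whatever `toSerialized()` does internally and is only meant to show the effect. Two threads emit through it while the callback counts how often it observes another call in flight.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class SerializedEmitDemo {

    /** Hypothetical wrapper: only one thread at a time may enter the downstream callback. */
    static final class LockedEmitter {
        private final IntConsumer downstream;

        LockedEmitter(IntConsumer downstream) {
            this.downstream = downstream;
        }

        synchronized void onNext(int value) {
            downstream.accept(value);
        }
    }

    /** Emits from two threads through the lock; returns {overlaps, delivered}. */
    static int[] run(int itemsPerThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger overlaps = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();

        LockedEmitter emitter = new LockedEmitter(v -> {
            // Anything other than 1 here means two calls are inside the callback at once.
            if (inFlight.incrementAndGet() != 1) {
                overlaps.incrementAndGet();
            }
            delivered.incrementAndGet();
            inFlight.decrementAndGet();
        });

        Runnable producer = () -> {
            for (int i = 0; i < itemsPerThread; i++) {
                emitter.onNext(i);
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(producer);
        pool.execute(producer);
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { overlaps.get(), delivered.get() };
    }

    public static void main(String[] args) {
        int[] r = run(100_000);
        System.out.println("overlaps=" + r[0] + ", delivered=" + r[1]);
    }
}
```

Because every call goes through the lock, the overlap count stays at zero and all items arrive. Removing `synchronized` from `onNext` turns this into the unserialized case, where overlaps may or may not show up on a given run.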