I have often run into situations where I need some sort of valve construct to control the flow of a reactive pipeline. Typically, in a network-based application I have had the requirement to open/close a request stream according to the connection state.
This valve subject should support opening/closing the stream, and output delivery in FIFO order. Input values should be buffered when the valve is closed.
A ConcurrentQueue
or BlockingCollection
are typically used in such scenarios, but that immediately introduces threading into the picture. I was looking for a purely reactive solution to this problem.
Here's an implementation mainly based on Buffer()
and BehaviorSubject
. The behavior subject tracks the open/close state of the valve. Openings of the valve start buffering windows, and closings of the valve close those windows. Output of the buffer operator is "re-injected" onto the input (so that even observers themselves can close the valve):
/// <summary>
/// Subject offering Open() and Close() methods, with built-in buffering.
/// Note that closing the valve in the observer is supported.
/// </summary>
/// <remarks>As is the case with other Rx subjects, this class is not thread-safe, in that
/// order of elements in the output is indeterministic in the case of concurrent operation
/// of Open()/Close()/OnNext()/OnError(). To guarantee strict order of delivery even in the
/// case of concurrent access, <see cref="ValveSubjectExtensions.Synchronize{T}(NEXThink.Finder.Utils.Rx.IValveSubject{T})"/> can be used.</remarks>
/// <typeparam name="T">Elements type</typeparam>
public class ValveSubject<T> : IValveSubject<T>
{
private enum Valve
{
Open,
Closed
}
private readonly Subject<T> input = new Subject<T>();
private readonly BehaviorSubject<Valve> valveSubject = new BehaviorSubject<Valve>(Valve.Open);
private readonly Subject<T> output = new Subject<T>();
public ValveSubject()
{
var valveOperations = valveSubject.DistinctUntilChanged();
input.Buffer(
bufferOpenings: valveOperations.Where(v => v == Valve.Closed),
bufferClosingSelector: _ => valveOperations.Where(v => v == Valve.Open))
.SelectMany(t => t).Subscribe(input);
input.Where(t => valveSubject.Value == Valve.Open).Subscribe(output);
}
public bool IsOpen
{
get { return valveSubject.Value == Valve.Open; }
}
public bool IsClosed
{
get { return valveSubject.Value == Valve.Closed; }
}
public void OnNext(T value)
{
input.OnNext(value);
}
public void OnError(Exception error)
{
input.OnError(error);
}
public void OnCompleted()
{
output.OnCompleted();
input.OnCompleted();
valveSubject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return output.Subscribe(observer);
}
public void Open()
{
valveSubject.OnNext(Valve.Open);
}
public void Close()
{
valveSubject.OnNext(Valve.Closed);
}
}
public interface IValveSubject<T>:ISubject<T>
{
void Open();
void Close();
}
An additional method for flushing out the valve can be useful at times, e.g. to eliminate remaining requests when they are no longer relevant. Here is an implementation that builds upon the precedent, adapter-style:
/// <summary>
/// Subject with same semantics as <see cref="ValveSubject{T}"/>, but adding flushing out capability
/// which allows clearing the valve of any remaining elements before closing.
/// </summary>
/// <typeparam name="T">Elements type</typeparam>
public class FlushableValveSubject<T> : IFlushableValveSubject<T>
{
private readonly BehaviorSubject<ValveSubject<T>> valvesSubject = new BehaviorSubject<ValveSubject<T>>(new ValveSubject<T>());
private ValveSubject<T> CurrentValve
{
get { return valvesSubject.Value; }
}
public bool IsOpen
{
get { return CurrentValve.IsOpen; }
}
public bool IsClosed
{
get { return CurrentValve.IsClosed; }
}
public void OnNext(T value)
{
CurrentValve.OnNext(value);
}
public void OnError(Exception error)
{
CurrentValve.OnError(error);
}
public void OnCompleted()
{
CurrentValve.OnCompleted();
valvesSubject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return valvesSubject.Switch().Subscribe(observer);
}
public void Open()
{
CurrentValve.Open();
}
public void Close()
{
CurrentValve.Close();
}
/// <summary>
/// Discards remaining elements in the valve and reset the valve into a closed state
/// </summary>
/// <returns>Replayable observable with any remaining elements</returns>
public IObservable<T> FlushAndClose()
{
var previousValve = CurrentValve;
valvesSubject.OnNext(CreateClosedValve());
var remainingElements = new ReplaySubject<T>();
previousValve.Subscribe(remainingElements);
previousValve.Open();
return remainingElements;
}
private static ValveSubject<T> CreateClosedValve()
{
var valve = new ValveSubject<T>();
valve.Close();
return valve;
}
}
public interface IFlushableValveSubject<T> : IValveSubject<T>
{
IObservable<T> FlushAndClose();
}
As mentioned in the comment, these subjects are not "thread-safe" in the sense that order of delivery is no longer guaranteed in the case of concurrent operation. In a similar fashion as what exists for the standard Rx Subject
, Subject.Synchronize()
(https://msdn.microsoft.com/en-us/library/hh211643%28v=vs.103%29.aspx) we can introduce some extensions which provide locking around the valve:
public static class ValveSubjectExtensions
{
public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve)
{
return Synchronize(valve, new object());
}
public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve, object gate)
{
return new SynchronizedValveAdapter<T>(valve, gate);
}
public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve)
{
return Synchronize(valve, new object());
}
public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve, object gate)
{
return new SynchronizedFlushableValveAdapter<T>(valve, gate);
}
}
internal class SynchronizedValveAdapter<T> : IValveSubject<T>
{
private readonly object gate;
private readonly IValveSubject<T> valve;
public SynchronizedValveAdapter(IValveSubject<T> valve, object gate)
{
this.valve = valve;
this.gate = gate;
}
public void OnNext(T value)
{
lock (gate)
{
valve.OnNext(value);
}
}
public void OnError(Exception error)
{
lock (gate)
{
valve.OnError(error);
}
}
public void OnCompleted()
{
lock (gate)
{
valve.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
return valve.Subscribe(observer);
}
public void Open()
{
lock (gate)
{
valve.Open();
}
}
public void Close()
{
lock (gate)
{
valve.Close();
}
}
}
internal class SynchronizedFlushableValveAdapter<T> : SynchronizedValveAdapter<T>, IFlushableValveSubject<T>
{
private readonly object gate;
private readonly IFlushableValveSubject<T> valve;
public SynchronizedFlushableValveAdapter(IFlushableValveSubject<T> valve, object gate)
: base(valve, gate)
{
this.valve = valve;
this.gate = gate;
}
public IObservable<T> FlushAndClose()
{
lock (gate)
{
return valve.FlushAndClose();
}
}
}
Here is my implementation with delay operator:
source.delay(new Func1<Integer, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Integer integer) {
return valve.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean) {
return aBoolean;
}
});
}
})
.toBlocking()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("out: " + integer);
}
});
The idea is to delay all source emissions until "valve opens". If valve is already opened, there will be no delay in emission of item.
Rx valve gist