Question:
Please observe the following unit tests:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace UnitTests
{
    [TestClass]
    public class TestRx
    {
        public const int UNIT_TEST_TIMEOUT = 5000;

        private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
        {
            return Observable.Create<int>(async (obs, cancellationToken) =>
            {
                for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
                {
                    int value = i;
                    obs.OnNext(await Task.Factory.StartNew(() =>
                    {
                        Thread.Sleep(msWait);
                        return value;
                    }));
                }
            });
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void Subscribe()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
            }, e => Assert.Fail(), () =>
            {
                Assert.AreEqual(100, i);
                tcs.TrySetResult(null);
            });
            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeCancel()
        {
            var tcs = new TaskCompletionSource<object>();
            var cts = new CancellationTokenSource();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    cts.Cancel();
                }
            }, e =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, () =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, cts.Token);
            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeThrow()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    throw new Exception("xo-xo");
                }
            }, e =>
            {
                Assert.AreEqual("xo-xo", e.Message);
                tcs.TrySetResult(null);
            }, Assert.Fail);
            tcs.Task.Wait();
        }
    }
}
The unit tests SubscribeCancel and SubscribeThrow time out, because the OnError callback is never called and thus the wait on the task never ends.
What is wrong?
P.S.
This question is related to How to wrap SqlDataReader with IObservable properly?
EDIT
In the meantime I have created a new Rx issue - https://rx.codeplex.com/workitem/74
Also http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the-given-subscriber?forum=rx
EDIT2
The following observable implementation produces exactly the same result, even though it complies with paragraph 6.5 of the Rx Design Guidelines - "Subscribe implementations should not throw":
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
    return Observable.Create<int>(async (obs, cancellationToken) =>
    {
        try
        {
            for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
            {
                int value = i;
                obs.OnNext(await Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(msWait);
                    return value;
                }));
            }
            obs.OnCompleted();
        }
        catch (Exception exc)
        {
            obs.OnError(exc);
        }
    });
}
EDIT3
I am starting to believe that one is supposed to write code like this when an asynchronous observable sequence is integrated into otherwise synchronous code (which would usually be the case on the server side in one place or another):
var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
    try
    {
        ...
    }
    catch (Exception e)
    {
        DoErrorLogic();
        tcs.TrySetException(e);
    }
}, e =>
{
    DoErrorLogic();
    tcs.TrySetException(e);
}, () =>
{
    DoCompletedLogic();
    tcs.TrySetResult(null);
});
tcs.Task.Wait();
Is it really so?
EDIT 4
I think it finally starts trickling down my rusty brain what you are trying to say. I will switch to my other post now - How to wrap SqlDataReader with IObservable properly?
Answer 1:
This behaviour is by design. If the subscriber throws an exception (which is bad practice by the way), the Rx framework correctly reasons it is dead and communicates with it no further. If a subscription is cancelled, this is also not an error - merely a request to send no further events of any kind - which Rx honours.
Edit in response to comments
I don't think there is an easy reference to point at in the documentation - the behaviour you are seeing is so intrinsic that it's implicit. The closest I can get is to point you at the source code for AnonymousSafeObserver and AutoDetachObserver. The latter has an explanatory scenario that might help, but it's a little involved.
Perhaps an analogy would help. Imagine the data stream events are newspapers being delivered by a newsagent, and the subscribers are households.
Subscriber throws an exception
The newsagent happily delivers newspapers until one day, one of the subscribers - a Mr. Jones - leaves his gas on and his house explodes, killing Mr. Jones and destroying the house (throw unhandled exception). The newsagent realises that he can no longer deliver newspapers to Mr. Jones, and neither can he send a termination notice. There is no problem with the newspaper supply (so OnError or OnCompleted is not appropriate), and the newsagent carries on with one less subscriber.
Contrast this with the newspaper printers inadvertently using inflammable ink and sending the factory up in flames. Now the poor newsagent must indeed send an explanatory note (OnError) to all its subscribers that supply has stopped indefinitely.
Subscriber cancels subscription
Mr. Jones is receiving newspapers from his subscription, until one day he decides he is sick of the endless torrent of depressing stories and asks to cancel his subscription. The newsagent obliges. He does not send Mr. Jones a note explaining the newspaper has stopped printing editions (no OnCompleted) - they haven't. Neither does he send Mr. Jones a note explaining the newspaper has gone out of business (no OnError) - he just stops delivering newspapers, as Mr. Jones requested.
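The cancellation case can be sketched in a few lines. This is only an illustration, assuming the System.Reactive package is referenced; the class name CancelDemo and the use of a Subject<int> as the "newsagent" are hypothetical stand-ins, not code from the question. Disposing the subscription plays the role of Mr. Jones cancelling, and I would expect cancelling through the CancellationToken passed to Subscribe to behave the same way from the observer's point of view.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class CancelDemo
{
    // Returns a log of every notification the observer actually received.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>(); // hypothetical stand-in for the newsagent

        IDisposable subscription = source.Subscribe(
            n => log.Add("OnNext " + n),
            e => log.Add("OnError"),
            () => log.Add("OnCompleted"));

        source.OnNext(1);         // delivered
        subscription.Dispose();   // Mr. Jones cancels his subscription
        source.OnNext(2);         // not delivered: nobody is subscribed any more
        source.OnCompleted();     // not delivered either

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The log ends up holding only "OnNext 1": after the subscription is disposed the observer hears nothing further, neither OnCompleted nor OnError.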
Response to Edit3
I sympathise with your struggle. I note throughout your code you have been trying to mesh the TPL (Task) idiom with that of Rx. Such attempts often feel clumsy because they are really quite different worlds. It's quite hard to comment on a paragraph like this:
I am starting to believe that one is supposed to write code like this when an asynchronous observable sequence is integrated into otherwise synchronous code (which would usually be the case on the server side in one place or another):
In strong agreement with Brandon's well-made assertion, I can't think of instances where it's really appropriate to integrate asynchronous code with synchronous code on the server side in the way you are attempting. This feels like a design smell to me. Idiomatically, one would try to keep the code reactive - make the subscription, and let the subscriber handle work reactively. I can't recall coming across a necessity to transition into synchronous code the way you describe.
Certainly, looking at the code you wrote in Edit3, it's not clear what you are trying to achieve. It's not the responsibility of the source to react to errors in a subscriber. This is the tail wagging the dog. The exception handlers that need to be there to ensure continuity of service of a subscriber should be in the subscription handling code, not in the source observable - it should only concern itself with protection from rogue observer behaviour. Such logic is implemented in the AnonymousSafeObserver linked above and is used by most of the Rx supplied operators. The observable may very well have logic to handle continuity of its source data - but that is a different concern, and not one you are addressing in your code.
Wherever you are attempting to bridge to synchronous code via calls to ToTask or Wait, there is probably cause to consider your design carefully.
I feel that providing a more concrete problem statement - perhaps drawn from a real-world scenario you are trying to solve - would serve to elicit more helpful advice for you. The SqlDataReader example, where you say...
Finally people might use the observable [wrapping a SqlDataReader] directly by subscribing to it, but they would have to wait for the end (blocking the thread) at some point, since most of the code around is still synchronous.
... highlights the design quagmire you are in. In this case, as you infer, such consumers would clearly be better off using an IEnumerable<T> interface - or perhaps asking for an IObservable<List<T>>. But the key is to look at the bigger picture: the fact that you are trying to wrap a SqlDataReader in an observable wrapper at all is a design smell, because this is a supply of fixed data in response to a specific one-time request. This is possibly an asynchronous scenario - but not really a reactive one. Contrast this with a more typically reactive scenario like "send me prices for stock X whenever you get them", where you are setting up a future flow of data entirely at the behest of the source for subscribers to then react to.
Answer 2:
It is not explicitly stated in the guidelines, but it is implied by the Rx Grammar and the purpose of IObservables. IObservables communicate information from the source to one or more observers. The information communicated is data (via OnNext), optionally followed by an OnCompleted or an OnError. It is important to remember that these callbacks are triggered by the source. They cannot and should not be triggered as a result of an observer.
If OnError is called, it will be because something in the source observable chain has failed. It will never be because an observer has failed.
In your SubscribeThrow example, the observer (constructed from your 3 supplied lambdas for OnNext, OnError, OnCompleted) is failing. Such errors in observers cannot and should not cause the source observable itself to fail.
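For contrast, here is a minimal sketch of a failure that does belong to the source observable chain and therefore does reach OnError. It assumes the System.Reactive package; the class name SourceFailureDemo is hypothetical, and Observable.Range stands in for the question's GetObservable so that the example runs synchronously with the default scheduler. The throw sits inside a Select operator, that is, inside the chain and not inside the observer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SourceFailureDemo
{
    // Returns a log of every notification the observer received.
    public static List<string> Run()
    {
        var log = new List<string>();

        Observable.Range(0, 10)
            .Select(n =>
            {
                // The failure happens in an operator, so it is part of the source chain.
                if (n == 5) throw new InvalidOperationException("xo-xo");
                return n;
            })
            .Subscribe(
                n => log.Add("OnNext " + n),
                e => log.Add("OnError " + e.Message),
                () => log.Add("OnCompleted"));

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The observer sees OnNext for 0 through 4 and then a single OnError carrying "xo-xo"; OnCompleted is never called, because a sequence terminates with either OnError or OnCompleted, never both.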
Rx 2.0 introduced safeguards to ensure this contract. Read the "Revamped error handling strategy" section of the Rx 2.0 release blog post.
Related question: How to handle exceptions in OnNext when using ObserveOn?
Edit3
That is certainly one way to do it, but it is pretty ugly. Firstly, I'll challenge your assertion that asynchronous server-side code usually ends up needing to be synchronous to interact with some synchronous code. I find that only to be true in unit tests.
But anyway, I believe you are just subscribing too early. My experience with Rx is that whenever I encounter friction, it is because I am subscribing too soon and should instead be extending the observable monad chain.
In your example, instead of subscribing to the stream of data and processing it in your observer, think of your processing logic as just another projection of the incoming data. Your logic in this case is just transforming a piece of data into a work result. This allows you to treat the success or failure of your logic as part of the stream, which you can then observe the way you wish. You end up with this:
// Unit lives in System.Reactive; ToTask() needs System.Reactive.Threading.Tasks.
var data = GetObservable();
var results = data.Select(item =>
{
    DoWork(item);
    // since your work does not produce anything...
    // it either succeeds or throws an exception
    // and you cannot make an IObservable<void>
    // return Unit.Default. Unit is the Rx equivalent of
    // void.
    return Unit.Default;
});

// subscribe to the stream and wait synchronously for it to finish
results.Wait(); // this will throw an exception the first time DoWork fails

// or asynchronously await the stream to finish...just like a Task
await results;

// or turn the stream into a Task that completes when the processing is complete.
var task = results.ToTask();
Or, what if you do not want to stop processing on the first error, but instead just accumulate the errors? This is easy now that you think of your work as a projection...
var results = data.Select(item =>
{
    try
    {
        DoWork(item);
        return null; // no error
    }
    catch (Exception e)
    {
        return e;
    }
}).Where(e => e != null).ToList();

var errorList = results.Wait();
// or var errorList = await results;
// or Task<IList<Exception>> errorListTask = results.ToTask();
Both of these examples seem much simpler and cleaner, and they are possible just by thinking about the problem differently.