Filestream to RX throwing InvalidOperationExceptio

2019-08-28 21:44发布

问题:

I have set up a filewatcher to create me an observable stream of ticks whenever a file changes. This is then used as the trigger to start another repeated observable sequence that then reads the updated file contents from an open stream.

The problem I have is if the file updates happen in large batches in quick succession, the code throws an InvalidOperationException with the message

The stream is currently in use by a previous operation on the stream.

Stack trace

   at System.IO.StreamReader.CheckAsyncTaskInProgress()
   at System.IO.StreamReader.ReadLineAsync()
   at System.Reactive.Linq.QueryLanguage.StartAsyncImpl[TSource](Func`1 functionAsync, IScheduler scheduler) in D:\a\1\s\Rx.NET\Source\src\System.Reactive\Linq\QueryLanguage.Async.cs:line 676

What is wrong with my RX pipeline that could be causing this?

    var fileSystemWatcherChanges =
        Observable
            .Using(() =>
                    new FileSystemWatcher(path)
                    {
                        Filter = fileName,
                        EnableRaisingEvents = true
                    },
                fsw =>
                {
                    return Observable
                        .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                            h => fsw.Changed += h, h => fsw.Changed -= h)
                        .Select(x => Unit.Default);
                });

    // Now we open a new stream on the same file, shift it to the same position as our previous one
    // finished at, so we can continue with live updates
    var messages =
            Observable.Using(
                    () => new FileStream(logFileLocation, FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
                    fs =>
                    {
                        fs.Seek(_fileStream.Position, SeekOrigin.Begin);

                        return Observable.Using(() => new StreamReader(fs),
                            sr =>
                                fileSystemWatcherChanges
                                    .StartWith(Unit.Default)
                                    .Select(x =>
                                        Observable
                                            .Defer(() => Observable.FromAsync(sr.ReadLineAsync))
                                            .Repeat()
                                            .TakeUntil(w => w == null))
                                    .Merge()
                                    .Where(w => w != null))
                        .Select((x, n) => new LogTailMessage(n, x));
                    });

    _logger.Info($"Started tailing log file for {logFileLocation}");

    Stream = messages
            .Buffer(TimeSpan.FromMilliseconds(500), 100)
            .Replay()
            .RefCount();