I have set up a FileSystemWatcher to give me an observable stream of ticks whenever a file changes. Each tick is then used as the trigger to start another repeating observable sequence that reads the updated file contents from an open stream.
The problem is that when file updates arrive in large batches in quick succession, the code throws an InvalidOperationException with the message:
The stream is currently in use by a previous operation on the stream.
Stack trace:
at System.IO.StreamReader.CheckAsyncTaskInProgress()
at System.IO.StreamReader.ReadLineAsync()
at System.Reactive.Linq.QueryLanguage.StartAsyncImpl[TSource](Func`1 functionAsync, IScheduler scheduler) in D:\a\1\s\Rx.NET\Source\src\System.Reactive\Linq\QueryLanguage.Async.cs:line 676
What is wrong with my Rx pipeline that could be causing this?
var fileSystemWatcherChanges =
    Observable
        .Using(() =>
            new FileSystemWatcher(path)
            {
                Filter = fileName,
                EnableRaisingEvents = true
            },
            fsw =>
            {
                return Observable
                    .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Changed += h, h => fsw.Changed -= h)
                    .Select(x => Unit.Default);
            });
// Now we open a new stream on the same file, shift it to the same position as our previous one
// finished at, so we can continue with live updates
var messages =
    Observable.Using(
        () => new FileStream(logFileLocation, FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
        fs =>
        {
            fs.Seek(_fileStream.Position, SeekOrigin.Begin);
            return Observable.Using(() => new StreamReader(fs),
                sr =>
                    fileSystemWatcherChanges
                        .StartWith(Unit.Default)
                        .Select(x =>
                            Observable
                                .Defer(() => Observable.FromAsync(sr.ReadLineAsync))
                                .Repeat()
                                .TakeUntil(w => w == null))
                        .Merge()
                        .Where(w => w != null))
                .Select((x, n) => new LogTailMessage(n, x));
        });
_logger.Info($"Started tailing log file for {logFileLocation}");
Stream = messages
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Replay()
    .RefCount();
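
One possible reading of the stack trace: Merge() subscribes to every inner read loop as soon as its tick arrives, so a burst of change events can leave two loops calling ReadLineAsync on the same StreamReader at once. The following is only a minimal sketch under that assumption, not a confirmed fix. It swaps Merge() for Concat() so that each read loop starts only after the previous one has reached the end of the file. TailSketch and ReadLines are hypothetical names introduced for illustration, and changes stands in for fileSystemWatcherChanges.

```csharp
using System;
using System.IO;
using System.Reactive;
using System.Reactive.Linq;

static class TailSketch
{
    // Hypothetical helper (not part of the original code).
    // 'changes' plays the role of fileSystemWatcherChanges above.
    public static IObservable<string> ReadLines(StreamReader sr, IObservable<Unit> changes) =>
        changes
            .StartWith(Unit.Default)
            .Select(_ =>
                Observable
                    // FromAsync invokes the delegate again on every subscription,
                    // so Repeat() issues one ReadLineAsync call per iteration.
                    .FromAsync(() => sr.ReadLineAsync())
                    .Repeat()
                    // Stop this loop once the reader reports end of file (null).
                    .TakeUntil(line => line == null))
            // Concat subscribes to the next inner loop only after the current
            // one completes, so reads on 'sr' never overlap.
            .Concat()
            .Where(line => line != null);
}
```

With this shape, ticks that arrive while a loop is still draining the file are queued rather than run in parallel; each queued loop then reads whatever was appended in the meantime, or a single null if nothing was.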