I have a TCP listener that reads data from the server and writes it to a file. I use a BlockingCollection
to buffer the data. I don't know when the file ends, so my FileStream is always open.
Part of my code is:
    private static BlockingCollection<string> Buffer = new BlockingCollection<string>();

    Process()
    {
        var consumer = Task.Factory.StartNew(() => WriteData());
        while()
        {
            string request = await reader.ReadLineAsync();
            Buffer.Add(request);
        }
    }

    WriteData()
    {
        FileStream fStream = new FileStream(filename, FileMode.Append, FileAccess.Write, FileShare.Write, 16392);
        foreach (var val in Buffer.GetConsumingEnumerable(token))
        {
            fStream.Write(Encoding.UTF8.GetBytes(val), 0, val.Length);
            fStream.Flush();
        }
    }
The problem is that I cannot dispose the FileStream inside the loop, because then I would have to create a new FileStream for each line, and the loop may never end.
This is much easier in .NET 4.5 and later if you use an ActionBlock from the TPL Dataflow library. An ActionBlock accepts and buffers incoming messages and processes them asynchronously using one or more tasks.
You could write something like this:
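A minimal sketch of that approach, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced. The names LineWriter and CopyLinesAsync are made up for illustration, and the loop assumes ReadLineAsync returns null once the connection is closed. It also uses a StreamWriter instead of the raw FileStream from the question, so the encoding and byte counts are handled by the writer:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public static class LineWriter
{
    // Hypothetical helper: appends every line read from `reader` to the file at `path`.
    public static async Task CopyLinesAsync(TextReader reader, string path)
    {
        // The writer lives for the whole transfer and is disposed only once, at the end.
        using (var writer = new StreamWriter(path, append: true))
        {
            // With default options the block handles one message at a time, in order,
            // so the writer is never used by two tasks at once.
            // WriteLine restores the line break that ReadLineAsync strips.
            var block = new ActionBlock<string>(entry => writer.WriteLine(entry));

            string line;
            // Assumption: a null result means the end of the stream was reached.
            while ((line = await reader.ReadLineAsync()) != null)
            {
                block.Post(line);
            }

            // No more input: let the block drain its buffer, then wait for it.
            block.Complete();
            await block.Completion;
        } // The writer (and the underlying file) is closed here.
    }
}
```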
Most of the code deals with reading each line and posting it to the block. By default, an ActionBlock uses only a single Task to process one message at a time, which is fine in this scenario. More tasks can be used if needed to process data in parallel.
Once all lines are read, we notify the block with a call to Complete() and wait for it to finish processing with await block.Completion. Once the block's Completion task finishes, we can close the target stream.
The beauty of the Dataflow library is that you can link multiple blocks together to create a pipeline of processing steps. ActionBlock is typically the final step in such a chain. The library takes care of passing data from one block to the next and, when the links are created with PropagateCompletion enabled, of propagating completion down the chain.
For example, one step can read lines from a log, a second can parse them with a regex to find specific patterns (e.g. error messages) and pass them on, and a third can receive the error messages and write them to another file. Each step runs on its own task, with intermediate messages buffered at each step.
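The three steps above could be sketched roughly as follows. This is an illustration under stated assumptions, not a definitive implementation: ErrorPipeline, ExtractErrorsAsync and the ERROR pattern are hypothetical, and the log is assumed to be a plain text file with one entry per line.

```csharp
using System;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public static class ErrorPipeline
{
    // Hypothetical pattern; adjust it to the real log format.
    private static readonly Regex ErrorPattern = new Regex(@"\bERROR\b");

    public static async Task ExtractErrorsAsync(string logPath, string errorPath)
    {
        using (var writer = new StreamWriter(errorPath, append: true))
        {
            // Step 1: turn a log file path into a stream of lines.
            var readLines = new TransformManyBlock<string, string>(
                path => File.ReadLines(path));

            // Step 2: keep only the lines that match the pattern.
            var findErrors = new TransformManyBlock<string, string>(
                line => ErrorPattern.IsMatch(line) ? new[] { line } : new string[0]);

            // Step 3: write the matches to the target file, one at a time.
            var writeErrors = new ActionBlock<string>(error => writer.WriteLine(error));

            // PropagateCompletion passes Complete() (and faults) down the chain.
            var link = new DataflowLinkOptions { PropagateCompletion = true };
            readLines.LinkTo(findErrors, link);
            findErrors.LinkTo(writeErrors, link);

            readLines.Post(logPath);
            readLines.Complete();

            // Waiting on the last block is enough, because completion propagates.
            await writeErrors.Completion;
        }
    }
}
```

Only the first block is completed explicitly. The target file is closed after the last block reports completion, which is the same pattern as with a single ActionBlock.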