I have a TCP listener that receives data from the server and writes it to a file. I use a BlockingCollection
to buffer the data. I don't know when the file ends, so my FileStream is always open.
Part of my code is:
private static BlockingCollection<string> Buffer = new BlockingCollection<string>();
static async Task Process()
{
    // Start the consumer that drains the buffer into the file
    var consumer = Task.Factory.StartNew(() => WriteData());
    while (true)
    {
        string request = await reader.ReadLineAsync();
        Buffer.Add(request);
    }
}
static void WriteData()
{
    FileStream fStream = new FileStream(filename, FileMode.Append, FileAccess.Write, FileShare.Write, 16392);
    foreach (var val in Buffer.GetConsumingEnumerable(token))
    {
        // Use the encoded byte count, not the character count
        byte[] bytes = Encoding.UTF8.GetBytes(val);
        fStream.Write(bytes, 0, bytes.Length);
        fStream.Flush();
    }
}
The problem is that I cannot dispose the FileStream inside the loop, otherwise I would have to create a new FileStream for each line, and the loop itself may never end.
This would be much easier in .NET 4.5 if you used a TPL Dataflow ActionBlock. An ActionBlock accepts and buffers incoming messages and processes them asynchronously using one or more tasks.
You could write something like this:
// Requires: using System; using System.IO; using System.Text;
// using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
public static async Task ProcessFile(string sourceFileName, string targetFileName)
{
    // Pass the target stream as part of the message to avoid globals
    var block = new ActionBlock<Tuple<string, FileStream>>(async tuple =>
    {
        var line = tuple.Item1;
        var stream = tuple.Item2;
        // ReadLineAsync strips the line terminator, so add it back.
        // Use the encoded byte count, not line.Length, which counts characters.
        var bytes = Encoding.UTF8.GetBytes(line + Environment.NewLine);
        await stream.WriteAsync(bytes, 0, bytes.Length);
    });
    // Post lines to the block
    using (var targetStream = new FileStream(targetFileName, FileMode.Append,
        FileAccess.Write, FileShare.Write, 16392))
    {
        using (var sourceStream = File.OpenRead(sourceFileName))
        {
            await PostLines(sourceStream, targetStream, block);
        }
        // Tell the block we are done
        block.Complete();
        // And wait for it to finish
        await block.Completion;
    }
}
private static async Task PostLines(FileStream sourceStream, FileStream targetStream,
    ActionBlock<Tuple<string, FileStream>> block)
{
    using (var reader = new StreamReader(sourceStream))
    {
        while (true)
        {
            var line = await reader.ReadLineAsync();
            // null signals the end of the source stream
            if (line == null)
                break;
            var tuple = Tuple.Create(line, targetStream);
            block.Post(tuple);
        }
    }
}
Most of the code deals with reading each line and posting it to the block. By default, an ActionBlock uses only a single Task to process one message at a time, which is fine in this scenario. More tasks can be used if needed to process data in parallel.
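As a rough sketch of what "more tasks" could look like, the block below passes an ExecutionDataflowBlockOptions with MaxDegreeOfParallelism set. The class ParallelBlockSketch and the method SumLengthsAsync are hypothetical names used only for illustration; they are not part of the code above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelBlockSketch
{
    // Hypothetical helper: adds up the lengths of all lines, letting the
    // block run up to maxParallel handlers at the same time.
    public static async Task<int> SumLengthsAsync(string[] lines, int maxParallel)
    {
        int total = 0;
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallel
        };
        var block = new ActionBlock<string>(line =>
        {
            // Handlers may run concurrently, so shared state must be thread-safe
            Interlocked.Add(ref total, line.Length);
        }, options);

        foreach (var line in lines)
        {
            block.Post(line);
        }

        // Signal that no more messages will arrive, then wait for the handlers
        block.Complete();
        await block.Completion;
        return total;
    }
}
```

With more than one task, messages are no longer guaranteed to be handled in the order they were posted, so for appending lines to a single file the default of one task is probably the safer choice.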
Once all lines are read, we notify the block with a call to Complete and wait for it to finish processing with await block.Completion.
Once the block's Completion task finishes, we can close the target stream.
The beauty of the Dataflow library is that you can link multiple blocks together to create a pipeline of processing steps. ActionBlock is typically the final step in such a chain. The library takes care of passing data from one block to the next and, when the links are created with PropagateCompletion enabled, of propagating completion down the chain.
For example, one step can read lines from a log file, a second can parse them with a regex to find specific patterns (e.g. error messages) and pass them on, and a third can receive the error messages and write them to another file. Each step runs on its own task, with intermediate messages buffered at each step.
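A minimal sketch of such a three-step pipeline might look like the following. The class LogPipelineSketch, the method CopyErrorsAsync and the "ERROR" pattern are assumptions made for illustration, and TextReader/TextWriter stand in for the real source and target files.

```csharp
using System;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LogPipelineSketch
{
    // Hypothetical pattern: everything from "ERROR" to the end of the line
    private static readonly Regex ErrorPattern = new Regex("ERROR.*");

    public static async Task CopyErrorsAsync(TextReader source, TextWriter target)
    {
        // Step 2: emit the matched text for error lines, nothing for other lines
        var parser = new TransformManyBlock<string, string>(line =>
        {
            var match = ErrorPattern.Match(line);
            return match.Success ? new[] { match.Value } : new string[0];
        });

        // Step 3: write each error message to the target (one task, so order is kept)
        var writer = new ActionBlock<string>(async message =>
        {
            await target.WriteLineAsync(message);
        });

        // Completion only flows down the chain if the link asks for it
        parser.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

        // Step 1: read the source line by line and feed the first block
        string line;
        while ((line = await source.ReadLineAsync()) != null)
        {
            await parser.SendAsync(line);
        }

        // Completing the first block lets the last block finish once it is drained
        parser.Complete();
        await writer.Completion;
    }
}
```

Only the first block is completed explicitly; awaiting the last block's Completion is enough to know the whole chain has drained before the target is closed.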