I've just begun to explore the PTL and have a design question.
My Scenario:
I have a list of URLs that each refer to an image. I want each image to be downloaded in parallel. As soon as at least one image is downloaded, I want to execute a method that does something with the downloaded image. That method should NOT be parallelized -- it should be serial.
I think the following will work but I'm not sure if this is the right way to do it. Because I have separate classes for collecting the images and for doing "something" with the collected images, I end up passing around an array of Tasks which seems wrong since it exposes the inner workings of how images are retrieved. But I don't know a way around it. In reality there is more to both of these methods but that's not important for this. Just know that they really shouldn't be lumped into one large method that both retrieves and does something with the image.
//From the Director class
Task<Image>[] downloadTasks = collector.RetrieveImages(listOfURLs);
for (int i = 0; i < listOfURLs.Count; i++)
{
//Wait for any of the remaining downloads to complete
int completedIndex = Task<Image>.WaitAny(downloadTasks);
Image completedImage = downloadTasks[completedIndex].Result;
//Now do something with the image (this "something" must happen serially)
//Uses the "Formatter" class to accomplish this let's say
}
///////////////////////////////////////////////////
//From the Collector class
public Task<Image>[] RetrieveImages(List<string> urls)
{
Task<Image>[] tasks = new Task<Image>[urls.Count];
int index = 0;
foreach (string url in urls)
{
string lambdaVar = url; //Required... Bleh
tasks[index] = Task<Image>.Factory.StartNew(() =>
{
using (WebClient client = new WebClient())
{
//TODO: Replace with live image locations
string fileName = String.Format("{0}.png", i);
client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName));
}
return Image.FromFile(Path.Combine(Application.StartupPath, fileName));
},
TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent);
index++;
}
return tasks;
}
Typically you use WaitAny to wait for one task when you don't care about the results of any of the others. For example if you just cared about the first image that happened to get returned.
How about this instead.
This creates two tasks, one which loads images and adds them to a blocking collection. The second task waits on the collection and processes any images added to the queue. When all the images are loaded the first task closes the queue down so the second task can shut down.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Net;
using System.Threading.Tasks;
namespace ClassLibrary1
{
public class Class1
{
readonly string _path = Directory.GetCurrentDirectory();
public void Demo()
{
IList<string> listOfUrls = new List<string>();
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
BlockingCollection<Image> images = new BlockingCollection<Image>();
Parallel.Invoke(
() => // Task 1: load the images
{
Parallel.For(0, listOfUrls.Count, (i) =>
{
Image img = RetrieveImages(listOfUrls[i], i);
img.Tag = i;
images.Add(img); // Add each image to the queue
});
images.CompleteAdding(); // Done with images.
},
() => // Task 2: Process images serially
{
foreach (var img in images.GetConsumingEnumerable())
{
string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag));
Console.WriteLine("Rotating image {0}", img.Tag);
img.RotateFlip(RotateFlipType.RotateNoneFlipXY);
img.Save(newPath);
}
});
}
public Image RetrieveImages(string url, int i)
{
using (WebClient client = new WebClient())
{
string fileName = Path.Combine(_path, String.Format("{0}.png", i));
Console.WriteLine("Downloading {0}...", url);
client.DownloadFile(url, Path.Combine(_path, fileName));
Console.WriteLine("Saving {0} as {1}.", url, fileName);
return Image.FromFile(Path.Combine(_path, fileName));
}
}
}
}
WARNING: The code doesn't have any error checking or cancelation. It's late and you need something to do right? :)
This is an example of the pipeline pattern. It assumes that getting an image is pretty slow and that the cost of locking inside the blocking collection isn't going to cause a problem because it happens relatively infrequently compared to the time spent downloading images.
Our book... You can read more about this and other patterns for parallel programming at http://parallelpatterns.codeplex.com/
Chapter 7 covers pipelines and the accompanying examples show pipelines with error handling and cancellation.
TPL already provides the ContinueWith function to execute one task when another finishes. Task chaining is one of the main patterns used in TPL for asynchronous operations.
The following method downloads a set of images and continues by renaming each of the files
static void DownloadInParallel(string[] urls)
{
var tempFolder = Path.GetTempPath();
var downloads = from url in urls
select Task.Factory.StartNew<string>(() =>{
using (var client = new WebClient())
{
var uri = new Uri(url);
string file = Path.Combine(tempFolder,uri.Segments.Last());
client.DownloadFile(uri, file);
return file;
}
},TaskCreationOptions.LongRunning|TaskCreationOptions.AttachedToParent)
.ContinueWith(t=>{
var filePath = t.Result;
File.Move(filePath, filePath + ".test");
},TaskContinuationOptions.ExecuteSynchronously);
var results = downloads.ToArray();
Task.WaitAll(results);
}
You should also check the WebClient Async Tasks from the ParallelExtensionsExtras samples. The DownloadXXXTask extension methods handle both the creation of tasks and the asynchronous downloading of files.
The following method uses the DownloadDataTask extension to get the image's data and rotate it before saving it to disk
static void DownloadInParallel2(string[] urls)
{
var tempFolder = Path.GetTempPath();
var downloads = from url in urls
let uri=new Uri(url)
let filePath=Path.Combine(tempFolder,uri.Segments.Last())
select new WebClient().DownloadDataTask(uri)
.ContinueWith(t=>{
var img = Image.FromStream(new MemoryStream(t.Result));
img.RotateFlip(RotateFlipType.RotateNoneFlipY);
img.Save(filePath);
},TaskContinuationOptions.ExecuteSynchronously);
var results = downloads.ToArray();
Task.WaitAll(results);
}
The best way to do this would probably be by implementing the Observer pattern: have your RetreiveImages
function implement IObservable, put your "completed image action" into an IObserver object's OnNext
method, and subscribe it to RetreiveImages
.
I haven't tried this myself yet (still have to play more with the task library) but I think this is the "right" way to do it.
//download all images
private async void GetAllImages ()
{
var downloadTasks = listOfURLs.Where(url => !string.IsNullOrEmpty(url)).Select(async url =>
{
var ret = await RetrieveImage(url);
return ret;
}).ToArray();
var counts = await Task.WhenAll(downloadTasks);
}
//From the Collector class
public async Task<Image> RetrieveImage(string url)
{
var lambdaVar = url; //Required... Bleh
using (WebClient client = new WebClient())
{
//TODO: Replace with live image locations
var fileName = String.Format("{0}.png", i);
await client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName));
}
return Image.FromFile(Path.Combine(Application.StartupPath, fileName));
}