Question:
I've been reading a lot about threading but can't figure out how to find a solution to my issue.
First let me introduce the problem. I have files which need to be processed. The hostname and file path of each file are stored in two arrays.
Now I want to set up several threads to process the files. The number of threads to create is based on three factors:
A) The maximum thread count cannot exceed the number of unique hostnames in any scenario.
B) Files with the same hostname MUST be processed sequentially, i.e. we cannot process host1_file1 and host1_file2 at the same time. (Data integrity would be put at risk, and this is beyond my control.)
C) The user may throttle the number of threads available for processing. The number of threads is still limited by condition A above. This is purely because, if we had a large number of hosts, say 50, we might not want 50 threads processing at the same time.
In the example data below there are 6 unique hosts, so a maximum of 6 threads can be created.
The optimal processing routine is shown below.
public class file_prep_obj
{
    public string[] file_paths;
    public string[] hostname;
    public Dictionary<string, int> my_dictionary;
    public void get_files()
    {
        //hostname[i] is the host that owns file_paths[i], so both arrays must have the same length.
        hostname = new string[] { "host1", "host1", "host1", "host2", "host2", "host3", "host4", "host4", "host5", "host6" };
        file_paths = new string[] { "C:\\host1_file1", "C:\\host1_file2", "C:\\host1_file3", "C:\\host2_file1", "C:\\host2_file2",
            "C:\\host3_file1", "C:\\host4_file1", "C:\\host4_file2", "C:\\host5_file1", "C:\\host6_file1" };
        //The dictionary provides a count on the number of files that need to be processed for a particular host.
        my_dictionary = hostname.GroupBy(x => x)
            .ToDictionary(g => g.Key,
                          g => g.Count());
    }
}
//This class contains a list of file_paths associated with the same host.
//The group_file_host_name will be the same for a host.
class host_file_thread
{
    public string[] group_file_paths;
    public string[] group_file_host_name;
    public void process_file(string file_path_in)
    {
        var time_delay_random = new Random();
        Console.WriteLine("Started processing File: " + file_path_in);
        Task.Delay(time_delay_random.Next(3000) + 1000);
        Console.WriteLine("Completed processing File: " + file_path_in);
    }
}
class Program
{
    static void Main(string[] args)
    {
        file_prep_obj my_files = new file_prep_obj();
        my_files.get_files();
        //Create our host objects... my_files.my_dictionary.Count represents the max number of threads
        host_file_thread[] host_thread = new host_file_thread[my_files.my_dictionary.Count];
        int key_pair_count = 0;
        int file_path_position = 0;
        foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)
        {
            host_thread[key_pair_count] = new host_file_thread(); //Initialise the host_file_thread object. Because we have an array of a customised object
            host_thread[key_pair_count].group_file_paths = new string[pair.Value]; //Initialise the group_file_paths
            host_thread[key_pair_count].group_file_host_name = new string[pair.Value]; //Initialise the group_file_host_name
            for (int j = 0; j < pair.Value; j++)
            {
                host_thread[key_pair_count].group_file_host_name[j] = pair.Key.ToString(); //Group the hosts
                host_thread[key_pair_count].group_file_paths[j] = my_files.file_paths[file_path_position]; //Group the file_paths
                file_path_position++;
            }
            key_pair_count++;
        }//Close foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)
        //TODO PROCESS FILES USING host_thread objects.
    }//Close static void Main(string[] args)
}//Close Class Program
I guess what I'm after is a guide on how to code the threaded processing routines that are in accordance with the specs above.
Answer 1:
You can use Stephen Toub's ForEachAsync extension method to process the files. It allows you to specify how many concurrent threads you want to use, and it is non-blocking so it frees up your main thread to do other processing. Here is the method from the article:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
In order to use it I refactored your code slightly. I changed the dictionary to be of type Dictionary<string, List<string>>; it holds the host as the key and all of that host's paths as the value. I assumed each file path contains its host name.
my_dictionary = (from h in hostname
                 from f in file_paths
                 where f.Contains(h)
                 select new { Hostname = h, File = f }).GroupBy(x => x.Hostname)
                 .ToDictionary(x => x.Key, x => x.Select(s => s.File).Distinct().ToList());
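If the paths cannot be assumed to contain the host name, one possible alternative is to pair the two arrays by position instead. This is only a sketch: it assumes hostname[i] belongs to file_paths[i] and that both arrays have the same length, and HostGrouping and GroupByHost are hypothetical names that do not appear in the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HostGrouping
{
    // Pairs hostnames[i] with filePaths[i], then collects the paths per host.
    // Assumption: the two arrays are aligned by index and equally long.
    public static Dictionary<string, List<string>> GroupByHost(string[] hostnames, string[] filePaths)
    {
        if (hostnames.Length != filePaths.Length)
            throw new ArgumentException("hostnames and filePaths must have the same length.");

        return hostnames
            .Zip(filePaths, (host, path) => new { Host = host, Path = path })
            .GroupBy(x => x.Host)
            .ToDictionary(g => g.Key, g => g.Select(x => x.Path).ToList());
    }

    public static void Main()
    {
        var hosts = new[] { "host1", "host1", "host2" };
        var paths = new[] { @"C:\host1_file1", @"C:\host1_file2", @"C:\host2_file1" };

        foreach (var pair in GroupByHost(hosts, paths))
            Console.WriteLine("{0}: {1}", pair.Key, string.Join(", ", pair.Value));
    }
}
```

The length check makes a mismatch between the two arrays fail loudly instead of silently assigning files to the wrong host.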
I also changed your process_file method to be async, because you were using Task.Delay inside it. You need to await that call; otherwise the method carries on immediately and the delay has no effect.
public static async Task process_file(string file_path_in)
{
    var time_delay_random = new Random();
    Console.WriteLine("Started:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
    await Task.Delay(time_delay_random.Next(3000) + 1000);
    Console.WriteLine("Completed:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
}
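The effect of the missing await can be seen in isolation. The following is a minimal sketch; DelayDemo and its two methods are hypothetical names used only for illustration, and the 500 ms delay is arbitrary.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class DelayDemo
{
    // Starts the delay but never waits for it, so the method returns almost immediately.
    public static long NotAwaitedMs()
    {
        var sw = Stopwatch.StartNew();
        Task.Delay(500); // the returned task is discarded
        return sw.ElapsedMilliseconds;
    }

    // Awaits the delay, so the rest of the method runs only after it has elapsed.
    public static async Task<long> AwaitedMs()
    {
        var sw = Stopwatch.StartNew();
        await Task.Delay(500);
        return sw.ElapsedMilliseconds;
    }

    public static void Main()
    {
        Console.WriteLine("Not awaited: {0} ms", NotAwaitedMs());
        Console.WriteLine("Awaited:     {0} ms", AwaitedMs().GetAwaiter().GetResult());
    }
}
```

The first figure should be close to zero and the second close to the requested delay, which is why the original process_file printed "Completed" straight after "Started".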
To use the code, you work out the maximum number of threads you want to use and pass that to my_files.my_dictionary.ForEachAsync. You also supply an asynchronous delegate which processes the files for a particular host, awaiting each one in turn so that they run sequentially.
public static async Task MainAsync()
{
    var my_files = new file_prep_obj();
    my_files.get_files();
    const int userSuppliedMaxThread = 5;
    var maxThreads = Math.Min(userSuppliedMaxThread, my_files.my_dictionary.Count);
    Console.WriteLine("MaxThreads = " + maxThreads);
    foreach (var pair in my_files.my_dictionary)
    {
        foreach (var path in pair.Value)
        {
            Console.WriteLine("Key= {0}, Value={1}", pair.Key, path);
        }
    }
    await my_files.my_dictionary.ForEachAsync(maxThreads, async (pair) =>
    {
        foreach (var path in pair.Value)
        {
            // serially process each path for a particular host.
            await process_file(path);
        }
    });
}
static void Main(string[] args)
{
    MainAsync().Wait();
    Console.ReadKey();
}//Close static void Main(string[] args)
Output
MaxThreads = 5
Key= host1, Value=C:\host1_file1
Key= host1, Value=C:\host1_file2
Key= host1, Value=C:\host1_file3
Key= host2, Value=C:\host2_file1
Key= host2, Value=C:\host2_file2
Key= host3, Value=C:\host3_file1
Key= host4, Value=C:\host4_file1
Key= host4, Value=C:\host4_file2
Key= host5, Value=C:\host5_file1
Key= host6, Value=C:\host6_file1
Started:C:\host1_file1 ThreadId:10
Started:C:\host2_file1 ThreadId:12
Started:C:\host3_file1 ThreadId:13
Started:C:\host4_file1 ThreadId:11
Started:C:\host5_file1 ThreadId:10
Completed:C:\host1_file1 ThreadId:13
Completed:C:\host2_file1 ThreadId:12
Started:C:\host1_file2 ThreadId:13
Started:C:\host2_file2 ThreadId:12
Completed:C:\host2_file2 ThreadId:11
Completed:C:\host1_file2 ThreadId:13
Started:C:\host6_file1 ThreadId:11
Started:C:\host1_file3 ThreadId:13
Completed:C:\host5_file1 ThreadId:11
Completed:C:\host4_file1 ThreadId:12
Completed:C:\host3_file1 ThreadId:13
Started:C:\host4_file2 ThreadId:12
Completed:C:\host1_file3 ThreadId:11
Completed:C:\host6_file1 ThreadId:13
Completed:C:\host4_file2 ThreadId:12
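A different way to get the same throttling, not taken from the article above, is to start one sequential worker per host and gate the workers with a SemaphoreSlim. The sketch below assumes maxConcurrency is at least 1 and that processFile is genuinely asynchronous; ThrottledHosts and ProcessAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledHosts
{
    // One sequential worker per host; at most maxConcurrency workers hold the gate at once.
    public static async Task ProcessAsync(
        IDictionary<string, List<string>> filesByHost,
        int maxConcurrency,
        Func<string, Task> processFile)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var workers = filesByHost.Select(async pair =>
            {
                await gate.WaitAsync(); // wait for a free slot
                try
                {
                    foreach (var path in pair.Value)
                        await processFile(path); // files of one host never overlap
                }
                finally
                {
                    gate.Release();
                }
            }).ToList(); // ToList forces the query to run, which starts every worker

            await Task.WhenAll(workers);
        }
    }

    public static void Main()
    {
        var files = new Dictionary<string, List<string>>
        {
            { "host1", new List<string> { @"C:\host1_file1", @"C:\host1_file2" } },
            { "host2", new List<string> { @"C:\host2_file1" } },
            { "host3", new List<string> { @"C:\host3_file1" } }
        };
        const int userLimit = 2; // hypothetical user-supplied throttle
        int maxConcurrency = Math.Min(userLimit, files.Count);

        ProcessAsync(files, maxConcurrency, async path =>
        {
            Console.WriteLine("Started:   " + path);
            await Task.Delay(200); // stand-in for real work
            Console.WriteLine("Completed: " + path);
        }).GetAwaiter().GetResult();
    }
}
```

Because each host is a single worker, condition B holds by construction, and the semaphore count covers conditions A and C once it is capped at the number of hosts.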
Answer 2:
I was playing around with your problem and came up with the following approach. It might not be the best, but I believe it suits your needs.
Before we begin, I'm a big fan of extension methods, so here is one:
public static class IEnumerableExtensions
{
    public static void Each<T>(this IEnumerable<T> ie, Action<T, int> action)
    {
        var i = 0;
        foreach (var e in ie) action(e, i++);
    }
}
This loops over a collection (like foreach) while passing both the item and its index to the action. You'll see why this is needed later.
Then we have the variables.
// Hosts and paths are aligned by index: group_file_host_name[i] owns group_file_paths[i].
public static string[] group_file_host_name =
{
    "host1", "host1", "host1", "host2", "host2", "host3", "host4", "host4",
    "host5", "host6"
};
public static string[] group_file_paths =
{
    @"c:\host1_file1", @"c:\host1_file2", @"c:\host1_file3", @"c:\host2_file1", @"c:\host2_file2", @"c:\host3_file1",
    @"c:\host4_file1", @"c:\host4_file2", @"c:\host5_file1", @"c:\host6_file1"
};
Then the main code:
public static void Main(string[] args)
{
    Dictionary<string, List<string>> filesToProcess = new Dictionary<string, List<string>>();
    // Loop over the 2 arrays and create a dictionary that contains the host as the key, and then all the filenames.
    group_file_host_name.Each((host, hostIndex) =>
    {
        if (filesToProcess.ContainsKey(host))
        { filesToProcess[host].Add(group_file_paths[hostIndex]); }
        else
        {
            filesToProcess.Add(host, new List<string>());
            filesToProcess[host].Add(group_file_paths[hostIndex]);
        }
    });
    var tasks = new List<Task>();
    foreach (var kvp in filesToProcess)
    {
        // One task per host; the files of that host are processed one after another.
        tasks.Add(Task.Factory.StartNew(() =>
        {
            foreach (var file in kvp.Value)
            {
                process_file(kvp.Key, file);
            }
        }));
    }
    var handleTaskCompletionTask = Task.WhenAll(tasks);
    handleTaskCompletionTask.Wait();
}
Some explanation might be needed here:
I create a dictionary that contains your hosts as keys and, as each value, the list of files that need to be processed for that host.
For the sample data, host1 maps to three files, host2 and host4 to two files each, and host3, host5 and host6 to one file each.
After that I create a collection of tasks, one per host, that are executed using the TPL.
All tasks are started immediately, and I then wait for all of them to finish.
The process method looks as follows, just for testing purposes:
public static void process_file(string host, string file)
{
    var time_delay_random = new Random();
    Console.WriteLine("Host '{0}' - Started processing the file {1}.", host, file);
    Thread.Sleep(time_delay_random.Next(3000) + 1000);
    Console.WriteLine("Host '{0}' - Completed processing the file {1}.", host, file);
    Console.WriteLine("");
}
This post does not include a way to limit the number of threads yourself, but that can easily be achieved by using a completion handler on the tasks. Then, when any task completes, you can loop over your collection again and start a new task for a host that hasn't been processed yet.
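One way the idea in the previous paragraph might look is sketched below, using Task.WhenAny to wait for a free slot before starting the next host. It assumes maxTasks is at least 1; WhenAnyThrottle and RunAsync are hypothetical names, and the sleep only stands in for real work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyThrottle
{
    // Starts one task per host, but never keeps more than maxTasks running at once.
    public static async Task RunAsync(
        IEnumerable<KeyValuePair<string, List<string>>> filesByHost,
        int maxTasks,
        Action<string, string> processFile)
    {
        var running = new List<Task>();
        foreach (var kvp in filesByHost)
        {
            if (running.Count >= maxTasks)
            {
                // All slots are taken: wait until one host has finished.
                var finished = await Task.WhenAny(running);
                running.Remove(finished);
                await finished; // rethrows if that host's task failed
            }

            var current = kvp; // local copy for the closure
            running.Add(Task.Run(() =>
            {
                foreach (var file in current.Value)
                    processFile(current.Key, file); // sequential within one host
            }));
        }
        await Task.WhenAll(running);
    }

    public static void Main()
    {
        var files = new Dictionary<string, List<string>>
        {
            { "host1", new List<string> { @"c:\host1_file1", @"c:\host1_file2" } },
            { "host2", new List<string> { @"c:\host2_file1" } },
            { "host3", new List<string> { @"c:\host3_file1" } }
        };

        RunAsync(files, 2, (host, file) =>
        {
            Console.WriteLine("Host '{0}' - processing {1}", host, file);
            Thread.Sleep(200); // stand-in for real work
        }).GetAwaiter().GetResult();
    }
}
```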
So, I hope it helps.
Answer 3:
I would start by organizing your data structure a bit better. Having two separate arrays not only increases data duplication, but also creates implicit coupling which may not be obvious to the person looking at your code.
A class which would hold information about a single task might look something like:
public class TaskInfo
{
    private readonly string _hostName;
    public string HostName
    {
        get { return _hostName; }
    }
    private readonly ReadOnlyCollection<string> _files;
    public ReadOnlyCollection<string> Files
    {
        get { return _files; }
    }
    public TaskInfo(string host, IEnumerable<string> files)
    {
        _hostName = host;
        _files = new ReadOnlyCollection<string>(files.ToList());
    }
}
Creating a list of tasks is now much more straightforward:
var list = new List<TaskInfo>()
{
    new TaskInfo(
        host: "host1",
        files: new[] { @"c:\host1\file1.txt", @"c:\host1\file2.txt" }),
    new TaskInfo(
        host: "host2",
        files: new[] { @"c:\host2\file1.txt", @"c:\host2\file2.txt" })
    /* ... */
};
And now that you have your tasks ready, you can simply use various classes from the System.Threading.Tasks namespace to invoke them in parallel. If you really want to limit the number of concurrent tasks, you can simply use the MaxDegreeOfParallelism property:
Parallel.ForEach(
    list,
    new ParallelOptions() { MaxDegreeOfParallelism = 10 },
    taskInfo => Process(taskInfo)
);
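The Process method is not shown in this answer. The sketch below is one guess at what it could look like, together with a degree of parallelism capped at the number of hosts. HostFiles is a compact stand-in for TaskInfo so that the example compiles on its own; ParallelHosts, Run and the handleFile callback are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Compact stand-in for the TaskInfo class above.
public class HostFiles
{
    public string HostName { get; private set; }
    public IReadOnlyList<string> Files { get; private set; }

    public HostFiles(string host, params string[] files)
    {
        HostName = host;
        Files = files;
    }
}

public static class ParallelHosts
{
    // Hypothetical Process: handles the files of one host strictly in order.
    public static void Process(HostFiles info, Action<string, string> handleFile)
    {
        foreach (var file in info.Files)
            handleFile(info.HostName, file);
    }

    public static void Run(IList<HostFiles> hosts, int userLimit, Action<string, string> handleFile)
    {
        var options = new ParallelOptions
        {
            // Never more workers than hosts, never more than the user allows, and at least one.
            MaxDegreeOfParallelism = Math.Max(1, Math.Min(userLimit, hosts.Count))
        };
        Parallel.ForEach(hosts, options, info => Process(info, handleFile));
    }

    public static void Main()
    {
        var hosts = new List<HostFiles>
        {
            new HostFiles("host1", @"c:\host1\file1.txt", @"c:\host1\file2.txt"),
            new HostFiles("host2", @"c:\host2\file1.txt", @"c:\host2\file2.txt")
        };

        Run(hosts, 10, (host, file) =>
        {
            Console.WriteLine("{0}: processing {1}", host, file);
            Thread.Sleep(200); // stand-in for real work
        });
    }
}
```

Since Parallel.ForEach hands each element to exactly one worker, two files of the same host cannot be processed at the same time.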
If you wanted to create your own pool of threads, you could also have achieved a similar thing using a ConcurrentQueue with multiple consumer threads, possibly waiting on a list of WaitHandles to know when they're done.
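A rough sketch of that queue-based variant follows. It deliberately simplifies the idea: each queue item is a whole host, and Thread.Join is used in place of a list of WaitHandles. QueueWorkers and Run are hypothetical names, and threadCount is assumed to be already capped by the caller.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class QueueWorkers
{
    // Fills a queue with hosts, then lets threadCount consumer threads drain it.
    public static void Run(
        IEnumerable<KeyValuePair<string, List<string>>> filesByHost,
        int threadCount,
        Action<string, string> processFile)
    {
        var queue = new ConcurrentQueue<KeyValuePair<string, List<string>>>(filesByHost);
        var workers = new List<Thread>();

        for (int i = 0; i < threadCount; i++)
        {
            var worker = new Thread(() =>
            {
                KeyValuePair<string, List<string>> host;
                // A dequeued item is a whole host, so its files stay sequential.
                while (queue.TryDequeue(out host))
                {
                    foreach (var file in host.Value)
                        processFile(host.Key, file);
                }
            });
            worker.Start();
            workers.Add(worker);
        }

        // Join stands in for waiting on a list of WaitHandles.
        foreach (var thread in workers)
            thread.Join();
    }

    public static void Main()
    {
        var files = new Dictionary<string, List<string>>
        {
            { "host1", new List<string> { @"c:\host1\file1.txt", @"c:\host1\file2.txt" } },
            { "host2", new List<string> { @"c:\host2\file1.txt" } },
            { "host3", new List<string> { @"c:\host3\file1.txt" } }
        };

        Run(files, 2, (host, file) =>
        {
            Console.WriteLine("{0}: processing {1}", host, file);
            Thread.Sleep(200); // stand-in for real work
        });
    }
}
```

The queue is filled before any consumer starts, so a failed TryDequeue reliably means there is no work left.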
Answer 4:
I think ThreadPool is the perfect solution for you. It will handle the threads by itself and queue their work. Moreover, you can set a maximum thread limit, and it will still queue your work items even if there are more of them than the maximum number of threads.
ThreadPool.SetMaxThreads([YourMaxThreads],[YourMaxThreads]);
foreach (var t in host_thread)
{
    ThreadPool.QueueUserWorkItem(Foo, t);
}
private static void Foo(object thread)
{
    // Cast once; the files of this host are then processed one after another.
    var host = (host_file_thread)thread;
    foreach (var file in host.group_file_paths)
    {
        host.process_file(file);
    }
}
Although I would suggest you change your data structure and keep the process_file method out of it.