Spawn processes, but only 5 at a time

Posted 2019-04-07 16:38

Question:

I'm working off a queue of filenames. Each file has to be processed by an external binary. This works fine, but it only processes one file at a time. Is it possible to spawn a number of processes in parallel?

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

while (queue.Count > 0)
{
    string file = queue.Dequeue();

    Process p = new Process();    
    p.StartInfo.FileName = @"binary.exe";
    p.StartInfo.Arguments = file;
    p.StartInfo.UseShellExecute = false;
    p.StartInfo.CreateNoWindow = true;
    p.StartInfo.RedirectStandardOutput = true;
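    p.Start();
    p.StandardOutput.ReadToEnd(); // read the redirected output, otherwise a chatty binary can fill the pipe and hang WaitForExit
    p.WaitForExit();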
}

Update: I like the solution from Alex LE, but is it possible to wait for the child processes to exit, as suggested by Ben Voigt?

Edit 1: I need to check for p.ExitCode == 0 to make some database updates.

Answer 1:

Here's what would be possible if the wait handle associated with the process were public instead of internal, as it currently is:

void BatchProcess()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    ...
    queue.Enqueue("10000.mp3");

    WaitHandle[] subprocesses = new WaitHandle[Math.Min(queue.Count, 5)];
    for( int i = 0; i < subprocesses.Length; i++ ) {
        subprocesses[i] = Spawn(queue.Dequeue());
    }

    while (queue.Count > 0) {
        int j = WaitHandle.WaitAny(subprocesses);
        subprocesses[j].Dispose();
        subprocesses[j] = Spawn(queue.Dequeue());
    }

    WaitHandle.WaitAll(subprocesses);
    foreach (var wh in subprocesses) {
        wh.Dispose();
    }
}

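// Hypothetical: Process does not publicly expose its wait handle (ProcessWaitHandle is internal), so this does not compile as written.
ProcessWaitHandle Spawn(string args)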
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        return p.WaitHandle;
    }
}

This would be the most efficient solution possible, because no synchronization objects are needed besides the Win32 processes themselves. There are no extra threads needed in the C# code and no asynchronous method invocations, therefore no locking or other synchronization is needed whatsoever.
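For what can be written today with public APIs: on .NET 5 or later, Process.WaitForExitAsync gives one Task per child, and Task.WaitAny can stand in for WaitHandle.WaitAny in the slot loop above. This is a sketch under stated assumptions, not the answer author's code: SlotRunner, Run and RunProcessAsync are hypothetical names, `using var` needs C# 8, and the job is passed in as a delegate so the slot logic can be exercised without binary.exe. It also records each exit code, which covers the p.ExitCode == 0 check from the question's edit. As with WaitHandle.WaitAny, the calling thread blocks inside Task.WaitAny.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

static class SlotRunner
{
    // Hypothetical helper: starts one child process and completes with its exit code.
    // Requires .NET 5+ for Process.WaitForExitAsync.
    public static async Task<int> RunProcessAsync(string fileName, string arguments)
    {
        using var p = new Process();
        p.StartInfo.FileName = fileName;
        p.StartInfo.Arguments = arguments;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        await p.WaitForExitAsync();
        return p.ExitCode;
    }

    // Keeps at most maxParallel jobs in flight and refills a slot as soon as any job ends,
    // mirroring the WaitHandle.WaitAny loop. Returns each job's exit code keyed by its argument.
    public static Dictionary<string, int> Run(Queue<string> queue, int maxParallel, Func<string, Task<int>> start)
    {
        if (maxParallel < 1) throw new ArgumentOutOfRangeException(nameof(maxParallel));

        var exitCodes = new Dictionary<string, int>();
        var running = new List<Task<int>>();
        var names = new List<string>();

        while (queue.Count > 0 || running.Count > 0)
        {
            // Fill every free slot from the queue.
            while (queue.Count > 0 && running.Count < maxParallel)
            {
                string file = queue.Dequeue();
                names.Add(file);
                running.Add(start(file));
            }

            // Block until any job finishes, record its exit code, then free its slot.
            int j = Task.WaitAny(running.ToArray());
            exitCodes[names[j]] = running[j].Result; // throws AggregateException if that job failed
            running.RemoveAt(j);
            names.RemoveAt(j);
        }
        return exitCodes;
    }
}
```

For the question's case, something like SlotRunner.Run(queue, 5, file => SlotRunner.RunProcessAsync(@"binary.exe", file)) returns a dictionary from file name to exit code; the entries whose value is 0 are the ones to pass on to the database update.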



Answer 2:

This works (it will be easier with async/await in C# 5.0):

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

int runningProcesses = 0;
const int MaxRunningProcesses = 5;
object syncLock = new object();

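Action<string> run = new Action<string>(delegate(string file) {
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.StandardOutput.ReadToEnd(); // drain stdout, otherwise a chatty binary can block on a full pipe
        p.WaitForExit();
    }
});

// Note: delegate BeginInvoke/EndInvoke are only supported on .NET Framework.
Action runNext = null;
runNext = delegate() {
    lock (syncLock) {
        if (queue.Count > 0) {
            run.BeginInvoke(queue.Dequeue(), new AsyncCallback(delegate(IAsyncResult ar) {
                run.EndInvoke(ar); // complete the async call (rethrows any exception from run)
                runNext();
            }), null);
        }
    }
};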

while (runningProcesses++ < MaxRunningProcesses) {
    runNext();
}
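With async/await the same cap can be expressed with a SemaphoreSlim. A minimal sketch, assuming the per-file work is supplied as a hypothetical runOne delegate that starts the process and returns its exit code; AsyncThrottle and RunAllAsync are illustrative names, and `using var` needs C# 8. Unlike the callback chain above, the returned task completes only when every file has been processed, so the caller can wait for all children and inspect every exit code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class AsyncThrottle
{
    // Runs runOne for every file with at most maxParallel calls in flight.
    // Returns the exit codes in the same order as the input.
    public static async Task<int[]> RunAllAsync(IEnumerable<string> files, int maxParallel, Func<string, Task<int>> runOne)
    {
        using var gate = new SemaphoreSlim(maxParallel, maxParallel);

        List<Task<int>> jobs = files.Select(async file =>
        {
            await gate.WaitAsync();          // wait for a free slot without blocking a thread
            try
            {
                return await runOne(file);   // hypothetical hook: run one file, return its exit code
            }
            finally
            {
                gate.Release();              // free the slot even if runOne throws
            }
        }).ToList();

        return await Task.WhenAll(jobs);
    }
}
```

On .NET 5 or later, runOne could start binary.exe, await p.WaitForExitAsync() and return p.ExitCode; all 10000 tasks are created up front, but only five get past WaitAsync at any moment.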


Answer 3:

Extracting some parts of your code and adding a semaphore:

Semaphore semX = new Semaphore(5, int.MaxValue);

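void f(string name, string args) {
    try
    {
        using (Process p = new Process())
        {
            p.StartInfo.FileName = name;
            p.StartInfo.Arguments = args;
            p.StartInfo.UseShellExecute = false;
            p.StartInfo.CreateNoWindow = true;
            p.StartInfo.RedirectStandardOutput = true;
            p.Start();
            p.StandardOutput.ReadToEnd(); // drain stdout so the child cannot block on a full pipe
            p.WaitForExit();
        }
    }
    finally
    {
        semX.Release();     // !!! This one is important: release even if the process fails to start
    }
}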

You then use

while (queue.Count > 0)
{
    string file = queue.Dequeue();
    semX.WaitOne();    // !!!
    new Thread(() => f(@"binary.exe", file)).Start();    // run f on its own thread; the semaphore caps how many run at once
}

for (int n = 5; n > 0; n--)        // Wait for the last 5 to finish
    semX.WaitOne();

semX.Dispose();                    // Dispose the semaphore
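If you'd rather not count semaphore slots by hand, Parallel.ForEach with ParallelOptions.MaxDegreeOfParallelism enforces the same upper bound. This is a different technique from the answer above, shown as a sketch: ParallelRunner, RunAll and the blocking runOne delegate (start the process, WaitForExit, return ExitCode) are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ParallelRunner
{
    // runOne is a hypothetical blocking hook: start the process, wait for it, return its exit code.
    public static ConcurrentDictionary<string, int> RunAll(IEnumerable<string> files, int maxParallel, Func<string, int> runOne)
    {
        var exitCodes = new ConcurrentDictionary<string, int>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // At most maxParallel iterations run at once; ForEach returns when all of them are done.
        Parallel.ForEach(files, options, file =>
        {
            exitCodes[file] = runOne(file);
        });

        return exitCodes;
    }
}
```

Each running process still occupies a pool thread while it waits, just as with the thread-per-file loop, but there is no semaphore to release or drain, and RunAll only returns once every file has been processed.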


Answer 4:

Basically, you have a producer/consumer problem, so you should absolutely use the collections in the System.Collections.Concurrent namespace. Here is a simple example that you can apply to your problem; as an added bonus, you can start filling the queue and processing it at the same time!

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

class Program
{
    static readonly BlockingCollection<string> _collection = new BlockingCollection<string>();

    static void Main()
    {
        const int maxTasks = 5;
        var tasks = new List<Task> {
            // startup publisher task...
            Task.Factory.StartNew(() => { 
                for(var i = 0; i < 1000; i++)
                {
                    _collection.Add(i + ".mp3");
                }
                Console.WriteLine("Publisher finished");
                _collection.CompleteAdding();
            }),
        };
        for (var i = 0; i < maxTasks; i++)
        {
            tasks.Add(Task.Factory.StartNew(ConsumerTask(i)));
        }
        Task.WaitAll(tasks.ToArray()); // wait for completion
    }

    static Action ConsumerTask(int id)
    {
        // return a closure just so the id can get passed
        return () =>
        {
            string item;
            while (true)
            {
                if (_collection.TryTake(out item, -1))
                {
                    using(Process p = new Process())
                    {
                        p.StartInfo.FileName = "binary.exe";
                        p.StartInfo.Arguments = item;
                        p.Start();
                        p.WaitForExit();
                        var exitCode = p.ExitCode;
                        // TODO handle exit code
                    }
                }
                else if (_collection.IsAddingCompleted)
                {
                    break; // exit loop
                }
            }
            Console.WriteLine("Consumer {0} finished", id);
        };
    }
}
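The consumer loop can also be written with GetConsumingEnumerable, which blocks while the collection is empty and ends once CompleteAdding has been called and every item has been taken, so the separate TryTake/IsAddingCompleted check is not needed. A sketch with hypothetical names (PipelineRunner, produce, runOne); consumers are started with TaskCreationOptions.LongRunning because they spend most of their time blocked on child processes, and each result keeps its exit code for the later database update.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class PipelineRunner
{
    // produce: hypothetical hook that pushes file names through the supplied add callback.
    // runOne: hypothetical blocking hook that processes one file and returns its exit code.
    public static ConcurrentBag<(string File, int ExitCode)> Run(
        Action<Action<string>> produce, int consumers, Func<string, int> runOne)
    {
        var results = new ConcurrentBag<(string File, int ExitCode)>();
        using var collection = new BlockingCollection<string>();

        // Producer: mark the collection complete even if produce throws,
        // so the consumers' foreach loops can end.
        Task producer = Task.Run(() =>
        {
            try { produce(collection.Add); }
            finally { collection.CompleteAdding(); }
        });

        // Consumers: each takes items until the collection is completed and empty.
        Task[] workers = Enumerable.Range(0, consumers)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (string file in collection.GetConsumingEnumerable())
                {
                    results.Add((file, runOne(file)));
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        Task.WaitAll(workers.Append(producer).ToArray());
        return results;
    }
}
```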


Answer 5:

You can use a semaphore for this and asynchronously call the long-running process as many times as you want:

private Semaphore _semaphore;
private delegate void Processor(string fileName);
[Test]
public void SetterTest() {
  var queue = new Queue<string>();
  queue.Enqueue("1.mp3");
  queue.Enqueue("2.mp3");
  queue.Enqueue("3.mp3");
  // ..
  queue.Enqueue("10000.mp3");
  var noOfThreads = 5;
  using (_semaphore = new Semaphore(noOfThreads, noOfThreads)) {
    while (queue.Count > 0) {
      string fileName;
      fileName = queue.Dequeue();
      _semaphore.WaitOne();
      new Processor(ProcessFile).BeginInvoke(fileName, null, null);
    }
    for (int i=0; i<noOfThreads; i++) _semaphore.WaitOne();
  }
}
private void ProcessFile(string file) {
  try {
    using (Process p = new Process()) {
      p.StartInfo.FileName = @"binary.exe";
      p.StartInfo.Arguments = file;
      p.StartInfo.UseShellExecute = false;
      p.StartInfo.CreateNoWindow = true;
      p.StartInfo.RedirectStandardOutput = true;
      p.Start();
      p.StandardOutput.ReadToEnd(); // drain stdout so the child cannot block on a full pipe
      p.WaitForExit();
    }
  } finally {
    _semaphore.Release(); // always free the slot, even if Start() throws
  }
}

Hope this helps.
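One caveat: delegate BeginInvoke is only supported on .NET Framework; on .NET Core and .NET 5+ it throws PlatformNotSupportedException. A sketch of the same semaphore pattern using Task.Run instead, assuming a hypothetical blocking runOne delegate that runs binary.exe for one file and returns its exit code; SemaphoreRunner is an illustrative name. Waiting on the collected tasks replaces draining the semaphore at the end, and it also surfaces any exception thrown by a worker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class SemaphoreRunner
{
    // runOne is a hypothetical blocking hook: process one file, wait, return the exit code.
    public static Dictionary<string, int> RunAll(Queue<string> queue, int maxParallel, Func<string, int> runOne)
    {
        var exitCodes = new Dictionary<string, int>();
        var tasks = new List<Task>();

        using (var semaphore = new SemaphoreSlim(maxParallel, maxParallel))
        {
            while (queue.Count > 0)
            {
                string file = queue.Dequeue();
                semaphore.Wait();                 // blocks while maxParallel jobs are running
                tasks.Add(Task.Run(() =>          // Task.Run instead of delegate BeginInvoke
                {
                    try
                    {
                        int code = runOne(file);
                        lock (exitCodes) { exitCodes[file] = code; }
                    }
                    finally
                    {
                        semaphore.Release();      // free the slot even if runOne throws
                    }
                }));
            }

            // Replaces draining the semaphore; rethrows worker failures as AggregateException.
            Task.WaitAll(tasks.ToArray());
        }
        return exitCodes;
    }
}
```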



Answer 6:

This one is partially based on Ben's answer. It blocks the main thread, but unlike that answer it already runs:

static void Run(string file)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        p.WaitForExit();
    }
}

static WaitHandle RunAsync(string file)
{
    IAsyncResult result = new Action<string>(Run).BeginInvoke(file, null, null);   // delegate BeginInvoke: .NET Framework only
    return result.AsyncWaitHandle;
}

static void Main()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    queue.Enqueue("4.mp3");
    queue.Enqueue("5.mp3");
    queue.Enqueue("6.mp3");
    // ...
    queue.Enqueue("10000.mp3");


    const int MaxRunningProcesses = 5;

    List<WaitHandle> runningProcesses = new List<WaitHandle>(MaxRunningProcesses);

    while (queue.Count > 0 && runningProcesses.Count < MaxRunningProcesses) {
        runningProcesses.Add(RunAsync(queue.Dequeue()));
    }

    while (runningProcesses.Count > 0) {
        int j = WaitHandle.WaitAny(runningProcesses.ToArray());
        runningProcesses[j].Close();
        runningProcesses.RemoveAt(j);
        if (queue.Count > 0) {
            runningProcesses.Add(RunAsync(queue.Dequeue()));
        }
    }
}