How can I use TTask.WaitForAny from the new thread

Posted 2019-02-13 09:02

Question:

While using the Delphi threading library to calculate tasks in parallel, with TTask.WaitForAny() used to get the first calculated result, an exception occasionally stopped the execution.

Call stack at the exception:

First chance exception at $752D2F71. Exception class EMonitorLockException with message 'Object lock not owned'. Process Project1.exe (11248)

:752d2f71 KERNELBASE.RaiseException + 0x48
System.TMonitor.CheckOwningThread
System.ErrorAt(25,$408C70)
System.Error(reMonitorNotLocked)
System.TMonitor.CheckOwningThread
System.TMonitor.Exit
System.TMonitor.Exit($2180E40)
System.Threading.TTask.RemoveCompleteEvent(???)
System.Threading.TTask.DoWaitForAny((...),4294967295)
System.Threading.TTask.WaitForAny((...))
Project9.Parallel2
Project9.Project1
:74ff919f KERNEL32.BaseThreadInitThunk + 0xe
:7723b54f ntdll.RtlInitializeExceptionChain + 0x8f
:7723b51a ntdll.RtlInitializeExceptionChain + 0x5a

The call stack suggests that the exception is caused by a bug in the threading library, in TMonitor and/or TTask.WaitForAny(). To verify that, the code was cut down to a minimum:

program Project1;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs,
  System.StrUtils;
var
  WorkerCount : integer = 1000;

function MyTaskProc: TProc;
begin
  result := procedure
    begin
      // Do something
    end;
end;

procedure Parallel2;
var
  i : Integer;
  Ticks: Cardinal;
  tasks: array of ITask;
  LTask: ITask;
  workProc: TProc;
begin
  workProc := MyTaskProc();
  Ticks := TThread.GetTickCount;
  SetLength(tasks, WorkerCount); // number of parallel tasks to undertake
  for i := 0 to WorkerCount - 1 do // parallel tasks
    tasks[i] := TTask.Run(workProc);
  TTask.WaitForAny(tasks); // wait for the first one to finish
  for LTask in tasks do
    LTask.Cancel; // kill the remaining tasks
  Ticks := TThread.GetTickCount - Ticks;
  WriteLn('Parallel time ' + Ticks.ToString + ' ms');
end;

begin
  try
    repeat
      Parallel2;
      WriteLn('finished');
    until FALSE;
  except
    on E: Exception do
      writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.

With this reduced program the error still reproduces after a while, which confirms the RTL bug.

This was submitted to Embarcadero as RSP-10197: TTask.WaitForAny gives exception EMonitorLockException "Object lock not owned".


Given that this currently cannot be solved with the Delphi threading library, the question is:

Is there a workaround to execute a procedure in parallel to get the first acquired solution?

Answer 1:

Here is an example using TParallel.For to stop the execution when an answer is produced. It uses TParallel.TLoopState to signal the other iterations of the parallel for loop. Once .Stop has been called, all current and pending iterations should stop. Iterations that are already running should check loopState.Stopped.

procedure Parallel3(CS: TCriticalSection);
var
  Ticks: Cardinal;
  i,ix: Integer;  // variables that are only touched once in the Parallel.For loop
begin
  i := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(1,WorkerCount,
    procedure(index:Integer; loopState: TParallel.TLoopState)
    var
      k,l,m: Integer;
    begin
      // Do something complex
      k := (1000 - index)*1000;
      for l := 0 to Pred(k) do
        m := k div 1000;
      // If criteria to stop fulfilled:
      CS.Enter;
      try
        if loopState.Stopped then // A solution was already found
          Exit;
        loopState.Stop;  // Signal the other iterations to stop
        Inc(i);
        ix := index;
      finally
        CS.Leave;
      end;
    end
  );
  Ticks := TThread.GetTickCount - Ticks;
  WriteLn('Parallel time ' + Ticks.ToString + ' ms', ' i :',i,' index:',ix);
end;

The critical section protects the calculated results, which here, for simplicity, are i and ix.
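
The procedure above takes the critical section from its caller, and it only checks loopState.Stopped once the work is finished. As a rough sketch of how the pieces might fit together, the program below creates the critical section, runs a variant that also polls loopState.Stopped while working, and frees the lock afterwards. The names FirstResultDemo, FindFirst and Winner, and the 1000-step placeholder loop, are assumptions made for illustration and are not part of the original answer.

```pascal
program FirstResultDemo; // hypothetical project name

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Threading;

const
  WorkerCount = 1000; // same iteration count as in the question

// Hypothetical variant of Parallel3 that polls loopState.Stopped during the work
procedure FindFirst(CS: TCriticalSection);
var
  Winner: Integer; // index of the first iteration that reported a result
begin
  Winner := -1;
  TParallel.For(1, WorkerCount,
    procedure(index: Integer; loopState: TParallel.TLoopState)
    var
      step: Integer;
    begin
      for step := 1 to 1000 do
      begin
        if loopState.Stopped then
          Exit; // another iteration already reported; abandon this one early
        // one slice of the real calculation would go here
      end;
      CS.Enter;
      try
        if loopState.Stopped then
          Exit; // the finally block still releases the lock
        loopState.Stop; // ask the remaining iterations to stop
        Winner := index;
      finally
        CS.Leave;
      end;
    end);
  WriteLn('First result came from index ', Winner);
end;

var
  CS: TCriticalSection;
begin
  CS := TCriticalSection.Create;
  try
    FindFirst(CS);
  finally
    CS.Free;
  end;
end.
```

Polling inside the work loop lets long-running iterations give up soon after a result exists, at the cost of one extra check per step. The second check under the lock is still needed, because two iterations can finish their work at almost the same time.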


Disclaimer: given the number of bugs in the System.Threading library, I would recommend another solution using the OTL (OmniThreadLibrary) framework, at least until the library has reached a stable foundation.