Named Pipes - Asynchronous Peeking

Posted 2019-03-19 12:03

Question:

I need a way to be notified when a System.IO.Pipes.NamedPipeServerStream opened in asynchronous mode has more data available for reading; a WaitHandle would be ideal. I cannot simply use BeginRead() to obtain such a handle, because I might be signaled by another thread that wants to write to the pipe, in which case I have to release the lock on the pipe and wait for the write to complete, and NamedPipeServerStream doesn't have a CancelAsync method. I also tried calling BeginRead() and then calling the Win32 function CancelIo on the pipe if the thread gets signaled, but I don't think this is an ideal solution: if CancelIo is called just as data is arriving and being processed, that data will be dropped. I still want to keep this data, but process it later, after the write. I suspect the Win32 function PeekNamedPipe might be useful, but I'd like to avoid having to continuously poll for new data with it.

In the likely event that the above text is a bit unclear, here's roughly what I'd like to be able to do...

NamedPipeServerStream pipe;
ManualResetEvent WriteFlag;
//initialise pipe
lock (pipe)
{
    //I wish this method existed
    WaitHandle NewDataHandle = pipe.GetDataAvailableWaithandle();
    WaitHandle[] BreakConditions = new WaitHandle[2];
    BreakConditions[0] = NewDataHandle;
    BreakConditions[1] = WriteFlag;
    int breakcode = WaitHandle.WaitAny(BreakConditions);
    switch (breakcode)
    {
        case 0:
            //do a read on the pipe
            break;
        case 1:
            //break so that we release the lock on the pipe
            break;
    }
}

Answer 1:

OK, I just ripped this out of my code; hopefully I removed all the application-specific logic. The idea is to issue a zero-length read with ReadFile and wait on both lpOverlapped.EventHandle (signaled when the read completes) and a WaitHandle that is set when another thread wants to write to the pipe. If the read has to be interrupted because of a writing thread, use CancelIoEx to cancel the zero-length read.

NativeOverlapped lpOverlapped;
ManualResetEvent DataReadyHandle = new ManualResetEvent(false);
lpOverlapped.InternalHigh = IntPtr.Zero;
lpOverlapped.InternalLow = IntPtr.Zero;
lpOverlapped.OffsetHigh = 0;
lpOverlapped.OffsetLow = 0;
lpOverlapped.EventHandle = DataReadyHandle.SafeWaitHandle.DangerousGetHandle();
IntPtr x = Marshal.AllocHGlobal(1); //for some reason, ReadFile doesn't like being passed NULL as a buffer
bool rval = ReadFile(SerialPipe.SafePipeHandle, x, 0, IntPtr.Zero,
   ref lpOverlapped);
int BreakCause;
if (!rval) //operation is completing asynchronously
{
   //Marshal.GetLastWin32Error() needs SetLastError = true on the ReadFile DllImport
   if (Marshal.GetLastWin32Error() != 997) //ERROR_IO_PENDING, which is in fact good
      throw new IOException();
   //So, we have a list of conditions we are waiting for
   WaitHandle[] BreakConditions = new WaitHandle[3];
   //We might get some input to read from the serial port...
   BreakConditions[0] = DataReadyHandle;
   //We might get told to yield the lock so that CPU can write...
   BreakConditions[1] = WriteRequiredSignal;
   //or we might get told that this thread has become expendable
   BreakConditions[2] = ThreadKillSignal;
   BreakCause = WaitHandle.WaitAny(BreakConditions, timeout);
}
else //operation completed synchronously; there is data available
{
   BreakCause = 0; //jump into the reading code in the switch below
}
switch (BreakCause)
{
   case 0:
      //serial port input
      byte[] Buffer = new byte[AttemptReadSize];
      int BRead = SerialPipe.Read(Buffer, 0, AttemptReadSize);
      //do something with your bytes.
      break;
   case 1:
      //asked to yield
      //first kill that read operation
      CancelIoEx(SerialPipe.SafePipeHandle, ref lpOverlapped);
      //should hand over the pipe mutex and wait to be told to take it back
      System.Threading.Monitor.Exit(SerialPipeLock);
      WriteRequiredSignal.Reset();
      WriteCompleteSignal.WaitOne();
      WriteCompleteSignal.Reset();
      System.Threading.Monitor.Enter(SerialPipeLock);
      break;
   case 2:
      //asked to die
      //we are the ones responsible for cleaning up the pipe
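      CancelIoEx(SerialPipe.SafePipeHandle, ref lpOverlapped);
      //free the dummy buffer here, since the FreeHGlobal after the switch is skipped by the return
      Marshal.FreeHGlobal(x);
      //finally block will clean up the pipe and the mutex
      return; //quit the thread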
}
Marshal.FreeHGlobal(x);
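
The snippet above calls ReadFile and CancelIoEx without showing how they are imported. Here is a minimal sketch of P/Invoke declarations that would match those calls, assuming the pipe handle is passed as a SafePipeHandle and the OVERLAPPED structure as System.Threading.NativeOverlapped. The class name NativeMethods is hypothetical and not part of the original code.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

// Hypothetical holder class; the original answer does not show its declarations.
static class NativeMethods
{
    // Win32 error code returned when an overlapped operation has been queued.
    public const int ERROR_IO_PENDING = 997;

    // lpBuffer and lpNumberOfBytesRead are raw pointers so that callers can
    // pass a dummy buffer and IntPtr.Zero, as the zero-length read does.
    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool ReadFile(
        SafePipeHandle hFile,
        IntPtr lpBuffer,
        uint nNumberOfBytesToRead,
        IntPtr lpNumberOfBytesRead,
        ref NativeOverlapped lpOverlapped);

    // Cancels the pending I/O request identified by lpOverlapped on hFile.
    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool CancelIoEx(
        SafePipeHandle hFile,
        ref NativeOverlapped lpOverlapped);
}
```

With SetLastError = true, the error code can be read with Marshal.GetLastWin32Error() right after the call. The NativeOverlapped value has to stay valid at the same address until the read has completed or been cancelled.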


Answer 2:

Looking through MSDN, I don't see any mechanism to do what you want. The quickest solution is probably to use interop to access PeekNamedPipe. If you don't want to use interop, you can wrap the pipe in a custom class and provide the peek functionality within that abstraction. The abstraction would handle all the signaling and would have to coordinate reading from and writing to the pipe. Obviously, not a trivial task.
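
If interop is acceptable, a PeekNamedPipe wrapper might look like the following sketch. The class and method names (PipePeek, BytesAvailable) are made up for illustration. The call only reports how many bytes are waiting and does not remove them from the pipe.

```csharp
using System;
using System.ComponentModel;
using System.IO.Pipes;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

// Hypothetical helper; not part of the .NET Framework.
static class PipePeek
{
    // All pointer arguments except lpTotalBytesAvail are optional and may be NULL.
    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PeekNamedPipe(
        SafePipeHandle hNamedPipe,
        IntPtr lpBuffer,
        uint nBufferSize,
        IntPtr lpBytesRead,
        out uint lpTotalBytesAvail,
        IntPtr lpBytesLeftThisMessage);

    // Returns the number of bytes currently waiting in the pipe
    // without consuming any of them.
    public static uint BytesAvailable(PipeStream pipe)
    {
        uint available;
        if (!PeekNamedPipe(pipe.SafePipeHandle, IntPtr.Zero, 0,
                           IntPtr.Zero, out available, IntPtr.Zero))
        {
            throw new Win32Exception(Marshal.GetLastWin32Error());
        }
        return available;
    }
}
```

This still has to be called in a loop or on a timer, so it does not remove the polling that the question wanted to avoid.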

Another alternative, if it is possible in your situation, is to look into using WCF, which is pretty much that abstraction.