Prevent reading data from an empty FIFO from blocking

Posted 2019-04-16 10:10

Question:

Within a Python 3 web application, I need to shell out to a command-line utility that processes an image and writes its output to a named pipe (FIFO), and then parse that output (the contents of the pipe) into a PIL/Pillow Image. Here's the basic flow (and working code, so long as there are no errors!):

from os import mkfifo
from os import unlink
from PIL import Image
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import Popen

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some.tif ' + fifo_path
# make a named pipe
mkfifo(fifo_path)
# execute
proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
# parse the image
pillow_image = Image.open(fifo_path)
# finish the process:
proc_exit = proc.wait()
# remove the pipe:
unlink(fifo_path)
# just for proof:
pillow_image.show()

(I've replaced the utility I actually have to work with by ImageMagick in the example above, just because you're not likely to have it; the substitution doesn't affect the problem at all.)

This works great in most circumstances, and I can handle most exceptions (left out above for clarity), but there's one case I can't work out how to handle: what to do if something goes wrong in the shell-out, resulting in an empty pipe, e.g. if the image doesn't exist or is corrupt for some reason:

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some/bad_or_missing.tif ' + fifo_path
# make a named pipe
mkfifo(fifo_path)
# execute
proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
# parse the image
pillow_image = Image.open(fifo_path) # STUCK
...

The application just hangs here, and because I never reach proc_exit = proc.wait() I can't set a timeout (e.g. proc_exit = proc.wait(timeout=2)), which is what I'd normally do.
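(For reference, the timeout-and-kill pattern I mean is sketched below, with a sleeping child process standing in as a placeholder for a wedged shell-out:)

```python
import subprocess
import sys

# a child that sleeps longer than the timeout: a stand-in for a wedged shell-out
proc = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(30)'])
try:
    proc.wait(timeout=1)
except subprocess.TimeoutExpired:
    proc.kill()   # give up on the child...
    proc.wait()   # ...and reap it
print(proc.returncode)  # negative on POSIX: the child was killed by a signal
```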

I've tried wrapping the whole business in a context manager, similar to this answer, but that recipe is not thread-safe, which is a problem, and I can't find a threading or multiprocessing solution that gives me access to the PIL/Pillow Image instance when I join the thread or process (not my strong suit, but something like this):

from multiprocessing import Process
from os import mkfifo
from os import unlink
from PIL import Image
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import Popen

def do_it(cmd, fifo_path):
    mkfifo(fifo_path)
    # I hear you like subprocesses with your subprocesses...
    sub_proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
    pillow_image = Image.open(fifo_path)
    proc_exit = sub_proc.wait()
    unlink(fifo_path)

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some/bad_or_missing.tif ' + fifo_path
proc = Process(target=do_it, args=(cmd, fifo_path))
proc.daemon = True
proc.start()
proc.join(timeout=3) # I can set a timeout here
# Seems heavy anyway, and how do I get pillow_image back for further work?
pillow_image.show()
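(For what it's worth, the usual way to get a value back out of a Process is a multiprocessing.Queue; a minimal sketch with placeholder bytes instead of a real image. Even so, it wouldn't help here: Image.open() could hang inside the worker just the same.)

```python
from multiprocessing import Process, Queue

def worker(queue):
    # stand-in for parsing the pipe; Image.open() could still hang here
    queue.put(b'image bytes')

queue = Queue()
worker_proc = Process(target=worker, args=(queue,))
worker_proc.daemon = True
worker_proc.start()
result = queue.get(timeout=3)  # raises queue.Empty if the worker produced nothing
worker_proc.join(timeout=3)
print(result)
```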

Hopefully these illustrate my problem and what I've tried. Thanks in advance.

Answer 1:

POSIX read(2):

When attempting to read from an empty pipe or FIFO:

If no process has the pipe open for writing, read() shall return 0 to indicate end-of-file.
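This is easy to verify: open the read end with O_NONBLOCK so that open() itself doesn't block, then read from the empty FIFO that has no writer (the scratch path below is arbitrary):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'demo.fifo')
os.mkfifo(path)
# O_NONBLOCK lets open() return immediately even though there is no writer
fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
data = os.read(fd, 1024)  # no writer: returns b'' (EOF) instead of blocking
os.close(fd)
os.remove(path)
print(data)
```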

Image.open(fifo_path) can get stuck if and only if the command dies without ever opening fifo_path for writing while Image.open() is blocked.

Normally, opening the FIFO blocks until the other end is opened also.

Here's a normal sequence:

  1. cmd blocks while trying to open fifo_path for writing
  2. your Python code blocks while trying to open for reading
  3. once the FIFO is opened by both processes, the data starts to flow. Except for having a name, a FIFO is similar to a pipe: there is exactly one pipe object, and the kernel passes all data internally without writing it to the filesystem. The pipe is not a seekable file, and therefore Image.open() may read until EOF
  4. cmd closes its end of the pipe. Your code receives EOF because no other process has the FIFO open for writing, and Image.open(fifo_path) returns.

    It doesn't matter why cmd's end of the pipe is closed, whether due to a successful completion or an error, and whether cmd is killed abruptly or not: all that matters is that its end is closed.

    It doesn't matter whether your process calls proc.wait() or not. proc.wait() does not kill cmd, and it does not keep the other end of the pipe from being opened or closed. The only thing proc.wait() does is wait until the child process dies and/or return the exit status of an already-dead child process.
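The normal sequence can be sketched with a tiny child process standing in for cmd (it opens the FIFO for writing, writes, and exits):

```python
import os
import subprocess
import sys
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'demo.fifo')
os.mkfifo(path)
# step 1: the child blocks in its open-for-writing until the read end is opened
child = subprocess.Popen(
    [sys.executable, '-c',
     'import sys; open(sys.argv[1], "w").write("hello")', path])
# steps 2-4: our open() blocks until the child opens its end, then data flows
with open(path) as pipe:
    data = pipe.read()  # reads until EOF, i.e. until the child closes its end
child.wait()
os.remove(path)
print(data)
```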

Here's the deadlock case:

  1. By the time of the Image.open() call, cmd does not even try to open fifo_path for writing, for whatever reason: e.g., there is no /usr/bin/convert, the command-line arguments are wrong, the input is wrong or missing, etc.
  2. your Python code blocks while trying to open for reading

fifo_path is never opened for writing, and therefore Image.open(fifo_path) is stuck forever trying to open it for reading.
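The deadlock can be observed without hanging the interpreter by putting the blocking open() in a thread; the manual open-for-writing at the end is only there to unblock the thread and stands in for the writer that never arrives:

```python
import os
import tempfile
import threading

path = os.path.join(tempfile.mkdtemp(), 'demo.fifo')
os.mkfifo(path)
result = {}

def reader():
    with open(path) as pipe:  # blocks: nobody opens the write end
        result['data'] = pipe.read()

thread = threading.Thread(target=reader, daemon=True)
thread.start()
thread.join(timeout=0.5)
print(thread.is_alive())  # True: still stuck inside open()

# unblock it manually by opening and immediately closing the write end
open(path, 'w').close()
thread.join(timeout=5)
os.remove(path)
print(result['data'])  # empty string: EOF, the writer wrote nothing
```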


You could open the FIFO for writing in a background thread and close it when the parent opens the FIFO for reading:

#!/usr/bin/env python3
import contextlib
import os
import subprocess
import sys
import textwrap
import threading

fifo_path = "fifo"
with contextlib.ExitStack() as stack:
    os.mkfifo(fifo_path)
    stack.callback(os.remove, fifo_path)
    child = stack.enter_context(
        subprocess.Popen([
            sys.executable, '-c', textwrap.dedent('''
            import random
            import sys
            import time
            if random.random() < 0.5: # 50%
                open(sys.argv[1], 'w').write("ok")
            else:
                sys.exit("fifo is not opened for writing in the child")
            '''), fifo_path
        ]))
    stack.callback(child.kill)
    opened = threading.Event()  # set when the FIFO is opened for reading
    threading.Thread(target=open_for_writing, args=[fifo_path, opened, child],
                     daemon=True).start()
    pipe = stack.enter_context(open(fifo_path))  # open for reading
    opened.set()  # the background thread may close its end of the pipe now
    print(pipe.read()) # read data from the child or return in 3 seconds
sys.exit(child.returncode)

On exiting the with block, the child is killed if it is still running, via the stack.callback(child.kill) cleanup.

Here open_for_writing() opens the FIFO, which unblocks open(fifo_path) in the main thread and in turn allows the write end to be closed. To avoid pipe.read() returning too soon, it gives the child 3 seconds to open the FIFO for writing:

def open_for_writing(path, opened, child):
    with open(path, 'w'):
        opened.wait()  # don't close until opened for reading in the main thread
        try:
            child.wait(timeout=3)  # the child has 3 seconds to open for writing
        except subprocess.TimeoutExpired:
            pass

If you are sure that the child process either tries to open the FIFO or exits eventually (or you are OK with the Python process hanging while the child runs), then you could drop the timeout and use child.wait() instead of child.wait(timeout=3). With that change there are no arbitrary timeouts left, and the code may work on an arbitrarily slow system (for whatever reason).

The code demonstrates why threads should be avoided if possible, or why one should prefer established patterns that are less general but are guaranteed to work correctly, such as synchronization via communication.

The code in this answer should work in a variety of cases, but its parts are intricately tangled: the effect of even a small change might not become apparent until a very specific case materializes.