Decorate \ delegate a File object to add functionality

Posted 2019-01-13 22:39

Question:

I've been writing a small Python script that executes some shell commands using the subprocess module and a helper function:

import datetime
import subprocess as sp
import sys

def run(command, description):
    """Runs a command in a formatted manner. Returns its return code."""
    start=datetime.datetime.now()
    sys.stderr.write('%-65s' % description)
    s=sp.Popen(command, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
    out,err=s.communicate()
    end=datetime.datetime.now()
    duration=end-start
    status='Done' if s.returncode==0 else 'Failed'
    print('%s (%d seconds)' % (status, duration.seconds))
    return s.returncode

The following lines read the standard output and error:

    s=sp.Popen(command, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
    out,err=s.communicate()

As you can see, the captured output and error (out and err) are not used. Suppose that I want to write the output and error messages to a log file, in a formatted way, e.g.:

[STDOUT: 2011-01-17 14:53:55] <message>
[STDERR: 2011-01-17 14:53:56] <message>

My question is, what's the most Pythonic way to do it? I thought of three options:

  1. Inherit the file object and override the write method.
  2. Use a Delegate class which implements write.
  3. Connect to the PIPE itself in some way.

UPDATE: reference test script

I'm checking the results with this script, saved as test.py:

#!/usr/bin/python
import sys

sys.stdout.write('OUT\n')
sys.stdout.flush()
sys.stderr.write('ERR\n')
sys.stderr.flush()

Any ideas?

Answer 1:

1 and 2 are reasonable solutions, but overriding write() won't be enough.

The problem is that Popen needs file handles to attach to the process, so plain Python file objects don't work; the handles have to be OS-level. To solve that, you need a Python object that has an OS-level file handle. The only way I can think of to do that is to use pipes, so that you have an OS-level file handle to write to. But then you need another thread that sits and polls the pipe for data to read so that it can log it. (So this is, strictly speaking, an implementation of 2, as it delegates to logging.)

Said and done:

import io
import logging
import os
import select
import subprocess
import time
import threading

LOG_FILENAME = 'output.log'
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG)

class StreamLogger(io.IOBase):
    def __init__(self, level):
        self.level = level
        self.pipe = os.pipe()
        # Set the flag before the thread starts so close() always sees it.
        self._run = True
        self.thread = threading.Thread(target=self._flusher)
        self.thread.start()

    def _flusher(self):
        buf = b''
        while True:
            # Sample the flag first so one last poll happens after close().
            running = self._run
            while select.select([self.pipe[0]], [], [], 0)[0]:
                buf += os.read(self.pipe[0], 1024)
                while b'\n' in buf:
                    data, buf = buf.split(b'\n', 1)
                    self.write(data.decode())
            if not running:
                break
            time.sleep(1)
        self._run = None

    def write(self, data):
        return logging.log(self.level, data)

    def fileno(self):
        # Popen asks for this descriptor: the write end of the pipe.
        return self.pipe[1]

    def close(self):
        if self._run:
            self._run = False
            while self._run is not None:
                time.sleep(1)
            os.close(self.pipe[0])
            os.close(self.pipe[1])

So that class creates an OS-level pipe that Popen can attach the subprocess's stdout or stderr to. It also starts a thread that polls the other end of that pipe once a second for things to log, which it then logs with the logging module.

Possibly this class should implement more things for completeness, but it works in this case anyway.

Example code:

with StreamLogger(logging.INFO) as out:
    with StreamLogger(logging.ERROR) as err:
        # Wait for the command to finish before the loggers are closed.
        subprocess.Popen("ls", stdout=out, stderr=err, shell=True).wait()

output.log ends up like so:

INFO:root:output.log
INFO:root:streamlogger.py
INFO:root:and
INFO:root:so
INFO:root:on

Tested with Python 2.6, 2.7 and 3.1.

I would think any implementation of 1 and 3 would need to use similar techniques. It is a bit involved, but unless you can make the Popen command log correctly itself, I don't have a better idea.



Answer 2:

I would suggest option 3, with the logging standard library package. In this case I'd say the other two are overkill.
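
This answer gives no code, so here is a minimal sketch of what option 3 with logging might look like. The names get_stream_loggers and run_and_log are hypothetical, not from the original post. It assumes the command is passed as a list of arguments and that it is acceptable to log after the process exits, which means the timestamps record when each line was logged rather than when it was produced.

```python
import logging
import subprocess
import sys

LOG_FORMAT = '[%(name)s: %(asctime)s] %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

def get_stream_loggers(log_path):
    """Return (stdout_logger, stderr_logger) sharing one file handler."""
    handler = logging.FileHandler(log_path)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, DATE_FORMAT))
    loggers = []
    for name in ('STDOUT', 'STDERR'):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # keep these lines out of the root logger
        logger.addHandler(handler)
        loggers.append(logger)
    return tuple(loggers)

def run_and_log(command, stdout_logger, stderr_logger):
    """Run command (a list of arguments), log its output, return its exit code."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    out, err = proc.communicate()  # waits for exit and reads both pipes
    for line in out.splitlines():
        stdout_logger.info(line)
    for line in err.splitlines():
        stderr_logger.info(line)
    return proc.returncode
```

Call get_stream_loggers once and reuse the two loggers for every command. Calling it repeatedly would attach a new handler each time and duplicate the log lines.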



Answer 3:

1 and 2 won't work. Here's an implementation of the principle:

import subprocess
import time

FileClass = open('tmptmp123123123.tmp', 'w').__class__

class WrappedFile(FileClass):
    TIMETPL = "%Y-%m-%d %H:%M:%S"
    TEMPLATE = "[%s: %s] "

    def __init__(self, name, mode='r', buffering=None, title=None):
        self.title = title or name

        if buffering is None:
            super(WrappedFile, self).__init__(name, mode)
        else:
            super(WrappedFile, self).__init__(name, mode, buffering)

    def write(self, s):
        stamp = time.strftime(self.TIMETPL)
        if not s:
            return 
        # Add a line with timestamp per line to be written
        s = s.split('\n')
        spre = self.TEMPLATE % (self.title, stamp)
        s = "\n".join(["%s %s" % (spre, line) for line in s]) + "\n"
        super(WrappedFile, self).write(s)

The reason it doesn't work is that Popen never calls stdout.write. A wrapped file will work fine when we call its write method and will even be written to if passed to Popen, but the write will happen in a lower layer, skipping the write method.
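
To make the point concrete, here is a small sketch that counts how often write() is called on a wrapper passed to a subprocess. LoudFile and demo are hypothetical names used only for illustration. The sketch relies on subprocess accepting any object with a fileno() method as stdout.

```python
import subprocess
import sys
import tempfile

class LoudFile(object):
    """Hypothetical wrapper that counts every call to write()."""
    def __init__(self, raw):
        self.raw = raw
        self.write_calls = 0

    def write(self, data):
        self.write_calls += 1
        return self.raw.write(data)

    def fileno(self):
        # subprocess only asks for this descriptor; the child process
        # then writes to it directly at the OS level.
        return self.raw.fileno()

def demo():
    """Return (number of write() calls, bytes that ended up in the file)."""
    with tempfile.TemporaryFile() as raw:
        wrapped = LoudFile(raw)
        subprocess.call([sys.executable, '-c', 'print("OUT")'], stdout=wrapped)
        raw.seek(0)
        return wrapped.write_calls, raw.read()
```

Here demo() reports zero write() calls even though the child's output is in the file, which is why a formatting write() override never gets the chance to add a timestamp.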



Answer 4:

This simple solution worked for me:

import sys
import datetime
import tempfile
import subprocess as sp
def run(command, description):
    """Runs a command in a formatted manner. Returns its return code."""
    with tempfile.SpooledTemporaryFile(8*1024) as so:
        print >> sys.stderr, '%-65s' % description
        start=datetime.datetime.now()
        retcode = sp.call(command, shell=True, stderr=sp.STDOUT, stdout=so)
        end=datetime.datetime.now()
        so.seek(0)
        for line in so.readlines():
            print >> sys.stderr,'logging this:', line.rstrip()
        duration=end-start
        status='Done' if retcode == 0 else 'Failed'
        print >> sys.stderr, '%s (%d seconds)' % (status, duration.seconds)
        return retcode

REF_SCRIPT = r"""#!/usr/bin/python
import sys

sys.stdout.write('OUT\n')
sys.stdout.flush()
sys.stderr.write('ERR\n')
sys.stderr.flush()
"""

SCRIPT_NAME = 'refscript.py'

if __name__ == '__main__':
    with open(SCRIPT_NAME, 'w') as script:
        script.write(REF_SCRIPT)
    run('python ' + SCRIPT_NAME, 'Reference script')


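Answer 5:

This uses Adam Rosenfield's make_async and read_async. Whereas my original answer used select.epoll and was thus Linux-only, it now uses select.select, which is available on other Unix systems as well. (On Windows, select.select only accepts sockets and the fcntl module is unavailable, so the code below will not run there.)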

This logs output from the subprocess to /tmp/test.log as it occurs:

import logging
import subprocess
import shlex
import select
import fcntl
import os
import errno

def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    try:
        return fd.read()
    except IOError as e:
        if e.errno != errno.EAGAIN:
            raise
        else:
            return ''

def log_process(proc,stdout_logger,stderr_logger):
    loggers = { proc.stdout: stdout_logger, proc.stderr:  stderr_logger }
    def log_fds(fds):
        for fd in fds:
            out = read_async(fd)
            if out.strip():
                loggers[fd].info(out)
    make_async(proc.stdout)
    make_async(proc.stderr)
    while True:
        # Wait for data to become available 
        rlist, wlist, xlist = select.select([proc.stdout, proc.stderr], [], [])
        log_fds(rlist)
        if proc.poll() is not None:
            # Corner case: check if more output was created
            # between the last call to read_async and now
            log_fds([proc.stdout, proc.stderr])                
            break

if __name__=='__main__':
    formatter = logging.Formatter('[%(name)s: %(asctime)s] %(message)s')
    handler = logging.FileHandler('/tmp/test.log','w')
    handler.setFormatter(formatter)

    stdout_logger=logging.getLogger('STDOUT')
    stdout_logger.setLevel(logging.DEBUG)
    stdout_logger.addHandler(handler)

    stderr_logger=logging.getLogger('STDERR')
    stderr_logger.setLevel(logging.DEBUG)
    stderr_logger.addHandler(handler)        

    proc = subprocess.Popen(shlex.split('ls -laR /tmp'),
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    log_process(proc,stdout_logger,stderr_logger)
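
Where fcntl is not available (for example on Windows), one possible alternative is to give each pipe its own reader thread instead of using non-blocking reads. The following is a rough sketch, not part of the original answer. The names _pump and log_process_threaded are hypothetical, and it assumes the same two loggers as above and a command passed as a list of arguments.

```python
import logging
import subprocess
import sys
import threading

def _pump(pipe, logger):
    """Log each line from pipe as it arrives, until the child closes it."""
    with pipe:
        for line in iter(pipe.readline, ''):
            logger.info(line.rstrip('\n'))

def log_process_threaded(command, stdout_logger, stderr_logger):
    """Run command, logging both streams as they are produced; return exit code."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    threads = [
        threading.Thread(target=_pump, args=(proc.stdout, stdout_logger)),
        threading.Thread(target=_pump, args=(proc.stderr, stderr_logger)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # both pipes reach EOF once the child has exited
    return proc.wait()
```

Each thread blocks in readline, so no polling or select call is needed. Lines are logged one at a time, and the relative order of stdout and stderr lines is not guaranteed.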