Handle generator exceptions in its consumer

2019-01-06 21:14发布

问题:

This is a follow-up to Handle an exception thrown in a generator and discusses a more general problem.

I have a function that reads data in different formats. All formats are line- or record-oriented and for each format there's a dedicated parsing function, implemented as a generator. So the main reading function gets an input and a generator, which reads its respective format from the input and delivers records back to the main function:

def read(stream, parsefunc):
    for record in parsefunc(stream):
        do_stuff(record)

where parsefunc is something like:

def parsefunc(stream):
    while not eof(stream):
        rec = read_record(stream)
        do some stuff
        yield rec

The problem I'm facing is that while parsefunc can throw an exception (e.g. when reading from a stream), it has no idea how to handle it. The function responsible for handling exceptions is the main read function. Note that exceptions occur on a per-record basis, so even if one record fails, the generator should continue its work and yield records back until the whole stream is exhausted.

In the previous question I tried to put next(parsefunc) in a try block, but as turned out, this is not going to work. So I have to add try-except to the parsefunc itself and then somehow deliver exceptions to the consumer:

def parsefunc(stream):
    while not eof(stream):
        try:
            rec = read_record()
            yield rec
        except Exception as e:
            ?????

I'm rather reluctant to do this because

  • it makes no sense to use try in a function that isn't intended to handle any exceptions
  • it's unclear to me how to pass exceptions to the consuming function
  • there going to be many formats and many parsefunc's, I don't want to clutter them with too much helper code.

Has anyone suggestions for a better architecture?

A note for googlers: in addition to the top answer, pay attention to senderle's and Jon's posts - very smart and insightful stuff.

回答1:

You can return a tuple of record and exception in the parsefunc and let the consumer function decide what to do with the exception:

import random

def get_record(line):
  num = random.randint(0, 3)
  if num == 3:
    raise Exception("3 means danger")
  return line


def parsefunc(stream):
  for line in stream:
    try:
      rec = get_record(line)
    except Exception as e:
      yield (None, e)
    else:
      yield (rec, None)

if __name__ == '__main__':
  with open('temp.txt') as f:
    for rec, e in parsefunc(f):
      if e:
        print "Got an exception %s" % e
      else:
        print "Got a record %s" % rec


回答2:

Thinking deeper about what would happen in a more complex case kind of vindicates the Python choice of avoiding bubbling exceptions out of a generator.

If I got an I/O error from a stream object the odds of simply being able to recover and continue reading, without the structures local to the generator being reset in some way, would be low. I would somehow have to reconcile myself with the reading process in order to continue: skip garbage, push back partial data, reset some incomplete internal tracking structure, etc.

Only the generator has enough context to do that properly. Even if you could keep the generator context, having the outer block handle the exceptions would totally flout the Law of Demeter. All the important information that the surrounding block needs to reset and move on is in local variables of the generator function! And getting or passing that information, though possible, is disgusting.

The resulting exception would almost always be thrown after cleaning up, in which case the reader-generator will already have an internal exception block. Trying very hard to maintain this cleanliness in the brain-dead-simple case only to have it break down in almost every realistic context would be silly. So just have the try in the generator, you are going to need the body of the except block anyway, in any complex case.

It would be nice if exceptional conditions could look like exceptions, though, and not like return values. So I would add an intermediate adapter to allow for this: The generator would yield either data or exceptions and the adapter would re-raise the exception if applicable. The adapter should be called first-thing inside the for loop, so that we have the option of catching it within the loop and cleaning up to continue, or breaking out of the loop to catch it and and abandon the process. And we should put some kind of lame wrapper around the setup to indicate that tricks are afoot, and to force the adapter to get called if the function is adapting.

That way each layer is presented errors that it has the context to handle, at the expense of the adapter being a tiny bit intrusive (and perhaps also easy to forget).

So we would have:

def read(stream, parsefunc):
  try:
    for source in frozen(parsefunc(stream)):
      try:
        record = source.thaw()
        do_stuff(record)
      except Exception, e:
        log_error(e)
        if not is_recoverable(e):
          raise
        recover()
  except Exception, e:
    properly_give_up()
  wrap_up()

(Where the two try blocks are optional.)

The adapter looks like:

class Frozen(object):
  def __init__(self, item):
    self.value = item
  def thaw(self):
    if isinstance(value, Exception):
      raise value
    return value

def frozen(generator):
    for item in generator:
       yield Frozen(item)

And parsefunc looks like:

def parsefunc(stream):
  while not eof(stream):
    try:
       rec = read_record(stream)
       do_some_stuff()
       yield rec
    except Exception, e:
       properly_skip_record_or_prepare_retry()
       yield e

To make it harder to forget the adapter, we could also change frozen from a function to a decorator on parsefunc.

def frozen_results(func):
  def freezer(__func = func, *args, **kw):
    for item in __func(*args, **kw):
       yield Frozen(item)
  return freezer

In which case we we would declare:

@frozen_results
def parsefunc(stream):
  ...

And we would obviously not bother to declare frozen, or wrap it around the call to parsefunc.



回答3:

Without knowing more about the system, I think it's difficult to tell what approach will work best. However, one option that no one has suggested yet would be to use a callback. Given that only read knows how to deal with exceptions, might something like this work?

def read(stream, parsefunc):
    some_closure_data = {}

    def error_callback_1(e):
        manipulate(some_closure_data, e)
    def error_callback_2(e):
        transform(some_closure_data, e)

    for record in parsefunc(stream, error_callback_1):
        do_stuff(record)

Then, in parsefunc:

def parsefunc(stream, error_callback):
    while not eof(stream):
        try:
            rec = read_record()
            yield rec
        except Exception as e:
            error_callback(e)

I used a closure over a mutable local here; you could also define a class. Note also that you can access the traceback info via sys.exc_info() inside the callback.

Another interesting approach might be to use send. This would work a little differently; basically, instead of defining a callback, read could check the result of yield, do a lot of complex logic, and send a substitute value, which the generator would then re-yield (or do something else with). This is a bit more exotic, but I thought I'd mention it in case it's useful:

>>> def parsefunc(it):
...     default = None
...     for x in it:
...         try:
...             rec = float(x)
...         except ValueError as e:
...             default = yield e
...             yield default
...         else:
...             yield rec
... 
>>> parsed_values = parsefunc(['4', '6', '5', '5h', '22', '7'])
>>> for x in parsed_values:
...     if isinstance(x, ValueError):
...         x = parsed_values.send(0.0)
...     print x
... 
4.0
6.0
5.0
0.0
22.0
7.0

On it's own this is a bit useless ("Why not just print the default directly from read?" you might ask), but you could do more complex things with default inside the generator, resetting values, going back a step, and so on. You could even wait to send a callback at this point based on the error you receive. But note that sys.exc_info() is cleared as soon as the generator yields, so you'll have to send everything from sys.exc_info() if you need access to the traceback.

Here's an example of how you might combine the two options:

import string
digits = set(string.digits)

def digits_only(v):
    return ''.join(c for c in v if c in digits)

def parsefunc(it):
    default = None
    for x in it:
        try:
            rec = float(x)
        except ValueError as e:
            callback = yield e
            yield float(callback(x))
        else:
            yield rec

parsed_values = parsefunc(['4', '6', '5', '5h', '22', '7'])
for x in parsed_values:
    if isinstance(x, ValueError):
        x = parsed_values.send(digits_only)
    print x


回答4:

An example of a possible design:

from StringIO import StringIO
import csv

blah = StringIO('this,is,1\nthis,is\n')

def parse_csv(stream):
    for row in csv.reader(stream):
        try:
            yield int(row[2])
        except (IndexError, ValueError) as e:
            pass # don't yield but might need something
        # All others have to go up a level - so it wasn't parsable
        # So if it's an IOError you know why, but this needs to catch
        # exceptions potentially, just let the major ones propogate

for record in parse_csv(blah):
    print record


回答5:

I like the given answer with the Frozen stuff. Based on that idea I came up with this, solving two aspects I did not yet like. The first was the patterns needed to write it down. The second was the loss of the stack trace when yielding an exception. I tried my best to solve the first by using decorators as good as possible. I tried keeping the stack trace by using sys.exc_info() instead of the exception alone.

My generator normally (i.e. without my stuff applied) would look like this:

def generator():
  def f(i):
    return float(i) / (3 - i)
  for i in range(5):
    yield f(i)

If I can transform it into using an inner function to determine the value to yield, I can apply my method:

def generator():
  def f(i):
    return float(i) / (3 - i)
  for i in range(5):
    def generate():
      return f(i)
    yield generate()

This doesn't yet change anything and calling it like this would raise an error with a proper stack trace:

for e in generator():
  print e

Now, applying my decorators, the code would look like this:

@excepterGenerator
def generator():
  def f(i):
    return float(i) / (3 - i)
  for i in range(5):
    @excepterBlock
    def generate():
      return f(i)
    yield generate()

Not much change optically. And you still can use it the way you used the version before:

for e in generator():
  print e

And you still get a proper stack trace when calling. (Just one more frame is in there now.)

But now you also can use it like this:

it = generator()
while it:
  try:
    for e in it:
      print e
  except Exception as problem:
    print 'exc', problem

This way you can handle in the consumer any exception raised in the generator without too much syntactic hassle and without losing stack traces.

The decorators are spelled out like this:

import sys

def excepterBlock(code):
  def wrapper(*args, **kwargs):
    try:
      return (code(*args, **kwargs), None)
    except Exception:
      return (None, sys.exc_info())
  return wrapper

class Excepter(object):
  def __init__(self, generator):
    self.generator = generator
    self.running = True
  def next(self):
    try:
      v, e = self.generator.next()
    except StopIteration:
      self.running = False
      raise
    if e:
      raise e[0], e[1], e[2]
    else:
      return v
  def __iter__(self):
    return self
  def __nonzero__(self):
    return self.running

def excepterGenerator(generator):
  return lambda *args, **kwargs: Excepter(generator(*args, **kwargs))


回答6:

About your point of propagating exception from generator to consuming function, you can try to use an error code (set of error codes) to indicate the error. Though not elegant that is one approach you can think of.

For example in the below code yielding a value like -1 where you were expecting a set of positive integers would signal to the calling function that there was an error.

In [1]: def f():
  ...:     yield 1
  ...:     try:
  ...:         2/0
  ...:     except ZeroDivisionError,e:
  ...:         yield -1
  ...:     yield 3
  ...:     


In [2]: g = f()

In [3]: next(g)
Out[3]: 1

In [4]: next(g)
Out[4]: -1

In [5]: next(g)
Out[5]: 3


回答7:

Actually, generators are quite limited in several aspects. You found one: the raising of exceptions is not part of their API.

You could have a look at the Stackless Python stuff like greenlets or coroutines which offer a lot more flexibility; but diving into that is a bit out of scope here.



回答8:

(I answered the other question linked in the OP but my answer applies to this situation as well)

I have needed to solve this problem a couple of times and came upon this question after a search for what other people have done.

One option- which will probably require refactoring things a little bit- would be to simply create an error handling generator, and throw the exception in the generator (to another error handling generator) rather than raise it.

Here is what the error handling generator function might look like:

def err_handler():
    # a generator for processing errors
    while True:
        try:
            # errors are thrown to this point in function
            yield
        except Exception1:
            handle_exc1()
        except Exception2:
            handle_exc2()
        except Exception3:
            handle_exc3()
        except Exception:
            raise

An additional handler argument is provided to the parsefunc function so it has a place to put the errors:

def parsefunc(stream, handler):
    # the handler argument fixes errors/problems separately
    while not eof(stream):
        try:
            rec = read_record(stream)
            do some stuff
            yield rec
        except Exception as e:
            handler.throw(e)
    handler.close()

Now just use almost the original read function, but now with an error handler:

def read(stream, parsefunc):
    handler = err_handler()
    for record in parsefunc(stream, handler):
        do_stuff(record)

This isn't always going to be the best solution, but it's certainly an option, and relatively easy to understand.