Statistical accumulator in Python

Posted 2019-03-31 19:40

Question:

A statistical accumulator allows one to perform incremental calculations. For instance, to compute the arithmetic mean of a stream of numbers arriving at arbitrary times, one could make an object that keeps track of the number of items received so far, n, and their sum, sum. When the mean is requested, the object simply returns sum/n.

An accumulator like this allows you to compute incrementally in the sense that, when given a new number, you don't need to recompute the entire sum and count.

Similar accumulators can be written for other statistics (cf. boost library for a C++ implementation).

How would you implement accumulators in Python? The code I came up with is:

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow removing items
    already accumulated, but it could easily be modified to do
    so. Also, other statistics could be accumulated.
    """
    def __init__(self):
        # upon initialization, the number of items currently
        # accumulated (_n) and the total sum of the items accumulated
        # (_sum) are set to zero because nothing has been accumulated
        # yet.
        self._n = 0
        self._sum = 0.0

    def add(self, item):
        # 'add' is used to add an item to this accumulator
        try:
            # try to convert the item to a float. If you are
            # successful, add the float to the current sum and
            # increase the number of accumulated items
            self._sum += float(item)
            self._n += 1
        except ValueError:
            # if you fail to convert the item to a float, simply
            # ignore the exception (pass on it and do nothing)
            pass

    @property
    def mean(self):
        # the property 'mean' returns the current mean accumulated in
        # the object
        if self._n > 0:
            # if you have more than zero items accumulated, then return
            # their arithmetic average
            return self._sum / self._n
        else:
            # if you have no items accumulated, return None (you could
            # also raise an exception)
            return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print(my_accumulator.mean)
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print(my_accumulator.mean)
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print(my_accumulator.mean)
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print(my_accumulator.mean)
# prints 1.5 (notice that it ignored the 'NA')

Interesting design questions arise:

  1. How to make the accumulator thread-safe?
  2. How to safely remove items?
  3. How to architect it so that other statistics can be plugged in easily (a factory for statistics)?

Answer 1:

For a generalized, threadsafe higher-level function, you could use something like the following in combination with the Queue.Queue class and some other bits:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(vals) / len(vals)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

This generator function evades most of its purported responsibility by delegating it to other entities:

  • It relies on Queue.Queue to supply its source elements in a thread-safe manner
  • A collections.deque object can be passed in as the value of the storage parameter; this provides, among other things, a convenient way to only use the last n (in this case 3) values
  • The function itself (in this case mean) is passed as a parameter. This will result in less-than-optimally efficient code in some cases, but is readily applied to all sorts of situations.

Note that there is a possibility of the accumulator timing out if your producer thread takes longer than 0.1 seconds per value. This is easily remedied by passing a longer timeout or by removing the timeout parameter entirely. In the latter case the function will block indefinitely at the end of the queue; this usage makes more sense in a case where it's being used in a sub thread (usually a daemon thread). Of course you can also parametrize the arguments that are passed to q.get as a fourth argument to Accumulator.
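
As a rough sketch of that last suggestion, the arguments for q.get could be passed as a dict. The lowercase name `accumulator` and the `get_kwargs` parameter are hypothetical, and the snippet is written for Python 3, where the module is named `queue` rather than `Queue`:

```python
from queue import Empty, Queue  # Python 3 name of the Queue module


def accumulator(f, q, storage, get_kwargs=None):
    """Like Accumulator above, but forwards `get_kwargs` to `q.get`."""
    if get_kwargs is None:
        get_kwargs = {"timeout": 0.1}  # same default as the original function
    try:
        while True:
            storage.append(q.get(**get_kwargs))
            q.task_done()
            yield f(storage)
    except Empty:
        # q.get raised Empty: nothing arrived within the allowed wait
        pass


# Usage: a non-blocking get stops as soon as the queue is drained.
q = Queue()
for v in (1, 2, 3):
    q.put(v)
running_sums = list(accumulator(sum, q, [], {"block": False}))
```

Passing `{"timeout": 5}` would give slow producers more time, and `{"block": True}` would wait indefinitely, as described above.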

If you want to communicate end of queue, i.e. that there are no more values to come, from the producer thread (here putting_thread), you can pass and check for a sentinel value or use some other method. There is more info in this thread; I opted to write a subclass of Queue.Queue called CloseableQueue that provides a close method.
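
The sentinel approach might look something like the sketch below. This is not the CloseableQueue subclass mentioned above; the names `_DONE`, `accumulate_until_done` and `produce` are made up for illustration, and the code assumes Python 3 (module `queue`):

```python
import threading
from queue import Queue  # Python 3 name of the Queue module

_DONE = object()  # hypothetical sentinel: a unique object no real value can be


def accumulate_until_done(f, q, storage):
    """Yields f(storage) after each value until the sentinel arrives."""
    while True:
        value = q.get()  # blocks; the sentinel, not a timeout, ends the loop
        q.task_done()
        if value is _DONE:
            return
        storage.append(value)
        yield f(storage)


def produce(values, q):
    for value in values:
        q.put(value)
    q.put(_DONE)  # signal that no more values will come


q = Queue()
producer = threading.Thread(target=produce, args=([1, 2, 3], q))
producer.start()
results = list(accumulate_until_done(sum, q, []))
producer.join()
```

Because the consumer no longer depends on a timeout, a slow producer cannot end the accumulation early. With several consumers, each one would need its own sentinel.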

There are various other ways you could customize the behaviour of such a function, for example by limiting the queue size; this is just an example of usage.

edit

As mentioned above, this loses some efficiency because of the necessity of recalculation and also, I think, doesn't really answer your question.

A generator function can also accept values through its send method. So you can write a mean generator function like

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    total = yield  # receives the first value sent after initialization
    count = 1
    while True:
        total += yield (total / float(count))
        count += 1

Here the yield expression does two things at once: it brings values (the arguments to send) into the function, and it passes the calculated values out as the return value of send.

You can pass the generator returned by a call to that function to a more optimizable accumulator generator function like this one:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass


Answer 2:

If I were doing this in Python, there are two things I would do differently:

  1. Separate out the functionality of each accumulator.
  2. Not use @property the way you did.

For the first one, I would likely want to come up with an API for performing an accumulation, perhaps something like:

def add(self, num): pass   # add a number
def compute(self): pass    # compute the value of the accumulator

Then I would create an accumulator registry that holds onto these accumulators and allows the user to add a value to all of them and compute any of them by name. The code might look like:

class Accumulators(object):
    _accumulator_library = {}

    def __init__(self):
        self.accumulator_library = {}
        for key, value in Accumulators._accumulator_library.items():
            self.accumulator_library[key] = value()

    @staticmethod
    def register(name, accumulator):
        Accumulators._accumulator_library[name] = accumulator

    def add(self, num):
        for accumulator in self.accumulator_library.values():
            accumulator.add(num)

    def compute(self, name):
        return self.accumulator_library[name].compute()

    @staticmethod
    def register_decorator(name):
        def _inner(cls):
            Accumulators.register(name, cls)
            return cls
        return _inner


@Accumulators.register_decorator("Mean")
class Mean(object):
    def __init__(self):
        self.total = 0
        self.count = 0

    def add(self, num):
        self.count += 1
        self.total += num

    def compute(self):
        return self.total / float(self.count)
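
Other statistics can follow the same add/compute interface. As one possible illustration, here is a running sample variance using Welford's online algorithm. That algorithm is my choice here, not something from the question, and the class name `Variance` and its attributes are hypothetical. It is shown standalone, but it could presumably be registered the same way as Mean:

```python
class Variance(object):
    """Running sample variance via Welford's online algorithm."""

    def __init__(self):
        self.count = 0    # number of values seen so far
        self.mean = 0.0   # running mean
        self.m2 = 0.0     # running sum of squared deviations from the mean

    def add(self, num):
        x = float(num)
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        # the second factor uses the already-updated mean
        self.m2 += delta * (x - self.mean)

    def compute(self):
        # sample variance (n - 1 denominator); undefined below two values
        if self.count < 2:
            return None
        return self.m2 / (self.count - 1)


acc = Variance()
for x in [2, 4, 4, 4, 5, 5, 7, 9]:
    acc.add(x)
variance = acc.compute()
```

Like the mean, this needs only constant state per accumulator and never revisits earlier values.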

I should probably speak to your thread-safety question. Python's GIL protects you from a lot of threading issues, but compound updates such as self.count += 1 are not atomic. There are a few things you may want to do to protect yourself, though:

  • If these objects are localized to one thread, use threading.local
  • If not, you can wrap the operations in a lock, using the with context syntax to deal with holding the lock for you.
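
A minimal sketch of the lock-based option, assuming a single accumulator shared by several threads. The names `LockedMean` and `worker` are invented for this example:

```python
import threading


class LockedMean(object):
    """Mean accumulator whose updates and reads are guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.total = 0.0
        self.count = 0

    def add(self, num):
        with self._lock:  # released automatically, even if an exception is raised
            self.total += num
            self.count += 1

    def compute(self):
        with self._lock:  # read both fields together so they stay consistent
            return self.total / self.count if self.count else None


def worker(acc):
    for i in range(1, 1001):
        acc.add(i)


shared = LockedMean()
threads = [threading.Thread(target=worker, args=(shared,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Holding the lock in compute as well as in add keeps a reader from seeing a total that already includes a value the count does not.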