Find max(and min) on the moving interval using pyt

Posted 2019-05-03 23:26

Question:

I have an array like

[5.5, 6.0, 6.0, 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]. 

All numbers in this array are in steps of 0.5, and two successive numbers differ by at most 0.5 (they can be equal, as in the example). There is a moving interval, or box, which covers, for example, 3 successive numbers, like this:

[(5.5, 6.0, 6.0), 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]  # min: 5.5, max: 6.0

The box moves to the right one position at a time:

[5.5, (6.0, 6.0, 6.5), 6.0, 5.5, 5.5, 5.0, 4.5]  # min: 6.0, max: 6.5

[5.5, 6.0, (6.0, 6.5, 6.0), 5.5, 5.5, 5.0, 4.5]  # min: 6.0, max: 6.5

The question is: how can I find the min and max of the numbers inside the box each time the box moves?

I can handle it when the box and the array are small, as in this example, but I need to apply this to an array of size 100000 with a box size of 10000. With my method (I recompute every max and min with a for-loop each time the box moves), it takes too much time (I have about 100 more arrays to process and need to run this repeatedly). There is a time limit, so one calculation needs to run in about 0.5 sec.

Answer 1:

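Have a look at the rolling windows from pandas. The module-level pd.rolling_max and pd.rolling_min functions have been removed from pandas; the .rolling() method is the current equivalent:

>>> import pandas as pd
>>> L = [5.5, 6.0, 6.0, 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]
>>> a = pd.DataFrame(L)
>>> a.rolling(3).max()
     0
0  NaN
1  NaN
2  6.0
3  6.5
4  6.5
5  6.5
6  6.0
7  5.5
8  5.5
>>> a.rolling(3).min()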
     0
0  NaN
1  NaN
2  5.5
3  6.0
4  6.0
5  5.5
6  5.5
7  5.0
8  4.5


Answer 2:

At first it seemed to me that this required a minimum of O(log(window_size)) operations per element of the big list (see my other answer). But @wim pointed me to the truly remarkable algorithm described by @adamax in this post:

Implement a queue in which push_rear(), pop_front() and get_min() are all constant time operations

Here's an implementation.

Running it on the suggested 100000 numbers with a 1000 window takes 0.6 seconds instead of the 60 seconds of the naive algorithm.

class MinMaxStack(object):

    def __init__(self):
        self.stack = []

    def push(self,val):
        if not self.stack:
            self.stack = [(val,val,val)]
        else:
            _,minimum,maximum = self.stack[-1]
            if val < minimum:
                self.stack.append((val,val,maximum))
            elif val > maximum:
                self.stack.append((val,minimum,val))
            else:
                self.stack.append((val,minimum,maximum))

    def pop(self):
        return self.stack.pop()

    def get_minimax(self):
        return self.stack[-1][1:]

    def __len__(self):
        return len(self.stack)

class RollingWindow(object):

    def __init__(self):
        self.push_stack = MinMaxStack()
        self.pop_stack = MinMaxStack()

    def push_only(self,o):
        self.push_stack.push(o)

    def push_and_pop(self,o):
        self.push_stack.push(o)
        if not self.pop_stack:
            for i in range(len(self.push_stack.stack)-1):
                self.pop_stack.push(self.push_stack.pop()[0])
            self.push_stack.pop()
        else:
            self.pop_stack.pop()

    def get_minimax(self):
        if not self.pop_stack:
            return self.push_stack.get_minimax()
        elif not self.push_stack:
            return self.pop_stack.get_minimax()
        mn1,mx1 = self.pop_stack.get_minimax()
        mn2,mx2 = self.push_stack.get_minimax()
        return min(mn1,mn2),max(mx1,mx2)



import time
import random
window = 10000
test_length = 100000
data = [random.randint(1,100) for i in range(test_length)]

s = time.time()

wr = RollingWindow()
answer1 = []
for i in range(test_length):
    if i < window:
        wr.push_only(data[i])
    else:
        wr.push_and_pop(data[i])
    answer1.append(wr.get_minimax())

print(time.time()-s)  # elapsed seconds for the two-stack rolling window

s = time.time()
answer2 = []
for i in range(test_length):
    if i+1 < window:
        current_window = i+1
    else:
        current_window = window
    answer2.append((min(data[i+1-current_window:i+1]),max(data[i+1-current_window:i+1])))

print(time.time()-s)  # elapsed seconds for the naive slice-based version

if answer1 != answer2:
    print("Test Fail")

Some small performance improvements are possible. This version continually grows and shrinks the Python list used as a stack. It is slightly faster to never shrink it and to use an end pointer instead, but only by a few percent. If you were really desperate for a few more percent, you could merge the two stacks into the window class and reduce the indirection in the calls. I built an optimised version replacing the lists with collections.deque and inlining the stack code and got it down to 0.32 seconds.

If even more speed were required, this would be pretty easy to code up in C or Cython, particularly for a fixed window size and if you could restrict the type of the values on the stacks.
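
The optimised deque-based version mentioned above is not shown in this answer. As a rough sketch of a related but different technique (a monotonic deque, not the two-stack queue implemented above), the following also does amortised O(1) work per element. The function name rolling_min_max is made up for this illustration, and like the test harness above it reports partial windows for the first window-1 elements.

```python
from collections import deque


def rolling_min_max(values, window):
    """Yield (min, max) for the window ending at each index.

    Hypothetical helper for illustration; values must be an indexable
    sequence. The first window-1 results cover only the prefix seen so far.
    """
    min_idx = deque()  # indices whose values increase from front to back
    max_idx = deque()  # indices whose values decrease from front to back
    for i, v in enumerate(values):
        # Older candidates that are no better than v can never win again.
        while min_idx and values[min_idx[-1]] >= v:
            min_idx.pop()
        while max_idx and values[max_idx[-1]] <= v:
            max_idx.pop()
        min_idx.append(i)
        max_idx.append(i)
        # Drop the front index once it has slid out of the window.
        if min_idx[0] <= i - window:
            min_idx.popleft()
        if max_idx[0] <= i - window:
            max_idx.popleft()
        yield values[min_idx[0]], values[max_idx[0]]


data = [5.5, 6.0, 6.0, 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]
results = list(rolling_min_max(data, 3))
print(results[2:])  # skip the two partial windows at the start
```

For the example array from the question with a window of 3, the full-window results start with (5.5, 6.0), (6.0, 6.5), (6.0, 6.5), matching the boxes shown in the question. Each index is appended and removed at most once per deque, which is where the amortised constant cost comes from.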



Answer 3:

l = [5.5, 6.0, 6.0, 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]
windowSize = 3

# Naive approach: recompute the max of every window from scratch.
for i in range(0, len(l) - windowSize + 1):
    print(max(l[i:i + windowSize]))

output:

6.0
6.5
6.5
6.5
6.0
5.5
5.5


Answer 4:

This is a rolling window, which can be implemented in pandas as the other answer shows.

If, however, you want to implement it yourself the following code will be of assistance. This code can be optimised further and could be more pythonic but it should give a good understanding of what is happening in the algorithm.

Initially, the minimum and maximum values are found for the starting window. Once this is initialised, we treat the subarray as a queue, and only 2 values matter: the new value being added and the old value being dropped.

If the old value was the minimum or maximum, we recalculate that minimum or maximum over the new window; otherwise we only check whether the new value is the new maximum or minimum.

def updateMinMaxValues(minVal,maxVal,val):
    if val < minVal:
        minVal = val
    if val > maxVal:
        maxVal= val
    return minVal,maxVal

values = [5.5, 6.0, 6.0, 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]
windowSize = 3
minVal,maxVal = min(values[:windowSize]),max(values[:windowSize])

print(minVal,maxVal)
for stepIndex in range(windowSize,len(values)):
    oldVal,newVal = values[stepIndex-windowSize],values[stepIndex]
    if oldVal == minVal:
        minVal = min(values[stepIndex-windowSize+1:stepIndex+1])
    if oldVal == maxVal:
        maxVal = max(values[stepIndex-(windowSize)+1:stepIndex+1])
    minVal,maxVal = updateMinMaxValues(minVal,maxVal,newVal)
    print(minVal,maxVal)

results in:

5.5 6.0
6.0 6.5
6.0 6.5
5.5 6.5
5.5 6.0
5.0 5.5
4.5 5.5


Answer 5:

Not sure if there is a way to efficiently exploit the slow-moving structure of the number stream.

I decided the best general way to do this is with Priority Queues. I've left my description of how to do that below. It is O(log(window_size)) per new number into the window.

However, the comment by wim on the original post points out that there is an O(1) algorithm, described in this post: Implement a queue in which push_rear(), pop_front() and get_min() are all constant time operations

Simply maintaining one of these which keeps the min and max is going to be the best solution by far.

But for reference here is my attempt:

Maintain a pair of Priority Queues, one for max and one for min, and add and remove an entry from each, each time. This adds quite a bit of overhead for each new entry [ O(log(window_size)) ] but it has a nice smooth behaviour per entry and good overall efficiency.

The Python heapq module is the usual way to implement a Priority Queue in Python. However, it does not directly support removing entries or modifying their priority. This can be done by adding a dictionary index from number to position in the queue, with no increase of computational complexity. To remove an entry, you can update its number to extremely low (or high, respectively) and re-heapify so it moves to the top and can be popped off.

Here's an example that looks OK, though I haven't tested it:

http://code.activestate.com/recipes/522995-priority-dict-a-priority-queue-with-updatable-prio/

You will need to disambiguate entries with the same value in the dictionary, or to keep multiple values per key, so that you can find all the instances when the time comes to remove them.
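
As a minimal sketch of the two-heap idea, the following uses lazy deletion instead of the dictionary index described above: each heap entry carries its index, and stale entries are discarded only when they reach the top. This is a swapped-in simplification, not the priority-dict recipe linked above, and the name rolling_min_max_heap is made up for this illustration. Because stale entries can linger below the top, the heaps may grow beyond the window size, so the cost per element is O(log n) amortised rather than O(log(window_size)).

```python
import heapq


def rolling_min_max_heap(values, window):
    """Return a list of (min, max), one per full window.

    Hypothetical helper for illustration, using lazy deletion.
    """
    min_heap = []  # entries are (value, index)
    max_heap = []  # entries are (-value, index), since heapq is a min-heap
    out = []
    for i, v in enumerate(values):
        heapq.heappush(min_heap, (v, i))
        heapq.heappush(max_heap, (-v, i))
        if i < window - 1:
            continue  # window not yet full
        oldest = i - window + 1  # smallest index still inside the window
        # Lazy deletion: discard tops whose index has left the window.
        while min_heap[0][1] < oldest:
            heapq.heappop(min_heap)
        while max_heap[0][1] < oldest:
            heapq.heappop(max_heap)
        out.append((min_heap[0][0], -max_heap[0][0]))
    return out


data = [5.5, 6.0, 6.0, 6.5, 6.0, 5.5, 5.5, 5.0, 4.5]
print(rolling_min_max_heap(data, 3))
```

Tagging every entry with its index also takes care of duplicate values, since two equal numbers at different positions are distinct heap entries.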