Efficient Rolling Max and Min Window

Posted 2020-08-22 03:51

Question:

I want to calculate a rolling maximum and minimum efficiently, meaning anything better than recalculating the maximum/minimum from every value in the window each time the window moves.

There was a post on here that asked the same thing, and someone posted a solution using some kind of stack approach that, judging by its rating, worked. However, I can't find it again for the life of me.

Any help would be appreciated in finding a solution or the post. Thank you all!

Answer 1:

The algorithm you want is called ascending minima (C++ implementation).

To do this in C#, you will need a double-ended queue class; a good one exists on NuGet under the name Nito.Deque.

I have written a quick C# implementation using Nito.Deque, but I wrote it from memory and have only briefly checked it, so it may be wrong!

public static class AscendingMinima
{
    private struct MinimaValue
    {
        public int RemoveIndex { get; set; }
        public double Value { get; set; }
    }

    public static double[] GetMin(this double[] input, int window)
    {
        var queue = new Deque<MinimaValue>();
        var result = new double[input.Length];

        for (int i = 0; i < input.Length; i++)
        {
            var val = input[i];

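            // Note: in Nito.Deque, queue[0] is the front.
            // Drop front entries that have slid out of the window.
            while (queue.Count > 0 && i >= queue[0].RemoveIndex)
                queue.RemoveFromFront();

            // Drop back entries that can never be the minimum again,
            // because the new value is at least as small and will outlive them.
            while (queue.Count > 0 && queue[queue.Count - 1].Value >= val)
                queue.RemoveFromBack();

            queue.AddToBack(new MinimaValue { RemoveIndex = i + window, Value = val });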

            result[i] = queue[0].Value;
        }

        return result;
    }
}
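
For comparison, here is a minimal Python sketch of the same ascending minima idea, using the standard library's collections.deque in place of Nito.Deque. The function name rolling_min and the (remove_index, value) tuple layout are my own choices for illustration, not part of the answer above.

```python
from collections import deque

def rolling_min(values, window):
    """Trailing-window minimum at each position, via ascending minima."""
    # Each entry is (remove_index, value); values increase from front to back.
    candidates = deque()
    result = []
    for i, val in enumerate(values):
        # Drop the front entry once it has slid out of the window.
        while candidates and i >= candidates[0][0]:
            candidates.popleft()
        # Drop back entries that can never be the minimum again.
        while candidates and candidates[-1][1] >= val:
            candidates.pop()
        candidates.append((i + window, val))
        # The front of the deque is always the current minimum.
        result.append(candidates[0][1])
    return result

print(rolling_min([4, 2, 12, 11, -5], 2))  # [4, 2, 2, 11, -5]
```

Each value is appended once and removed at most once, so the total work is linear in the input length. Flipping the second comparison to `<=` should give a rolling maximum instead.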


Answer 2:

Here's one way to do it more efficiently. You still have to recalculate the value occasionally but, except for certain degenerate data (ever-decreasing values), that is kept to a minimum in this solution.

We'll limit ourselves to the maximum to simplify things but it's simple to extend to a minimum as well.

All you need is the following:

  • The window itself, initially empty.
  • The current maximum (max), initially any value.
  • The count of the current maximum (maxcount), initially zero.

The idea is to use max and maxcount as a cache for holding the current maximum. Where the cache is valid, you only need to return the value in it, a very fast constant-time operation.

If the cache is invalid when you ask for the maximum, the cache is repopulated first and that value is then returned. This is slower than the cached path described in the previous paragraph, but once the cache is valid again, subsequent requests for the maximum use the faster path.

Here's what you do for maintaining the window and associated data:

  1. Get the next value N.

  2. If the window is full, remove the earliest entry M. If maxcount is greater than 0 and M is equal to max, decrement maxcount. Once maxcount reaches 0, the cache is invalid but we don't need to worry about that until such time the user requests the maximum value (there's no point repopulating the cache until then).

  3. Add N to the rolling window.

  4. If the window size is now 1 (that N is the only current entry), set max to N and maxcount to 1, then go back to step 1.

  5. If maxcount is greater than 0 and N is greater than max, set max to N and maxcount to 1, then go back to step 1.

  6. If maxcount is greater than 0 and N is equal to max, increment maxcount.

  7. Go back to step 1.

Now, at any point while that window management is going on, you may request the maximum value. This is a separate operation, distinct from the window management itself. This can be done using the following rules in sequence.

  1. If the window is empty, there is no maximum: raise an exception or return some sensible sentinel value.

  2. If maxcount is greater than 0, then the cache is valid: simply return max.

  3. Otherwise, the cache needs to be repopulated. Go through the entire list, setting up max and maxcount as per the code snippet below.


set max to window[0], maxcount to 0
for each x in window[]:
    if x > max:
        set max to x, maxcount to 1
    else:
        if x == max:
            increment maxcount

The fact that you mostly maintain a cache of the maximum value and only recalculate when needed makes this a much more efficient solution than simply recalculating blindly whenever an entry is added.

For some definite statistics, I created the following Python program. It uses a sliding window of size 25 and uses random numbers from 0 to 999 inclusive (you can play with these properties to see how they affect the outcome).

First some initialisation code. Note the stat variables, they'll be used to count cache hits and misses:

import random

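window = []     # the rolling window, oldest value first
max = 0         # cached maximum (note: this name shadows Python's built-in max())
maxcount = 0    # occurrences of max in the window; 0 means the cache is invalid
maxwin = 25     # window size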

statCache = 0
statNonCache = 0

Then the function to add a number to the window, as per my description above:

def addNum(n):
    global window
    global max
    global maxcount
    if len(window) == maxwin:
        # Window is full: remove the earliest entry.
        m = window.pop(0)
        if maxcount > 0 and m == max:
            maxcount = maxcount - 1

    window.append(n)

    if len(window) == 1:
        max = n
        maxcount = 1
        return

    if maxcount > 0 and n > max:
        max = n
        maxcount = 1
        return

    if maxcount > 0 and n == max:
        maxcount = maxcount + 1

Next, the code which returns the maximum value from the window:

def getMax():
    global max
    global maxcount
    global statCache
    global statNonCache

    if len(window) == 0:
        return None

    if maxcount > 0:
        statCache = statCache + 1
        return max

    # Cache miss: rescan the whole window to rebuild max and maxcount.
    max = window[0]
    maxcount = 0
    for val in window:
        if val > max:
            max = val
            maxcount = 1
        elif val == max:
            maxcount = maxcount + 1
    statNonCache = statNonCache + 1

    return max

And, finally, the test harness:

random.seed()
for i in range(1000000):
    val = int(1000 * random.random())
    addNum(val)
    newmax = getMax()

print("%d cached, %d non-cached"%(statCache,statNonCache))

Note that the test harness requests the maximum every time a number is added to the window. In practice, this may not be needed. In other words, this is the worst-case scenario for the random data generated.


Running that program a few times for pseudo-statistical purposes, we get (formatted and analysed for reporting purposes):

 960579 cached,  39421 non-cached
 960373 cached,  39627 non-cached
 960395 cached,  39605 non-cached
 960348 cached,  39652 non-cached
 960441 cached,  39559 non-cached
 960602 cached,  39398 non-cached
 960561 cached,  39439 non-cached
 960463 cached,  39537 non-cached
 960409 cached,  39591 non-cached
 960798 cached,  39202 non-cached
=======         ======
9604969         395031

So you can see that, on average for random data, only about 3.95% of the cases resulted in a calculation hit (cache miss). The vast majority used the cached values. That should be substantially better than having to recalculate the maximum on every insertion into the window.

Some things that will affect that percentage will be:

  • The window size. Larger sizes mean there's more likelihood of a cache hit, improving the percentage. For example, doubling the window size pretty much halved the cache misses (to 1.95%).
  • The range of possible values. Fewer possible values mean cache hits are more likely. For example, reducing the range from 0..999 to 0..9 gave a big reduction in cache misses (to 0.85%).
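
To make it easier to play with those two properties, here is a self-contained sketch that wraps the same cache logic in a class and reports the miss rate for a given window size and value range. The names CachedRollingMax and miss_rate, the use of collections.deque for the window, and the fixed seed are assumptions of mine for illustration; the figures quoted above came from the original program, not from this sketch.

```python
import random
from collections import deque

class CachedRollingMax:
    """Rolling maximum that rescans the window only on a cache miss."""

    def __init__(self, size):
        self.window = deque()
        self.size = size
        self.max = None
        self.maxcount = 0   # 0 means the cache is invalid
        self.misses = 0

    def add(self, n):
        if len(self.window) == self.size:
            m = self.window.popleft()
            if self.maxcount > 0 and m == self.max:
                self.maxcount -= 1
        self.window.append(n)
        if len(self.window) == 1 or (self.maxcount > 0 and n > self.max):
            self.max, self.maxcount = n, 1
        elif self.maxcount > 0 and n == self.max:
            self.maxcount += 1

    def get_max(self):
        if not self.window:
            return None
        if self.maxcount == 0:
            # Cache miss: rebuild the cache from the whole window.
            self.misses += 1
            self.max = max(self.window)
            self.maxcount = self.window.count(self.max)
        return self.max

def miss_rate(window_size, value_range, samples=20000, seed=0):
    """Fraction of get_max() calls that needed a full rescan."""
    rng = random.Random(seed)
    roller = CachedRollingMax(window_size)
    for _ in range(samples):
        roller.add(rng.randrange(value_range))
        roller.get_max()
    return roller.misses / samples

for size, rng_max in ((25, 1000), (50, 1000), (25, 10)):
    print(size, rng_max, miss_rate(size, rng_max))
```

The exact percentages depend on the seed and sample count, so treat the printed numbers as indicative only.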


Answer 3:

I'm assuming that by "window" you mean a range a[start] to a[start + len], and that start moves along. Consider the minimum (the maximum is similar) and the move to the window a[start + 1] to a[start + len + 1]. The minimum of the window changes only if (a) a[start + len + 1] < min (a smaller value came in), or (b) a[start] == min (one of the smallest values just left, so the minimum must be recomputed).
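
A minimal Python sketch of that simple approach follows. It assumes a half-open window of exactly `length` elements (a[start] up to but not including a[start + length]), which differs slightly from the indexing used above; the function name rolling_min_simple is hypothetical.

```python
def rolling_min_simple(a, length):
    """Minimum of each full window a[start:start + length].

    The window is rescanned only when a minimum has just left
    and the entering value does not replace it.
    """
    if length <= 0 or length > len(a):
        return []
    current = min(a[:length])
    result = [current]
    for start in range(1, len(a) - length + 1):
        leaving = a[start - 1]
        entering = a[start + length - 1]
        if entering <= current:
            # (a) a value at least as small came in
            current = entering
        elif leaving == current:
            # (b) one of the smallest values just left: recompute
            current = min(a[start:start + length])
        result.append(current)
    return result

print(rolling_min_simple([3, 1, 4, 1, 5, 9, 2, 6], 3))  # [1, 1, 1, 1, 2, 2]
```

The worst case (steadily increasing data, for a minimum) still rescans on every step, so this is simple rather than asymptotically optimal.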

Another, possibly more efficient, way of doing this is to fill a priority queue with the first window and update it with each value entering or leaving. I don't think that is much better, though: priority queues aren't suited to "pick out a random element from the middle", which is what you need to do when advancing the window, and the code will be much more complex. Better to stick to the simple solution until it is proven that the performance isn't acceptable and that this code is responsible for (much of) the resource consumption.
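
For readers who want to try the priority-queue route anyway, one common workaround for the "remove from the middle" problem is lazy deletion: leave stale entries in the heap and discard them only when they reach the top. The sketch below uses Python's heapq and is my own illustration of that workaround, not something proposed in this answer; rolling_min_heap is a hypothetical name and the window is again assumed to hold exactly `length` elements.

```python
import heapq

def rolling_min_heap(a, length):
    """Minimum of each full window of `length` elements, using a heap
    with lazy deletion of entries that have left the window."""
    if length <= 0 or length > len(a):
        return []
    heap = []      # (value, index) pairs
    result = []
    for i, val in enumerate(a):
        heapq.heappush(heap, (val, i))
        # Discard heap tops whose index is no longer inside the window.
        # The entry just pushed is always inside it, so the heap never empties.
        while heap[0][1] <= i - length:
            heapq.heappop(heap)
        if i >= length - 1:
            result.append(heap[0][0])
    return result

print(rolling_min_heap([3, 1, 4, 1, 5, 9, 2, 6], 3))  # [1, 1, 1, 1, 2, 2]
```

Each push and pop costs O(log n), so this does not beat a deque-based approach, which fits the caution expressed above.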



Answer 4:

After writing my own algorithm yesterday and asking for improvements, I was referred here. The ascending minima algorithm is indeed more elegant. I'm not sure whether it offers constant-time calculation regardless of window size, but I tested its performance against my own caching algorithm (fairly simple, and probably the same idea others have proposed). In my tests the caching algorithm was 8-15 times faster, using rolling windows of 5, 50, 300 and 1000 (I don't need more). Both alternatives are below, with stopwatches and result validation.

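using System;
using System.Diagnostics; // Stopwatch
// Deque<T> comes from the Nito.Deque NuGet package; add the using directive for its namespace.

static class Program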
{
    static Random r = new Random();
    static int Window = 50; //(small to facilitate visual functional test). eventually could be 100 1000, but not more than 5000.
    const int FullDataSize =1000;
    static double[] InputArr = new double[FullDataSize]; //array prefilled with the random input data.

    //====================== Caching algo variables
    static double Low = 0;
    static int LowLocation = 0;
    static int CurrentLocation = 0;
    static double[] Result1 = new double[FullDataSize]; //contains the caching minimum result
    static int i1; //incrementor, just to store the result back to the array. In real life, the result is not even stored back to array.

    //====================== Ascending Minima algo variables
    static double[] Result2 = new double[FullDataSize]; //contains the ascending minima result.
    static double[] RollWinArray = new double[Window]; //circular buffer used by the caching algo
    static Deque<MinimaValue> RollWinDeque = new Deque<MinimaValue>(); //Nito.Deque NuGet package.
    static int i2; //used by the struct of the Deque (not just for result storage)


    //====================================== my initially proposed caching algo
    static void CalcCachingMin(double currentNum)
    {
        RollWinArray[CurrentLocation] = currentNum;
        if (currentNum <= Low)
        {
            LowLocation = CurrentLocation;
            Low = currentNum;
        }
        else if (CurrentLocation == LowLocation)
            ReFindLowest(); //the cached low was just overwritten, so rescan the window

        CurrentLocation++;
        if (CurrentLocation == Window) CurrentLocation = 0; //this is faster
        //CurrentLocation = CurrentLocation % Window; //this is slower, still over 10 fold faster than ascending minima

        Result1[i1++] = Low;
    }

    //full iteration, run each time the lowest value is overwritten.
    static void ReFindLowest()
    {
        Low = RollWinArray[0];
        LowLocation = 0; //bug fix. missing from initial version.
        for (int i = 1; i < Window; i++)
            if (RollWinArray[i] < Low)
            {
                Low = RollWinArray[i];
                LowLocation = i;
            }
    }

    //======================================= Ascending Minima algo based on http://stackoverflow.com/a/14823809/2381899 
    private struct MinimaValue
    {
        public int RemoveIndex { get; set; }
        public double Value { get; set; }
    }

    public static void CalcAscendingMinima (double newNum)
    { //same algo as the extension method below, but used on external arrays, and fed with 1 data point at a time like in the projected real time app.
            while (RollWinDeque.Count > 0 && i2 >= RollWinDeque[0].RemoveIndex)
                RollWinDeque.RemoveFromFront();
            while (RollWinDeque.Count > 0 && RollWinDeque[RollWinDeque.Count - 1].Value >= newNum)
                RollWinDeque.RemoveFromBack();
            RollWinDeque.AddToBack(new MinimaValue { RemoveIndex = i2 + Window, Value = newNum });
            Result2[i2++] = RollWinDeque[0].Value;
    }

    public static double[] GetMin(this double[] input, int window)
    {   //this is the original extension method for ascending minima,
        //taken from http://stackoverflow.com/a/14823809/2381899
        var queue = new Deque<MinimaValue>();
        var result = new double[input.Length];

        for (int i = 0; i < input.Length; i++)
        {
            var val = input[i];

            // Note: in Nito.Deque, queue[0] is the front
            while (queue.Count > 0 && i >= queue[0].RemoveIndex)
                queue.RemoveFromFront();

            while (queue.Count > 0 && queue[queue.Count - 1].Value >= val)
                queue.RemoveFromBack();

            queue.AddToBack(new MinimaValue { RemoveIndex = i + window, Value = val });

            result[i] = queue[0].Value;
        }

        return result;
    }

    //============================================ Test program.
    static void Main(string[] args)
    { //this is the test program.
        //it runs several attempts of both algos on the same data.
        for (int j = 0; j < 10; j++)
        {
            Low = 12000;
            for (int i = 0; i < Window; i++)
                RollWinArray[i] = 10000000;
            //Fill the data + functional test - generate FullDataSize numbers and check them in as you go:
            InputArr[0] = 12000;
            for (int i = 1; i < FullDataSize; i++) //fill the Input array with random data.
                //InputArr[i] = r.Next(100) + 11000;//simple data.
                InputArr[i] = InputArr[i - 1] + r.NextDouble() - 0.5; //brownian motion data.

            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            for (int i = 0; i < FullDataSize; i++) //run the Caching algo.
                CalcCachingMin(InputArr[i]);

            stopwatch.Stop();
            Console.WriteLine("Caching  : " + stopwatch.ElapsedTicks + " mS: " + stopwatch.ElapsedMilliseconds);
            stopwatch.Reset();


            stopwatch.Start();
            for (int i = 0; i < FullDataSize; i++) //run the Ascending Minima algo
                CalcAscendingMinima(InputArr[i]);

            stopwatch.Stop();
            Console.WriteLine("AscMinima: " + stopwatch.ElapsedTicks + " mS: " + stopwatch.ElapsedMilliseconds);
            stopwatch.Reset();

            i1 = 0; i2 = 0; RollWinDeque.Clear();
        }

        for (int i = 0; i < FullDataSize; i++) //test the results.
            if (Result2[i] != Result1[i]) //this is a test that algos are valid. Errors (mismatches) are printed.
                Console.WriteLine("Current:" + InputArr[i].ToString("#.00") + "\tLowest of " + Window + " last is " + Result1[i].ToString("#.00") + " " + Result2[i].ToString("#.00") + "\t" + (Result1[i] == Result2[i])); //for validation purposes only.

        Console.ReadLine();
    }


}
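
For readers who don't use C#, here is a rough Python rendering of the caching algorithm from this answer: a circular buffer plus the cached low and its slot, with a full rescan only when that slot is overwritten by a larger value. The class name CachingRollingMin is hypothetical, and I have assumed float("inf") as the initial fill value where the C# test uses a large sentinel.

```python
class CachingRollingMin:
    """Trailing-window minimum using a circular buffer and a cached low."""

    def __init__(self, window):
        # Unfilled slots hold infinity so they never win the minimum.
        self.buf = [float("inf")] * window
        self.pos = 0                 # slot the next value will overwrite
        self.low = float("inf")      # cached minimum
        self.low_pos = 0             # slot holding the cached minimum

    def push(self, x):
        self.buf[self.pos] = x
        if x <= self.low:
            # The new value becomes the cached minimum.
            self.low, self.low_pos = x, self.pos
        elif self.pos == self.low_pos:
            # The cached minimum was just overwritten: rescan the buffer.
            self.low_pos = min(range(len(self.buf)), key=self.buf.__getitem__)
            self.low = self.buf[self.low_pos]
        self.pos = (self.pos + 1) % len(self.buf)
        return self.low

roller = CachingRollingMin(2)
print([roller.push(x) for x in [5, 3, 4, 6, 7]])  # [5, 3, 3, 4, 6]
```

Timings in Python will not mirror the C# results quoted above, so this is only meant to show the logic.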


Answer 5:

I would suggest maintaining a stack which supports getMin() or getMax().

This can be done with two stacks and costs only constant time.

fyi: https://www.geeksforgeeks.org/design-a-stack-that-supports-getmin-in-o1-time-and-o1-extra-space/
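
A sliding window removes its oldest value first, so a single min-tracking stack is not quite enough on its own. One common way to bridge that gap, which I am assuming is in the spirit of this answer, is to build a queue out of two such stacks. The sketch below stores a running minimum next to each value rather than using the O(1)-extra-space trick from the linked article; MinQueue and rolling_min_two_stacks are hypothetical names.

```python
class MinQueue:
    """FIFO queue with get_min(), built from two min-tracking stacks."""

    def __init__(self):
        # Each stack entry is (value, minimum of this stack up to this entry).
        self._in = []
        self._out = []

    def push(self, x):
        m = x if not self._in else min(x, self._in[-1][1])
        self._in.append((x, m))

    def pop(self):
        if not self._out:
            # Move everything across, reversing the order once.
            while self._in:
                x, _ = self._in.pop()
                m = x if not self._out else min(x, self._out[-1][1])
                self._out.append((x, m))
        return self._out.pop()[0]

    def get_min(self):
        # The overall minimum is the smaller of the two stack minima.
        return min(s[-1][1] for s in (self._in, self._out) if s)

def rolling_min_two_stacks(values, window):
    """Trailing-window minimum at each position."""
    q = MinQueue()
    result = []
    for i, x in enumerate(values):
        q.push(x)
        if i >= window:
            q.pop()   # evict the value that just left the window
        result.append(q.get_min())
    return result

print(rolling_min_two_stacks([4, 2, 12, 11, -5], 2))  # [4, 2, 2, 11, -5]
```

Each value moves between the stacks at most once, so push and pop are constant time on average, even though a single pop can occasionally be slow.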