Max in a sliding window in NumPy array
Asked Answered
P

7

18

I want to create an array which holds all the max()es of a window moving through a given numpy array. I'm sorry if this sounds confusing. I'll give an example. Input:

[ 6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ]

My output with a window width of 5 shall be this:

[     8,8,8,7,7,7,7,7,7,6,6,6,6,6,6,7,7,9,9,9,9     ]

Each number shall be the max of a subarray of width 5 of the input array:

[ 6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ]
  \       /                 \       /
   \     /                   \     /
    \   /                     \   /
     \ /                       \ /
[     8,8,8,7,7,7,7,7,7,6,6,6,6,6,6,7,7,9,9,9,9     ]

I did not find an out-of-the-box function within numpy which would do this (but I would not be surprised if there was one; I'm not always thinking in the terms the numpy developers thought). I considered creating a shifted 2D-version of my input:

[ [ 6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1 ]
  [ 4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9 ]
  [ 8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4 ]
  [ 7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3 ]
  [ 1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ] ]

Then I could apply np.max(input, 0) on this and would get my results. But this does not seem efficient in my case because both my array and my window width can be large (>1000000 entries and >100000 window width). The data would be blown up more or less by a factor of the window width.

I also considered using np.convolve() in some fashion but couldn't figure out a way to achieve my goal with it.

Any ideas how to do this efficiently?

Phosphorescent answered 7/4, 2017 at 23:39 Comment(0)
R
16

Pandas has a rolling method for both Series and DataFrames, and that could be of use here:

import pandas as pd

lst = [6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2]
lst1 = pd.Series(lst).rolling(5).max().dropna().tolist()

# [8.0, 8.0, 8.0, 7.0, 7.0, 8.0, 8.0, 8.0, 8.0, 8.0, 6.0, 6.0, 6.0, 6.0, 6.0, 7.0, 7.0, 9.0, 9.0, 9.0, 9.0]

For consistency, you can coerce each element of lst1 to int:

[int(x) for x in lst1]

# [8, 8, 8, 7, 7, 8, 8, 8, 8, 8, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9]
Refugio answered 8/4, 2017 at 0:18 Comment(3)
I found that you can rephrase your solution in a simpler manner: a = np.array(…), pd.rolling_max(a, window=5). And up to now this sounds like the best option for the sizes I'm dealing with. The strides solution of @Divakar would be faster if it worked for my sizes, though, so I'm still waiting before accepting this answer.Phosphorescent
The newer version of pandas tells me that my abbreviation will not be supported anymore in the future, so yours is the best solution.Phosphorescent
what's the runtime complexity of this?Runofthemill
B
11

Approach #1 : You could use 1D max filter from Scipy -

from scipy.ndimage.filters import maximum_filter1d

def max_filter1d_valid(a, W):
    hW = (W-1)//2 # Half window size
    return maximum_filter1d(a,size=W)[hW:-hW]

Approach #2 : Here's another approach with strides : strided_app to create a 2D shifted version as view into the array pretty efficiently and that should let us use any custom reduction operation along the second axis afterwards -

def max_filter1d_valid_strided(a, W):
    return strided_app(a, W, S=1).max(axis=1)

Runtime test -

In [55]: a = np.random.randint(0,10,(10000))

# @Abdou's solution using pandas rolling
In [56]: %timeit pd.Series(a).rolling(5).max().dropna().tolist()
1000 loops, best of 3: 999 µs per loop

In [57]: %timeit max_filter1d_valid(a, W=5)
    ...: %timeit max_filter1d_valid_strided(a, W=5)
    ...: 
10000 loops, best of 3: 90.5 µs per loop
10000 loops, best of 3: 87.9 µs per loop
Beauharnais answered 8/4, 2017 at 0:12 Comment(8)
This sounded extremely promising, comparing the performance to the pandas solution. Unfortunately, for the arrays I'm dealing with this raises a ValueError: array is too big.. Try for yourself: a = np.arange(1000000), np.lib.stride_tricks.as_strided(a, shape=(1000, len(a)-1000+1), strides=(a.strides[0], a.strides[0])). And in practice I'm going to need windows of size 100k in arrays of size 10m and larger. Do you have any workaround?Phosphorescent
@Phosphorescent Just use the scipy.ndimage.maximum_filter1d approach he presented. It is almost as fast and should be really efficient even for huge arrays.Wage
@Wage Unfortunately it's slower than the pandas rolling_max(), in my tests with sizes on the lower limit of my real sizes by a factor of ~ 2.Phosphorescent
That's interesting because on my computer maximum_filter1d is 3-4 times faster for a window size of 100k and an array size of 10m. Are you using the newest version of both packages?Wage
@Phosphorescent That a = np.arange(1000000), np.lib.stride_tricks.as_strided(a, shape=(1000, len(a)-1000+1), strides=(a.strides[0], a.strides[0])) worked fine for me. Could you report your NumPy, Pandas and Scipy versions?Beauharnais
@Beauharnais I'm using numpy 1.6.1. I tried on another machine with 1.8.2 which had no problem evaluating the as_strides term. When calling .max(0) on it, however, the calculation time I estimate from trying with smaller values would be huge (>1500s on my machine for window=100000 in 10Mio).Phosphorescent
@Beauharnais I'm using scipy 0.9.0 but also tried on another machine with version 0.13.3 (which also was way slower than other implementations). Please have a look at my answer to my question which sums up which versions of which are how fast etc. (at least as far as I can tell). I'm going to write it in a minute.Phosphorescent
In case anybody uses Approach #1, it should be noted that the array that is being returned has too many elements when W is even (although, the original question provides W = 5 and the solution works in that case). It may be better (or more generalizable) to do hW = int(math.ceil((W - 1) / 2)); return maximum_filter1d(a, size=W)[hW : hW + a.shape[0] - W + 1] which should work for both even or odd values of WFuzee
E
6

Starting in Numpy 1.20, the sliding_window_view provides a way to slide/roll through windows of elements. Windows that you can then find the max for:

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])
np.max(sliding_window_view(values, window_shape = 5), axis = 1)
# array([8, 8, 8, 7, 7, 7, 7, 7, 7, 6, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9])

where:

  • window_shape is the size of the sliding window
  • np.max(array, axis = 1) finds the max for each sub-array

and the intermediate result of the sliding is:

sliding_window_view(values, window_shape = 5)
# array([[6, 4, 8, 7, 1],
#        [4, 8, 7, 1, 4],
#        [8, 7, 1, 4, 3],
#        ...
#        [7, 1, 9, 4, 3],
#        [1, 9, 4, 3, 2]])
Espinosa answered 25/12, 2020 at 10:43 Comment(1)
This was very helpful! For anyone wondering how to do a strided sliding window with a stride step larger than 1: np.max(sliding_window_view(values, window_shape = 5)[::stride_step], axis = 1)Blatant
P
5

I have tried several variants now and would declare the Pandas version as the winner of this performance race. I tried several variants, even using a binary tree (implemented in pure Python) for quickly computing maxes of arbitrary subranges. (Source available on demand). The best algorithm I came up with myself was a plain rolling window using a ringbuffer; the max of that only needed to be recomputed completely if the current max value was dropped from it in this iteration; otherwise it would remain or increase to the next new value. Compared with the old libraries, this pure-Python implementation was faster than the rest.

In the end I found that the version of the libraries in question was highly relevant. The rather old versions I was mainly still using were way slower than the modern versions. Here are the numbers for 1M numbers, rollingMax'ed with a window of size 100k:

         old (slow HW)           new (better HW)
scipy:   0.9.0:  21.2987391949   0.13.3:  11.5804400444
pandas:  0.7.0:  13.5896410942   0.18.1:   0.0551438331604
numpy:   1.6.1:   1.17417216301  1.8.2:    0.537392139435

Here is the implementation of the pure numpy version using a ringbuffer:

def rollingMax(a, window):
  def eachValue():
    w = a[:window].copy()
    m = w.max()
    yield m
    i = 0
    j = window
    while j < len(a):
      oldValue = w[i]
      newValue = w[i] = a[j]
      if newValue > m:
        m = newValue
      elif oldValue == m:
        m = w.max()
      yield m
      i = (i + 1) % window
      j += 1
  return np.array(list(eachValue()))

For my input this works great because I'm handling audio data with lots of peaks in all directions. If you put a constantly decreasing signal into it (e. g. -np.arange(10000000)), then you will experience the worst case (and maybe you should reverse the input and the output in such cases).

I just include this in case someone wants to do this task on a machine with old libraries.

Phosphorescent answered 11/4, 2017 at 0:56 Comment(0)
D
1

First of all, I think there is a mistake in your explanation because the 10th element of your initial imput array at the beginning of your explanation is equal to 8, and below, where you apply the window, it is 2.

After correcting that, I think that the code that does what you want is the following:

import numpy as np
a=np.array([ 6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ])
window=5
for i in range(0,len(a)-window,1): 
    b[i] = np.amax(a[i:i+window])

I think, this way is better than creating a shifted 2D version of your imput because when you create such a version you need to use much more memory than using the original imput array, so you may run out of memory if the input is large.

Defeatism answered 8/4, 2017 at 0:52 Comment(1)
Gosh, you're right! I changed my input in the process of writing my question in order to show more cases. I wasn't consequent on that. I fixed that by now. To your proposal: I want to avoid any Python-written loop over my input because that always is slower than using any functionality of a package like numpy, scipy, pandas or the like. If you think your solution can compete, provide timeits. Otherwise: Sure, that's straight-forward and a good solution. It just doesn't meet my performance expectations.Phosphorescent
H
0

If you have two dimension data, for example stock price and want to get rolling max or whatever, this will works. Caculating without using iteration.

n = 5  # size of rolling window

data_expanded = np.expand_dims(data, 1)
data_shift = [np.roll(data_expanded, shift=-i, axis=2) for i in range(n)]
data_shift = np.concatenate(data_shift, axis=1)

data_max = np.max(data_shift, axis=1)  # max, mean, std...
Henceforth answered 7/3, 2019 at 9:35 Comment(1)
for i in range(n) looks very suspiciously like an iteration to me. In my case n will be very large, e. g. two seconds of an audio sample with 96kHz, so n > 150000. But thanks for your contribution anyway and welcome to StackOverflow :-)Phosphorescent
W
0

I ran into this same problem and I think the existing answers overlook a major optimization opportunity. If the input is size N, and window is W, then these are all O(N * W) complexity. However computing max on these overlapping segments is highly redundant and can be reduced to O(N log(W)) with a multi-pass approach.

def sliding_max(x, win):
    cur_win = 1
    while cur_win < win:
        step = min(cur_win, win-cur_win)
        x = np.maximum(x[step:], x[:-step])
        cur_win += step
        print(f"{step=} {cur_win=} {x=}")
    return x

win = 5
x = np.array([6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])

sliding_max(x, win)

prints

step=1 cur_win=2 x=array([6, 8, 8, 7, 4, 4, 5, 7, 7, 4, 6, 6, 2, 3, 5, 6, 6, 4, 7, 7, 9, 9,
       4, 3])
step=2 cur_win=4 x=array([8, 8, 8, 7, 5, 7, 7, 7, 7, 6, 6, 6, 5, 6, 6, 6, 7, 7, 9, 9, 9, 9])
step=1 cur_win=5 x=array([8, 8, 8, 7, 7, 7, 7, 7, 7, 6, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9])

It scales very well:

win = 100000
x = -np.arange(10000000)

%timeit sliding_max(x, win)
> 205 ms ± 334 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

And sanity check of course:

x = np.random.randint(0, 9999, size=10000)
win = 99

y = sliding_max(x, win)

y2 = np.zeros(len(y))
for i in range(len(x)-win+1): 
    y2[i] = np.max(x[i:i+win])

assert np.all(y == y2)
Wherein answered 22/2, 2024 at 4:5 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.