How to real-time filter with scipy and lfilter?
Asked Answered
A

2

13

Disclaimer: I am probably not as good at DSP as I should be and therefore have more issues than I should have getting this code to work.

I need to filter incoming signals as they happen. I tried to make this code to work, but I have not been able to so far. Referencing scipy.signal.lfilter doc

import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
from lib import fnlib

samples = 100
x = np.linspace(0, 7, samples)
y = [] # Unfiltered output
y_filt1 = [] # Real-time filtered

nyq = 0.5 * samples
f1_norm = 0.1 / nyq
f2_norm = 2 / nyq
b, a = scipy.signal.butter(2, [f1_norm, f2_norm], 'band', analog=False)
zi = scipy.signal.lfilter_zi(b,a)
zi = zi*(np.sin(0) + 0.1*np.sin(15*0))

This sets zi as zi*y[0 ] initially, which in this case is 0. I have got it from the example code in the lfilter documentation, but I am not sure if this is correct at all.

Then it comes to the point where I am not sure what to do with the few initial samples. The coefficients a and b are len(a) = 5 here. As lfilter takes input values from now to n-4, do I pad it with zeroes, or do I need to wait until 5 samples have gone by and take them as a single bloc, then continuously sample each next step in the same way?

for i in range(0, len(a)-1): # Append 0 as initial values, wrong?
    y.append(0)

step = 0
for i in xrange(0, samples): #x:
    tmp = np.sin(x[i]) + 0.1*np.sin(15*x[i])
    y.append(tmp)

    # What to do with the inital filterings until len(y) ==  len(a) ?

    if (step> len(a)):
        y_filt, zi = scipy.signal.lfilter(b, a, y[-len(a):], axis=-1, zi=zi)
        y_filt1.append(y_filt[4])

print(len(y))
y = y[4:]
print(len(y))
y_filt2 = scipy.signal.lfilter(b, a, y) # Offline filtered

plt.plot(x, y, x, y_filt1, x, y_filt2)
plt.show()
Assiut answered 8/11, 2016 at 9:41 Comment(3)
Have you found a solution after all?Kirkuk
Yes, I solved this at a certain point over a year ago. I can see if I have the time to dig it back up, as there are a bunch of details that I've forgotten.Assiut
The difference in output has nothing to do with block size. It shouldn't matter how many samples you feed in each iteration through the loop. In the real-time version (second block of code) the state of the filter isn't initialized. In the offline version (first block of code) it is initialized using lfilter_zi.Cyclopentane
S
13

I think I had the same problem, and found a solution on https://github.com/scipy/scipy/issues/5116:

from scipy import zeros, signal, random

def filter_sbs():
    data = random.random(2000)
    b = signal.firwin(150, 0.004)
    z = signal.lfilter_zi(b, 1) * data[0]
    result = zeros(data.size)
    for i, x in enumerate(data):
        result[i], z = signal.lfilter(b, 1, [x], zi=z)
    return result

if __name__ == '__main__':
    result = filter_sbs()

The idea is to pass the filter state z in each subsequent call to lfilter. For the first few samples the filter may give strange results, but later (depending on the filter length) it starts to behave correctly.

Shush answered 30/4, 2018 at 21:20 Comment(8)
I don't think this is right. The output should be exactly the same as if the whole data set were filtered in one go: result = signal.lfilter(b,1,data). There should be no 'strange' first few values. This was the heart of the question.Cyclopentane
@Cyclopentane : Thanks for the comment! The code in the answer gives exactly the same result as signal.lfilter(b, 1, data, zi=z). The zi is a matter of choice, yet it should ensure results[0] == data[0] (see lfilter_zi). Without, or with z=zeros(b.size-1) results[0] will be close to 0. And I still argue we will always get 'strange' first values, as for the first values of result, the filter can only look at the first values of data, and thus hasn't got enough samples to correctly filter.Shush
It is absolutely incorrect to say that 'the filter ... hasn't got enough samples to correctly filter'. By definition, a DFII filter (which is what lfilter is) requires exactly n>0 input samples to compute the correct output. If its state is undefined, it will fail, but that is a separate issue. I don't understand what you mean by 'strange'. LTI systems compute output from input and internal state. There is never ever anything 'strange' about the linear combination of matrices. That is just math and no one can argue with that (the uncertainty principle not withstanding).Cyclopentane
Also, zi is not really a matter of 'choice'. There must be a reason to choose values for zi. If you the system is causal, you must initialize the filter with 0 because you can't see the future. In your loop, you initialize your filter with its previous state, which you know because it already happened. I don't see why you would choose the initial state to be a half cosine before supplying the input.Cyclopentane
Oh, I see now. lfilter_zi computes an initial steady state response based on the filter coefficients. Yes, this is a very good choice to initialize the filter with, but the OP does this only for the offline version, hence the discrepancy in output.Cyclopentane
I updated my answer. Hopefully it is more clear.Cyclopentane
Great, it is much clearer now. As for the 'not enough samples to filter': If you e.g. use an average filter and have only one sample to average, it will not know any better than returning the first value. The filter needs a certain number of samples for being able to work. So it is not a mathematical issue, it is a sample size issue.Shush
No. This is not correct. The filter has state. This is what needs to be initialized. If you have the filter: y(n) = .5x(n) + .5x(n-1) which averages each successive two samples, in the DFTII implementation x(n-1) is the filter's state---perhaps initialized to 0. I am sorry, but you are not presenting digital filter theory correctly in your comment. The statement that you need a certain number of input samples for the filter to be 'able to work' is simply wrong.Cyclopentane
C
3

The problem is not how you are buffering the input. The problem is that in the 'offline' version, the state of the filter is initialized using lfilter_zi which computes the internal state of an LTI so that the output will already be in steady-state when new samples arrive at the input. In the 'real-time' version, you skip this so that the filter's initial state is 0. You can either initialize both versions to using lfilter_zi or else initialize both to 0. Then, it doesn't matter how many samples you filter at a time.

Note, if you initialize to 0, the filter will 'ring' for a certain amount of time before reaching a steady state. In the case of FIR filters, there is an analytic solution for determining this time. For many IIR filters, there is not.

The following is correct. For simplicity's sake I initialize to 0 and feed the input one sample at a time. However, any non-zero block size will produce equivalent output.

from scipy import signal, random
from numpy import zeros

def filter_sbs(data, b):
    z = zeros(b.size-1)
    result = zeros(data.size)
    for i, x in enumerate(data):
        result[i], z = signal.lfilter(b, 1, [x], zi=z)
    return result
    
def filter(data, b):
    result = signal.lfilter(b,1,data)
    return result

if __name__ == '__main__':
    data = random.random(20000)
    b = signal.firwin(150, 0.004)
    result1 = filter_sbs(data, b)
    result2 = filter(data, b)
    print(result1 - result2)

Output:

[ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ... -5.55111512e-17
  0.00000000e+00  1.66533454e-16]
Cyclopentane answered 3/6, 2021 at 3:43 Comment(3)
See comment above.Shush
The math is correct but insufficient. If a transient doesn't matter, use lfilter without the ziparameter (that is, initialized to zero) and wait. If it does matter and you can guess the average of the signal, you may use zi=lfilter_zi. This sets the filter to a state it would have after a long run of the average value. Alternatively, you could subtract the guessed average before the filter and add it to the filter output. The latter approach would also work if you know more about the signal, e.g. the harmonic pattern in the question. Use a Kalman filter if interested in uncertainties.Catenary
@Catenary you need to use zi=z in order to store the filter state between successive calls. The backend calls for lfilter (github.com/scipy/scipy/blob/v1.11.1/scipy/signal/…) are all static C functions. By definition they maintain no state. The issue of dealing with a transient has nothing to do with this post (neither does adaptive filtering). The OP is trying to match the output of a real-time filtering process to a non real-time one. That means filtering chunks of input samples, not the entire array all at once. For this, zi=z is necessary.Cyclopentane

© 2022 - 2024 — McMap. All rights reserved.