ring buffer with numpy/ctypes

I'm developing a client which will receive EEG data over TCP and write it to a ring buffer. I thought it could be very convenient to have the buffer as a ctypes or numpy array, because it's possible to create a numpy 'view' onto any location of such a buffer and read/write/process the data without any copying operations. Or is it a bad idea in general?

However, I don't see how to implement a circular buffer of a fixed size this way. Suppose I have created a buffer object which is contiguous in memory. What is the best way to write the data when the end of the buffer is reached?

One possible way is to start overwriting the (already old) bytes from the beginning when the write pointer reaches the end of the buffer array. Near the boundary, however, a numpy view of some chunk (for processing) can't be created (or can it?), because part of the chunk may still be located at the end of the buffer array while the rest is already at its beginning. I've read that it's impossible to create such circular slices. How can this be solved?
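
To illustrate the boundary problem: a chunk that wraps past the end can only be obtained by gluing two slices together, which copies the data. A minimal sketch (buffer size and chunk position here are arbitrary):

import numpy as np

N = 8                                   # buffer size (arbitrary)
buf = np.arange(N)                      # stand-in for the ring buffer
start, length = 6, 4                    # a chunk that wraps past the end

# a single numpy view cannot wrap around, so the chunk has to be glued
# together from two slices, which produces a copy
chunk = np.concatenate((buf[start:], buf[:(start + length) % N]))
print(chunk)                            # [6 7 0 1]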

UPD: Thanks everybody for the answers. In case somebody faces the same problem, here's the final code I ended up with.

Egesta answered 18/1, 2012 at 11:3 Comment(1)
I've written the numpy_ringbuffer package to solve this problem; it provides a deque-like interface to an underlying buffer. – Amoral

If you need a window of N bytes, make your buffer 2*N bytes and write all input to two locations: i % N and i % N + N, where i is a byte counter. That way you always have N consecutive bytes in the buffer.

data = 'Data to buffer'
N = 4
buf = 2 * N * ['\0']

for i, c in enumerate(data):
    j = i % N
    buf[j] = c           # write into the first copy
    buf[j + N] = c       # and into the mirrored second copy
    if i >= N - 1:
        print(''.join(buf[j + 1:j + N + 1]))

prints

Data
ata 
ta t
a to
 to 
to b
o bu
 buf
buff
uffe
ffer
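
For the EEG use case the same double-write idea carries over to a numpy array; the following is only a sketch (names, window length and the per-sample loop are arbitrary choices):

import numpy as np

N = 1024                       # window length (arbitrary)
buf = np.zeros(2 * N)          # doubled buffer
count = 0                      # total samples written so far

def write(samples):
    """Write each incoming sample to both halves of the doubled buffer."""
    global count
    for x in samples:          # per sample for clarity; chunked writes are faster
        j = count % N
        buf[j] = x
        buf[j + N] = x
        count += 1

def window():
    """Contiguous view of the last N samples, oldest first (no copy)."""
    j = count % N
    return buf[j:j + N]

write(np.random.randn(3000))   # stand-in for data arriving over TCP
w = window()                   # a plain numpy view, ready for processing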
Terat answered 18/1, 2012 at 17:54 Comment(2)
Yep, this is what I'm trying to write now. Instead of a 2*N buffer I'm using one of some arbitrary length + N and following the same idea. Thanks anyway! – Egesta
This is fine if performance is not a concern at all, but I doubt that given your application. You are probably better off using a vectorized solution to your problem. – Armillas

I think you need to take a step back from C-style thinking here. Updating a ring buffer for every single insertion is never going to be efficient. A ring buffer is fundamentally at odds with the contiguous memory block that numpy arrays demand, including for the FFT you mention you want to do.

A natural solution is to sacrifice a little bit of memory for the sake of performance. For instance, if the number of elements you need to hold in your buffer is N, allocate an array of N+1024 (or some other sensible number). Then you only need to move N elements around once every 1024 insertions, and you always have a contiguous view of N elements directly available to act upon.

EDIT: here is a code snippet that implements the above and should give good performance. Note, though, that you would be well advised to append in chunks rather than per element; otherwise the performance advantages of using numpy are quickly nullified, regardless of how you implement your ring buffer.

import numpy as np

class RingBuffer(object):
    def __init__(self, size, padding=None):
        self.size = size
        self.padding = size if padding is None else padding
        self.buffer = np.zeros(self.size+self.padding)
        self.counter = 0

    def append(self, data):
        """this is an O(n) operation"""
        data = data[-self.padding:]
        n = len(data)
        if self.remaining < n: self.compact()
        self.buffer[self.counter+self.size:][:n] = data
        self.counter += n

    @property
    def remaining(self):
        return self.padding-self.counter
    @property
    def view(self):
        """this is always an O(1) operation"""
        return self.buffer[self.counter:][:self.size]
    def compact(self):
        """
        note: only when this function is called, is an O(size) performance hit incurred,
        and this cost is amortized over the whole padding space
        """
        print('compacting')
        self.buffer[:self.size] = self.view
        self.counter = 0

rb = RingBuffer(10)
for i in range(4):
    rb.append([1, 2, 3])
    print(rb.view)

rb.append(np.arange(15))
print(rb.view)  # test overflow
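
Since the question mentions sliding-window FFT processing, the contiguous view can be handed to numpy's FFT directly; a hypothetical usage (not part of the answer's class):

spectrum = np.fft.rfft(rb.view)  # runs on the contiguous window; no explicit copy of the data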
Armillas answered 20/7, 2014 at 20:40 Comment(2)
Thanks Eelco. So an array of views simply doesn't work? But how is that detected, and the whole Aptr replaced by a flat copy? Aptr[0].flags has OWNDATA False, confusing. – Cinerama
I'm not sure what you mean by an array of views; views are easy to construct on the fly, so you don't really need to keep them around in an array. The crux of the matter is that if you want to use your data as a numpy array at any point, it needs to lie contiguously in memory. Either you incur an O(N) performance hit every time you need to access your ring buffer contiguously after an append, or you allocate some extra memory so that you can delay and amortize such operations as required to keep the memory contiguous. – Armillas

One possible way is to start overwriting the (already old) bytes from the beginning when the write pointer reaches the end of the buffer array.

That's the only option in a fixed-size ring buffer.

I've read it's impossible to create such circular slices.

That is why I wouldn't do this with a numpy view. You can create a class wrapper around an ndarray instead, holding the buffer/array, the capacity and a pointer (index) to the insertion point. If you want to get the contents as a numpy array, you'll have to make a copy, like so:

buf = np.array([1,2,3,4])
indices = [3,0,1,2]
contents = buf[indices]    # copy

You can still set element values in place if you implement __setitem__ (and, on Python 2, __setslice__).
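
A minimal sketch of such a wrapper (class, attribute and method names here are arbitrary):

import numpy as np

class RingBuffer:
    """Fixed-capacity ring buffer; reading the contents returns a copy."""
    def __init__(self, capacity, dtype=float):
        self.buf = np.zeros(capacity, dtype=dtype)
        self.capacity = capacity
        self.pos = 0               # insertion index
        self.full = False

    def append(self, value):
        self.buf[self.pos] = value
        self.pos = (self.pos + 1) % self.capacity
        if self.pos == 0:
            self.full = True

    def contents(self):
        """Stored data, oldest first; fancy indexing makes a copy."""
        if not self.full:
            return self.buf[:self.pos].copy()
        idx = np.arange(self.pos, self.pos + self.capacity) % self.capacity
        return self.buf[idx]

rb = RingBuffer(4)
for x in [1, 2, 3, 4, 5]:
    rb.append(x)
print(rb.contents())               # [2. 3. 4. 5.]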

Dinsdale answered 18/1, 2012 at 11:22 Comment(3)
Thanks. But if such a chunk has to be copied anyway, wouldn't it be better to use collections.deque as a buffer instead and then do numpy.array(list(itertools.islice(buf, chstart, chend)))? Or is that much slower? – Egesta
I wanted to avoid copying because doing a sliding-window FFT on that data would mean copying almost the same chunk of data every time a new data point arrives. – Egesta
@dmytro: you'll have to measure whether deque is faster. I'm afraid it won't be easy to avoid copying if you want to get ring-buffer-stored data into an array. – Dinsdale

A variant of @Janne Karila's answer, for C but not numpy:
If the ring buffer is very wide, like N x 1G, then instead of doubling the whole thing, double up an array of 2*N pointers to its rows. E.g. for N=3, initialize

bufp = { buf[0], buf[1], buf[2], buf[0], buf[1], buf[2] };

Then you write data only once, and anyfunc( bufp[j:j+3] ) sees the rows in buf in time order.
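
A rough Python sketch of the same idea (dimensions and names are arbitrary; note that the slice is a plain list of row views, not a contiguous array):

import numpy as np

N, width = 3, 5                         # dimensions (arbitrary)
buf = np.zeros((N, width))
bufp = [buf[r] for r in range(N)] * 2   # 2*N row views into buf, doubled

count = 0                               # total rows written so far

def push(row):
    global count
    buf[count % N] = row                # each row is written only once
    count += 1

def rows_in_order():
    j = count % N
    return bufp[j:j + N]                # N row views, oldest first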

Cinerama answered 20/7, 2014 at 10:0 Comment(0)
