efficient circular buffer?

15

154

I want to create an efficient circular buffer in Python (with the goal of taking averages of the integer values in the buffer).

Is this an efficient way to use a list to collect values?

def add_to_buffer(self, num):
    self.mylist.pop(0)
    self.mylist.append(num)

What would be more efficient (and why)?

Greysun answered 11/11, 2010 at 4:17 Comment(1)
This is not an efficient way to implement a circular buffer, because pop(0) is an O(n) operation on a list: it removes the first element, and all remaining elements have to be shifted to the left. Use collections.deque with the maxlen argument instead; deque has O(1) append and pop at both ends.Sofa
292

I would use collections.deque with a maxlen arg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

There is a recipe in the docs for deque that is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top-notch code.
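
For the averaging use case in the question, a minimal sketch in the spirit of that recipe might look like this (the function name and the float division are my own, not from the docs):

from collections import deque

def moving_averages(iterable, n=3):
    """Yield the running average of the last n values seen so far."""
    window = deque(maxlen=n)   # the oldest value is dropped automatically
    for value in iterable:
        window.append(value)
        yield sum(window) / float(len(window))

For example, list(moving_averages([40, 30, 50, 46], n=3)) gives [40.0, 35.0, 40.0, 42.0].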

Feldspar answered 11/11, 2010 at 4:29 Comment(7)
+1 Yes, it's the nice batteries-included way. Operations for the circular buffer are O(1) and, as you say, the work is done in C, so it should still be quite fast.Slosberg
I don't like this solution because the docs don't guarantee O(1) random access when maxlen is defined. O(n) is understandable when the deque can grow to infinity, but if maxlen is given, indexing an element should be constant time.Barricade
My guess is that it's implemented as a linked list and not an array.Shrubbery
Seems about right, if the timings in my answer below are correct.Federative
@Ivella if maxlen is given, then n has a fixed bound, therefore everything by definition is constant time ;)Expulsion
@Expulsion O(maxlen) is no different from O(n). Just a different variable name used for the same idea. Hence it's not constant time. The access time (in the worst case) is roughly proportional to maxlen, which is runtime-controlled (the deque is allocated at runtime). What Ivella was referring to is that it's possible (in theory) to vary the implementation and keep using a linked list for the unbounded case and for the bounded case allocate a fixed contiguous chunk of memory. However, that would potentially waste memory if the fill level commonly stays significantly below the bound.Ricer
The actual python implementation (see here: github.com/python/cpython/blob/main/Modules/… ) uses a hybrid of both ideas: a linked list of fixed size chunks. I.e. it splits the deque up into equally sized pieces of memory, internally. That will alleviate some of the cost of a purely linked list implementation, but the access time for inner items of a long deque is still O(maxlen) (a.k.a. O(n))Ricer
34

Although there are already a number of great answers here, I could not find any direct comparison of timings for the options mentioned. Therefore, please find my humble attempt at a comparison below.

For testing purposes only, the class can switch between a list-based buffer, a collections.deque-based buffer, and a numpy.roll-based buffer.

Note that the update method adds only one value at a time, to keep it simple.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

On my system this yields:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
Federative answered 12/4, 2018 at 15:14 Comment(0)
16

Popping from the head of a list forces all the remaining elements to be shifted down, so it is an O(n) operation and inefficient for this purpose.

You should instead use a list/array of fixed size and an index that moves through the buffer as you add/remove items.
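
A minimal sketch of that idea (the class and attribute names are my own), assuming values are only ever added, never removed:

class FixedBuffer(object):
    def __init__(self, size):
        self.data = [0] * size     # preallocated, fixed size
        self.index = 0             # next slot to overwrite
        self.count = 0             # how many slots hold real values

    def add(self, value):
        self.data[self.index] = value
        self.index = (self.index + 1) % len(self.data)
        self.count = min(self.count + 1, len(self.data))

    def average(self):
        # Order does not matter for an average, so no re-ordering is needed.
        return float(sum(self.data[:self.count])) / max(self.count, 1)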

Slosberg answered 11/11, 2010 at 4:28 Comment(4)
Agree. No matter how elegant or inelegant it may look, or whatever language is used: in reality, the less you bother the garbage collector (or heap manager, or paging/mapping mechanisms, or whatever does the actual memory magic), the better.Doty
@RocketSurgeon It's not magic, it's just that it's an array whose first element is deleted. So for an array of size n this means n-1 copy operations. No garbage collector or similar device is involved here.Preponderate
I agree. Doing so is also much easier than some people think. Just use an ever increasing counter, and use the modulo operator (% arraylen) when accessing the item.Yila
idem, you may check my post above, that is how I did itTrig
14

Based on MoonCactus's answer, here is a circularlist class. The difference from his version is that here c[0] always gives the oldest-appended element, c[-1] the most recently appended one, c[-2] the one before that, and so on. This is more natural for most applications.

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

Class:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)
Ramage answered 24/11, 2016 at 11:1 Comment(6)
Good addition. Python lists already allow negative indices, but c[-1], for example, would not return the expected value once the circular buffer is full, since the "last" addition ends up somewhere in the middle of the underlying list.Trig
It does work @MoonCactus, see the 6 examples I gave on top of the answer; in the last ones, you can see c[-1] is always the right element. __getitem__ does it right.Ramage
oh yes, I mean mine failed, not yours, sorry :D I will make my comment clearer! -- oh I cannot, the comment is too old.Trig
nice simple solution. i added an optional argument to allow initialization of the list from existing data, it's more pythonpathetic that way.Ingest
Apart from keeping count of the number of items processed, is there a way of iterating over the members of this circular list just once? for i in c: iterates forever... Thanks – Mike T.Crackpot
There seem to be two problems with this code. First, when the list isn't full yet, the index doesn't wrap. If I do c=circularlist(5, [1,2]) I cannot do c[2], but once the list is full, it's possible to do c[5]. Second, c=circularlist(5); c.append(1) should yield the same list as c=circularlist(5, [1]), but it does not. In the former case, c.index is 1, in the latter, however, it is 0.Premolar
8

OK with the use of the deque class, but for the requirements of the question (an average), here is my solution:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
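
One caveat: sum(self) makes every read O(n) in the buffer length. If that matters, keeping a running total next to the deque gives O(1) averages. A rough sketch (the class name is mine), assuming values are only ever appended:

from collections import deque

class RunningAverage(object):
    def __init__(self, size):
        self._buffer = deque(maxlen=size)
        self._total = 0.0

    def append(self, value):
        if len(self._buffer) == self._buffer.maxlen:
            self._total -= self._buffer[0]   # value about to be evicted
        self._buffer.append(value)
        self._total += value

    @property
    def average(self):
        return self._total / len(self._buffer) if self._buffer else 0.0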
Electrolysis answered 2/9, 2013 at 8:15 Comment(3)
I get TypeError: 'numpy.float64' object is not callable when trying to call average methodGlobigerina
Yes... in fact I guess that deque uses numpy arrays internally (after removing @property it works fine)Globigerina
I guarantee that deque does not use numpy arrays internally. collections is part of the standard library, numpy is not. Dependencies on third party libraries would make for a terrible standard library.Dusty
7

Python's deque is slow. You can also use numpy.roll instead; see How do you rotate the numbers in a NumPy array of shape (n,) or (n,1)?

In this benchmark, deque takes 448 ms and numpy.roll takes 29 ms: http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

Ross answered 26/11, 2014 at 11:46 Comment(3)
But numpy.roll returns a copy of the array, right?Federative
This answer is very misleading - Python's deque appears to be quite fast, but converting from and to numpy arrays slows it down considerably in the benchmarks you link to.Immigrant
-1 As can be seen in the source of numpy.roll, it calculates slices for the first and second half of the old array, creates a new array (with empty_like) and then copies the swapped halves to the new one. This is O(n), since you always copy the whole array.Mesial
4

You can also see this quite old Python recipe.

Here is my own version with a NumPy array:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value
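
A quick usage sketch of this class for the averaging use case in the question (the values are arbitrary):

r = RingBuffer(4)
for value in [1, 2, 3, 4, 5, 6]:
    r.append(value)

print(r.get_partial())           # the four most recent values, most recent first
print(np.mean(r.get_partial()))  # their average: 4.5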
Globigerina answered 3/10, 2013 at 10:47 Comment(1)
+1 for using numpy, but -1 for not implementing a circular buffer. The way you implemented it, you are shifting all data every time you add a single element, this costs O(n) time. To implement a proper circular buffer, you should have both an index and a size variable, and you need to correctly handle the case when the data 'wraps around' the end of the buffer. When retrieving data, you might have to concatenate two sections at the start and end of the buffer.Mesial
4

How about the solution from the Python Cookbook, including a reclassification of the ring buffer instance when it becomes full?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

The notable design choice in the implementation is that, since these objects undergo a nonreversible state transition at some point in their lifetimes—from non-full buffer to full-buffer (and behavior changes at that point)—I modeled that by changing self.__class__. This works even in Python 2.2, as long as both classes have the same slots (for example, it works fine for two classic classes, such as RingBuffer and __Full in this recipe).

Changing the class of an instance may be strange in many languages, but it is a Pythonic alternative to other ways of representing occasional, massive, irreversible, and discrete changes of state that vastly affect behavior, as in this recipe. Good thing that Python supports it for all kinds of classes.

Credit: Sébastien Keim

Suffolk answered 9/2, 2019 at 3:55 Comment(6)
I did some speed tests of this vs deque. This is about 7 times slower than deque.Windcheater
@Windcheater awesome, you should let the author know!Suffolk
what would be the point of that? It's an old published document. The point of my comment is to let others know that this answer is out of date and to use deque instead.Windcheater
@Windcheater it was probably still slower when he published it; instructions for contacting the author are in the intro to the book. I'm just relating one, possible alternative. Also, "If only speed were the best metric; alas it may only be a good one."Suffolk
@Suffolk deque also has a .clear() function. Which this does not (and from what I can tell, cannot)Archoplasm
@Archoplasm this is an instructive example from a cookbook, meant to be simplistic and intended for people to start with making their own (hence, "cookbook"). Deque is awesome, I'm glad you're enjoying it.Suffolk
3

I've had this problem before when doing serial programming. At the time, just over a year ago, I couldn't find any efficient implementations either, so I ended up writing one as a C extension; it's also available on PyPI under an MIT license. It's super basic, only handles buffers of 8-bit signed chars, but is of flexible length, so you can use Struct or something on top of it if you need something other than chars. I see now with a Google search that there are several options these days, so you might want to look at those too.

Shawn answered 7/12, 2016 at 20:30 Comment(0)
3

From Github:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception("Circular Buffer is empty")
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py
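
A brief usage sketch for the averaging use case in the question (the values are arbitrary):

cb = CircularBuffer(3)
for value in (1, 2, 3, 4):
    cb.add(value)

print(cb)                        # cb:[2,3,4] -- the three most recent values
print(sum(cb) / float(len(cb)))  # 3.0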

Pertinacity answered 22/5, 2019 at 16:11 Comment(0)
3

Lots of answers here, but none subclass the NumPy ndarray as suggested by D Left Adjoint to U. Subclassing avoids np.roll, which does not scale efficiently, and keeps all the advantages of NumPy arrays, such as array slicing. NumPy arrays will allow for most analyses you need to run, including averaging.

RingArray class

My solution subclasses np.ndarray using the guidelines written in the Numpy documentation.

The RingArray is initialised with a specified shape, and filled with np.nan values.

itertools.cycle is used to create a one-dimensional cycle that gives the next row position to write in the array. This is based on the height of the array at initialisation.

An append method is added to the ndarray methods to write data over the next position in the cycle.

import numpy as np
from itertools import cycle


class RingArray(np.ndarray):
    """A modified numpy array type that functions like a stack. 
    RingArray has a set size specified during initialisation. 
    Add new data using the append() method, which will replace the 
    next value in a cyclical fashion. The array itself has all the 
    properties of a numpy array e.g. it can be sliced and accessed as 
    normal. Initially fills the array with np.nan values.
    
    Options
    --------
    shape : tuple
        A tuple of (height, width) for the maximum size of the array.

    Attributes
    ----------
    Inherited from nd.array. Initially fills array with np.nan values.
    
    Methods
    --------
    append(data)
        Add/replace data in the next element of the cycle.
        Data should be the length of the RingArray width.
    
    """    
    def __new__(subtype, shape):
        obj = super().__new__(subtype, shape)
        
        obj = np.vectorize(lambda x: np.nan)(obj)
        
        obj._pointer = cycle(np.arange(0, shape[0]))
        
        return obj
    
    # needed by numpy
    def __array_finalize__(self, obj):
        if obj is None: return
        
    # add data to the next element (looped)
    def append(self, data):
        """Adds or replaces data in the RingArray.
        The function writes to the next row in the Array.
        Once the last row is reached, the assignment row 
        loops back to the start.

        Parameters
        ----------
        data : array_like
            Data should be the length of the RingArray width.
        """        
        self[next(self._pointer)] = data

Performance

I believe this method scales at O(1), however I am not a computer scientist, so please correct me if I'm wrong!

Possible issues

As this is a subclass of ndarray, all the methods from that class can be used on the RingArray. Removing or adding values with array functions like np.delete will change the shape of the array. This will cause errors with the cycle, as it is set at initialisation. For this reason, be cautious when editing the array by any method other than append().

This is my first Stack Overflow post; if there's anything I can improve upon, please let me know :).

Thrasonical answered 23/4, 2023 at 10:17 Comment(0)
2

This one does not require any library. It grows a list and then cycles within it by index.

The footprint is very small (no library), and it runs at least twice as fast as deque. This is indeed good for computing moving averages, but be aware that the items are not kept sorted by age as above.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

To get the average value, e.g.:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

Results in:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

This is about 1/3 of the time of the equivalent with deque.

Trig answered 14/3, 2014 at 20:59 Comment(4)
Shouldn't your __getitem__ be a bit more powerful: self._data[(key + self._index + 1) % self._size]?Setup
Why would you want to shift by +1 ? Now, yes, see Basj variant below for the ideaTrig
Your timing code has a flaw. You call append 40000 times after setting max_size to 1000000, so you only ever test the append method of the underlying list.Dipterocarpaceous
You are right; but I hardly see how it would become slower than deque when that happens (appends are slower than replacement, due to possible memory reallocation). Since my code was enhanced by @basj above (who naturally gets more credit for it), I leave it for others to spend more time on this and check it ;)Trig
1

I don't get the answers here. Obviously, if you're working within NumPy, you'd want to subclass either array or ndarray (usually); that way, at least once your cyclic array is full, you can still use NumPy array arithmetic on it. The only thing you have to be careful of is that, for operations spanning multiple elements (such as a moving average), your window must not be larger than what has accumulated in the buffer.

Also, as all the commenters mentioned, don't use rolling, as that defeats the purpose of efficiency. If you need a growing array, you simply double its size each time a resize is required (this is different from a circular-array implementation).
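
For what it's worth, here is a rough sketch of that idea without the subclassing: a preallocated NumPy array plus a write index, so appends are O(1) and nothing is ever rolled or copied (the class name is mine):

import numpy as np

class NumpyRing(object):
    def __init__(self, size, dtype=float):
        self._data = np.zeros(size, dtype=dtype)   # allocated once
        self._index = 0                            # next slot to overwrite
        self._count = 0                            # number of valid slots

    def append(self, value):
        self._data[self._index] = value
        self._index = (self._index + 1) % self._data.size
        self._count = min(self._count + 1, self._data.size)

    def mean(self):
        # Order is irrelevant for an average, so the valid slice is enough.
        return self._data[:self._count].mean() if self._count else float('nan')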

Outstation answered 26/2, 2022 at 18:30 Comment(0)
0

The original question asked for an "efficient" circular buffer. Given that requirement, the answer from aaronasterling seems definitively correct. Comparing a dedicated class programmed in Python with collections.deque shows a 5.2x speedup with deque! Here is very simple code to test this:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

To transform a deque into a list, just use:

my_list = [v for v in my_deque]

You will then get O(1) random access to the items in the list copy. Of course, this is only worthwhile if you need to do many random accesses to the data after the deque has been filled.

Rigid answered 15/5, 2016 at 22:8 Comment(0)
0

This applies the same principle to buffers intended to hold the most recent text messages.

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
Diclinous answered 19/1, 2018 at 1:4 Comment(0)
