I am trying to implement the Disruptor from the LMAX architecture. As you know, the LMAX architecture uses a ring buffer as a queue for processing data. Here you can see its structure:
I have implemented this structure in Python, as you can see here:
import multiprocessing


class CircularBuffer(object):
    def __init__(self, max_size=10):
        """Initialize the CircularBuffer with a max_size if set, otherwise
        max_size will default to 10"""
        self.buffer = [None] * max_size
        # Each stage keeps its own index into the shared buffer.
        self.blconsumer = 0
        self.receiver = 0
        self.journalerPointer = 0
        self.replicatorPointer = 0
        self.unmarshallerPointer = 0
        self.max_size = max_size

    def __str__(self):
        """Return a formatted string representation of this CircularBuffer."""
        items = ['{!r}'.format(item) for item in self.buffer]
        return '[' + ', '.join(items) + ']'

    def size(self):
        """Return the size of the CircularBuffer
        Runtime: O(1) Space: O(1)"""
        if self.receiver >= self.blconsumer:
            return self.receiver - self.blconsumer
        # The receiver has wrapped around past the end of the buffer.
        return self.max_size - self.blconsumer + self.receiver

    def is_empty(self):
        """Return True if the head of the CircularBuffer is equal to the tail,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == self.blconsumer

    def is_replicator_after_receiver(self):
        """Return True if the receiver is one slot behind the replicator
        pointer (modulo max_size), otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.replicatorPointer - 1) % self.max_size

    def is_journaler_after_receiver(self):
        """Return True if the receiver is one slot behind the journaler
        pointer (modulo max_size), otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.journalerPointer - 1) % self.max_size

    def is_unmarshaller_after_receiver(self):
        """Return True if the receiver is one slot behind the unmarshaller
        pointer (modulo max_size), otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.unmarshallerPointer - 1) % self.max_size

    def is_BusinessLogicConsumer_after_unmarshaller(self):
        """Return True if the unmarshaller is one slot behind the business
        logic consumer (modulo max_size), otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.unmarshallerPointer == (self.blconsumer - 1) % self.max_size

    def is_full(self):
        """Return True if the tail of the CircularBuffer is one before the head,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.blconsumer - 1) % self.max_size

    def receive(self, item):
        """Insert an item at the back of the CircularBuffer
        Runtime: O(1) Space: O(1)"""
        # The item is silently dropped when the buffer is full.
        if not self.is_full():
            self.buffer[self.receiver] = item
            self.receiver = (self.receiver + 1) % self.max_size

    def front(self):
        """Return the item at the front of the CircularBuffer
        Runtime: O(1) Space: O(1)"""
        return self.buffer[self.blconsumer]

    def consume(self):
        """Return the item at the front of the Circular Buffer and remove it
        Runtime: O(1) Space: O(1)"""
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_BusinessLogicConsumer_after_unmarshaller()==True :
        #     raise IndexError("BusinessLogicConsumer can't be after receiver")
        if not self.is_BusinessLogicConsumer_after_unmarshaller() and not self.is_empty():
            item = self.buffer[self.blconsumer]
            self.buffer[self.blconsumer] = None
            self.blconsumer = (self.blconsumer + 1) % self.max_size
            return item
        # Nothing available to consume yet.
        return None

    def replicator(self):
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_replicator_after_receiver()==True :
        #     raise IndexError("replicator can't be after receiver")
        if not self.is_replicator_after_receiver() and not self.is_empty():
            item = self.buffer[self.replicatorPointer]
            self.replicatorPointer = (self.replicatorPointer + 1) % self.max_size
            return item
        return None

    def journaler(self):
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_journaler_after_receiver()==True :
        #     raise IndexError("journaler can't be after receiver")
        if not self.is_journaler_after_receiver() and not self.is_empty():
            item = self.buffer[self.journalerPointer]
            self.journalerPointer = (self.journalerPointer + 1) % self.max_size
            return item
        return None

    def unmarshaller(self):
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_unmarshaller_after_receiver()==True :
        #     raise IndexError("unmarshaller can't be after receiver")
        if not self.is_unmarshaller_after_receiver() and not self.is_empty():
            # Read from the unmarshaller's own pointer, not the journaler's.
            item = self.buffer[self.unmarshallerPointer]
            self.unmarshallerPointer = (self.unmarshallerPointer + 1) % self.max_size
            return item
        return None
As you can see in the picture, LMAX has a business logic part that fetches data from the ring buffer into the CPU for fast processing. Unfortunately, I couldn't find any documentation on how to implement the business logic layer. How can I fetch data from the ring buffer into a CPU register in LMAX with Python?
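To make the question concrete: Python code cannot choose what goes into a CPU register, so the part that can actually be written is the gating, where the business logic stage reads a slot only after the upstream stages have finished with it. Below is a minimal sketch of that idea under stated assumptions, not the LMAX implementation. `SequencedRingBuffer`, `publish`, `advance`, and the stage names are hypothetical; sequence numbers grow monotonically instead of wrapping; and the business logic is assumed to wait on the journaler, replicator, and unmarshaller.

```python
class SequencedRingBuffer:
    """Hypothetical ring buffer addressed by ever-increasing sequence numbers."""

    def __init__(self, size=8):
        self.size = size
        self.slots = [None] * size
        self.published = -1  # last sequence written by the receiver
        # Last sequence each consumer stage has finished with.
        self.cursors = {
            "journaler": -1,
            "replicator": -1,
            "unmarshaller": -1,
            "business_logic": -1,
        }

    def publish(self, item):
        """Write an item; return False if the slowest stage still needs the slot."""
        next_seq = self.published + 1
        if next_seq - min(self.cursors.values()) > self.size:
            return False
        self.slots[next_seq % self.size] = item
        self.published = next_seq
        return True

    def advance(self, stage, gated_on):
        """Return the next item for `stage`, or None if a stage it waits on lags."""
        next_seq = self.cursors[stage] + 1
        barrier = min(
            self.published if name == "receiver" else self.cursors[name]
            for name in gated_on
        )
        if next_seq > barrier:
            return None
        item = self.slots[next_seq % self.size]
        self.cursors[stage] = next_seq
        return item


rb = SequencedRingBuffer(size=4)
rb.publish("order-1")
upstream = ("journaler", "replicator", "unmarshaller")

# The business logic cannot read yet: no upstream stage has processed the slot.
first_try = rb.advance("business_logic", upstream)

# Each upstream stage is gated only on the receiver.
for stage in upstream:
    rb.advance(stage, ("receiver",))

# Now every upstream cursor has passed sequence 0, so the read succeeds.
second_try = rb.advance("business_logic", upstream)
```

Using sequence numbers that never wrap avoids the ambiguity between "empty" and "full" that modulo pointers have, at the cost of computing `seq % size` on each access.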
[...] consume: if self.is_BusinessLogicConsumer_before_unmarshaller()==True : raise IndexError("BusinessLogicConsumer can't be after receiver"). I don't understand the is_BusinessLogicConsumer_before_unmarshaller method; it seems to be poorly named because the Business Logic Consumer must always be before the Unmarshaller, so if this method returns True it should not be the cause for raising an exception. – Lucia

[...] CircularBuffer you will have performance issues due to contention for the Global Interpreter Lock (you need to google Python multithreading and the Global Interpreter Lock or GIL). (more ...) – Lucia

[...] CircularBuffer be implemented as a managed class that is served in the same way as, for example, a multiprocessing.Manager().dict(). But in this case each method invocation becomes more like a remote procedure call via a socket. In other words, it is no longer high-performance. – Lucia

[...] consume and is returned None, is it supposed to just loop burning CPU cycles until it is returned an item, which hurts everyone's performance? I don't think so. I believe LMAX implements its own event handling and consume should block until it is able to advance. – Lucia
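The blocking behaviour suggested in the last comment can be sketched with a `threading.Condition` from the standard library. This is only an illustration under stated assumptions: `BlockingStage` and its methods are hypothetical names, it uses a plain `deque` rather than the `CircularBuffer` above, and it does not claim to be how LMAX itself waits.

```python
import threading
from collections import deque


class BlockingStage:
    """Hypothetical single-stage queue whose consume() sleeps instead of spinning."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()

    def publish(self, item):
        """Add an item and wake one waiting consumer."""
        with self._cond:
            self._items.append(item)
            self._cond.notify()

    def consume(self, timeout=None):
        """Block until an item is available; return None if the timeout expires."""
        with self._cond:
            ready = self._cond.wait_for(lambda: len(self._items) > 0, timeout=timeout)
            if not ready:
                return None
            return self._items.popleft()


stage = BlockingStage()
results = []

# The consumer thread sleeps inside consume() until publish() notifies it.
worker = threading.Thread(target=lambda: results.append(stage.consume(timeout=5)))
worker.start()
stage.publish("order-1")
worker.join()
```

While it waits, the consumer thread releases the lock and does not burn CPU cycles; the threads still share the GIL, so this addresses the busy loop but not the contention mentioned in the earlier comment.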