Business logic imp in LMAX architecture in python
Asked Answered
J

0

2

I am trying to implement disruptor in Lmax architecture . As you know in lmax architecture we have a ring buffer for creating a queue to process data . Here you can see the structure of that : enter image description here

I have implemented this structure in python as you can see here :

    import multiprocessing




class CircularBuffer(object):

    def __init__(self, max_size=10):
        """Initialize the CircularBuffer with a max_size if set, otherwise
        max_size will elementsdefault to 10"""
        self.buffer = [None] * max_size
        self.blconsumer = 0
        self.receiver = 0
        self.journalerPointer=0
        self.replicatorPointer=0
        self.unmarshallerPointer=0
        self.max_size = max_size

    def __str__(self):
        """Return a formatted string representation of this CircularBuffer."""
        items = ['{!r}'.format(item) for item in self.buffer]
        return '[' + ', '.join(items) + ']'

    def size(self):
        """Return the size of the CircularBuffer
        Runtime: O(1) Space: O(1)"""
        if self.receiver >= self.blconsumer:
            return self.receiver - self.blconsumer
        return self.max_size - self.head - self.receiver


    def is_empty(self):
        """Return True if the head of the CircularBuffer is equal to the tail,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == self.blconsumer

    def is_replicator_after_receiver(self):
        """Return True if the head of the CircularBuffer is equal to the tail,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.replicatorPointer-1) % self.max_size

    def is_journaler_after_receiver(self):
        """Return True if the head of the CircularBuffer is equal to the tail,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.journalerPointer-1) % self.max_size
    
    def is_unmarshaller_after_receiver(self):
        """Return True if the head of the CircularBuffer is equal to the tail,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.unmarshallerPointer-1) % self.max_size

    def is_BusinessLogicConsumer_after_unmarshaller(self):
        """Return True if the head of the CircularBuffer is equal to the tail,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.unmarshallerPointer == (self.blconsumer-1) % self.max_size    

    def is_full(self):
        """Return True if the tail of the CircularBuffer is one before the head,
        otherwise return False
        Runtime: O(1) Space: O(1)"""
        return self.receiver == (self.blconsumer-1) % self.max_size

    def receive(self, item):
        """Insert an item at the back of the CircularBuffer
        Runtime: O(1) Space: O(1)"""
        if self.is_full()==False :
            self.buffer[self.receiver] = item
            self.receiver = (self.receiver + 1) % self.max_size


    def front(self):
        """Return the item at the front of the CircularBuffer
        Runtime: O(1) Space: O(1)"""
        return self.buffer[self.blconsumer]

    def consume(self):
        """Return the item at the front of the Circular Buffer and remove it
        Runtime: O(1) Space: O(1)"""
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_BusinessLogicConsumer_after_unmarshaller()==True :
        #     raise IndexError("BusinessLogicConsumer can't be after receiver")
        if self.is_BusinessLogicConsumer_after_unmarshaller()==False and  self.is_empty()==False:

            item = self.buffer[self.blconsumer]
            self.buffer[self.blconsumer] = None
            self.blconsumer = (self.blconsumer + 1) % self.max_size
            return item

    def replicator(self):

        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_replicator_after_receiver()==True :
        #     raise IndexError("replicator can't be after receiver")
        if self.is_replicator_after_receiver()==False and  self.is_empty()==False:
            item = self.buffer[self.replicatorPointer]
            self.replicatorPointer = (self.replicatorPointer + 1) % self.max_size
            return item  

    def journaler(self):
     
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_journaler_after_receiver()==True :
        #     raise IndexError("journaler can't be after receiver")
        if self.is_journaler_after_receiver()==False and  self.is_empty()==False:

            item = self.buffer[self.journalerPointer]
            self.journalerPointer = (self.journalerPointer + 1) % self.max_size
            return item         

    def unmarshaller(self):
     
        # if self.is_empty():
        #     raise IndexError("CircularBuffer is empty, unable to dequeue")
        # if self.is_unmarshaller_after_receiver()==True :
        #     raise IndexError("unmarshaller can't be after receiver")
        if self.is_unmarshaller_after_receiver()==False and  self.is_empty()==False:

            item = self.buffer[self.journalerPointer]
            self.unmarshallerPointer = (self.unmarshallerPointer + 1) % self.max_size
            return item   
    

As you can see in the picture we have a business logic part in Lmax that fetches data from Ring Buffer into CPU for fast processing . Unfortunately I couldn't find any document to implement Business logic layer . How can I fetch data from ring buffer into cpu register in Lmax with python ?

Jacal answered 4/10, 2021 at 9:53 Comment(5)
The Business Logic Consumer must ensure that it does not go past the Replicator, Unmarshaller or Journaller yet I am not sure you are checking all three. In fact, you have in method consume: if self.is_BusinessLogicConsumer_before_unmarshaller()==True : raise IndexError("BusinessLogicConsumer can't be after receiver"). I don't understand the is_BusinessLogicConsumer_before_unmarshaller method; it seems to be poorly named because the Business Logic Consumer must always be before the Unmarshaller so if this method returns True it should not be the cause for raising an exception.Lucia
And you ask, "How can I fetch data from ring buffer into cpu register in Lmax with python ?" I am not sure what you mean by "fetch data from ring buffer into cpu register." And finally, if you have the Receiver, Unmarshaller, Journaller and Business Logic Consumer implemented as threads running against an instance of CircularBuffer you will have performance issues due to contention for the Global Interpreter Lock (you need to google Python multithreading and the Global Interpreter Lock or GIL). (more ...)Lucia
@Lucia Sorry for my fault. i updated my postJacal
And the only way I can see easily having processes (which exist in different address spaces) work against such an instance) is to have CircularBuffer be implemented as a managed class that is served in the same way as, for example, a multiprocessing.Manager().dict(). But in this case each method invocation becomes more like a remote procedure call via a socket. In other words, it is no longer high-performance.Lucia
It was the phrase 'into cpu register" that I didn't understand. The other thing is when, for example, the Business Logic Processor calls consume and is returned None is it supposed to just loop burning CPU cycles until it is returned an item, which hurts everyone's performance? I don't think so. I believe LMAX implements its own event handling and consume should block until it is able to advance.Lucia

© 2022 - 2024 — McMap. All rights reserved.