Avoiding collisions when collapsing an infinite lock-free buffer into a circular buffer

I'm solving the two-feed arbitration problem of the FAST protocol. Don't worry if you are not familiar with it; my question is actually quite general. I'm including the problem description for those who are interested (you can skip it).


Data in all UDP Feeds are disseminated in two identical feeds (A and B) on two different multicast IPs. It is strongly recommended that client receive and process both feeds because of possible UDP packet loss. Processing two identical feeds allows one to statistically decrease the probability of packet loss. It is not specified in what particular feed (A or B) the message appears for the first time. To arbitrate these feeds one should use the message sequence number found in Preamble or in tag 34-MsgSeqNum. Utilization of the Preamble allows one to determine message sequence number without decoding of FAST message. Processing messages from feeds A and B should be performed using the following algorithm:

  1. Listen feeds A and B
  2. Process messages according to their sequence numbers.
  3. Ignore a message if one with the same sequence number was already processed before.
  4. If the gap in sequence number appears, this indicates packet loss in both feeds (A and B). Client should initiate one of the Recovery process. But first of all client should wait a reasonable time, perhaps the lost packet will come a bit later due to packet reordering. UDP protocol can’t guarantee the delivery of packets in a sequence.

    // tcp recover algorithm further


I wrote a very simple class for this. It preallocates all the required messages, and the first thread that receives a particular seqNum gets to process it. The other thread drops it later:

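#include <atomic>
#include <cstdint>

class MsgQueue
{
public:
    MsgQueue();
    ~MsgQueue();
    // Returns true to exactly one caller per msgSeqNum
    bool Lock(uint32_t msgSeqNum);
    // Returns the preallocated slot for msgSeqNum
    Msg& Get(uint32_t msgSeqNum);
    // Marks the slot as ready and triggers processing
    void Commit(uint32_t msgSeqNum);
private:
    void Process();
    static const int QUEUE_LENGTH = 1000000;

    // 0 - available for use; 1 - processing; 2 - ready
    std::atomic<uint16_t> status[QUEUE_LENGTH];
    Msg updates[QUEUE_LENGTH];
};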

Implementation:

MsgQueue::MsgQueue()
{
    // memset on std::atomic objects is not guaranteed to be valid; store each element instead
    for (auto& s : status) {
        s.store(0);
    }
}

MsgQueue::~MsgQueue(void)
{
}

// For the same msgSeqNum should return true to only one thread 
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
    uint16_t expected = 0;
    return status[msgSeqNum].compare_exchange_strong(expected, 1);
}

void MsgQueue::Commit(uint32_t msgSeqNum)
{
    status[msgSeqNum] = 2;
    Process();
}

// this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
    return updates[msgSeqNum];
}

void MsgQueue::Process()
{
    // ready packets must be processed
}

Usage:

if (!msgQueue.Lock(seq)) {
    return;
}
Msg& msg = msgQueue.Get(seq); // take a reference so the writes land in the queue's slot
msg.Ticker = "HP";
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);

This works fine if we assume that QUEUE_LENGTH is infinite, because in that case each msgSeqNum maps to exactly one item of the updates array.

But I have to make the buffer circular, because it is not possible to store the entire history (many millions of packets) and there is no reason to do so. I only need to buffer enough packets to reconstruct the session, and once the session is reconstructed I can drop them.

But a circular buffer significantly complicates the algorithm. For example, assume we have a circular buffer of length 1000 and we try to process seqNum = 10 000 and seqNum = 11 000 at the same time (this is VERY unlikely but still possible). Both packets map to index 0 of the updates array, so a collision occurs. In that case the buffer should 'drop' the old packet and process the new one.
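To make the collision concrete, here is a minimal sketch of the index mapping. `SlotIndex` and `Collides` are hypothetical helpers introduced only for illustration; they are not part of MsgQueue.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helper: the slot a sequence number maps to in a ring of `length` items.
inline std::size_t SlotIndex(uint32_t seq, std::size_t length)
{
    return seq % length;
}

// Two distinct sequence numbers share a slot exactly when they differ
// by a multiple of `length`.
inline bool Collides(uint32_t a, uint32_t b, std::size_t length)
{
    return a != b && SlotIndex(a, length) == SlotIndex(b, length);
}
```

With a length of 1000, both 10 000 and 11 000 map to slot 0, while any two numbers less than 1000 apart never share a slot.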

It's trivial to implement what I want using locks, but writing lock-free code for a circular buffer that is used from different threads is really complicated. So I welcome any suggestions and advice on how to do that. Thanks!

Baez answered 24/4, 2013 at 21:23 Comment(2)
If your buffer length is 1000, seqNum 10000 or 11000 should be out of scope. - Crocket
@MareInfinitus that's why I need to use a circular buffer. - Baez

I don't believe you can use a ring buffer. A hashed index can be used in the status[] array, i.e. hash = seq % 1000. The issue is that the sequence number is dictated by the network and you have no control over its ordering. You wish to lock based on this sequence number. Your array doesn't need to be infinite, just as large as the range of the sequence number; but that is probably larger than practical.

I am not sure what is happening when the sequence number is locked. Does this mean another thread is processing it? If so, you must maintain a sub-list for hash collisions to resolve the particular sequence number.

You may also consider making the array size a power of 2. For example, 1024 will allow hash = seq & 1023, which should be quite efficient.
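As an alternative to a sub-list, one possible sketch is to store the owning sequence number in each slot, so that a collision is detected by comparing values rather than chaining. This is an assumption-laden illustration, not the asker's code: `SeqClaimRing` and `TryClaim` are hypothetical names, real sequence numbers are assumed to start at 1 (0 marks an empty slot), and wraparound of the 32-bit counter is not handled. It only arbitrates who processes a sequence number; it does not protect a payload that another thread may still be writing when a newer number takes over the slot.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical helper, not part of the original MsgQueue.
class SeqClaimRing
{
public:
    // Returns true to exactly one caller per sequence number.
    // Returns false for a duplicate, or for a number older than the one
    // already occupying the slot (the old packet is dropped).
    bool TryClaim(uint32_t seq)
    {
        std::atomic<uint32_t>& slot = owner[seq & MASK];
        uint32_t current = slot.load(std::memory_order_acquire);
        while (current < seq) {
            // On failure compare_exchange_weak reloads `current`, and the
            // loop condition is checked again against the fresh value.
            if (slot.compare_exchange_weak(current, seq,
                                           std::memory_order_acq_rel,
                                           std::memory_order_acquire)) {
                return true;
            }
        }
        return false; // current == seq (duplicate) or current > seq (stale)
    }

private:
    static constexpr std::size_t SIZE = 1024;     // power of two
    static constexpr std::size_t MASK = SIZE - 1; // seq & MASK == seq % SIZE
    std::atomic<uint32_t> owner[SIZE] = {};       // 0 = empty (assumption)
};
```

With this layout, 10000 and 11024 share a slot: whichever arrives first is claimed, the newer number then replaces the older one, and a late copy of the older number is rejected.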

Mccauley answered 24/4, 2013 at 21:24 Comment(4)
Thanks for your answer! In the end I created TWO circular buffers. The sequence number is created by the stock exchange and increases one by one. Due to network problems, however, some of them may be skipped, and I have a mechanism to detect and resolve such issues. So the solution was: create TWO circular buffers instead of ONE, and then add another SPINNING thread that merges these circular buffers and processes the result. - Baez
Hmm, I would be concerned that occasionally the packets may be re-ordered as well. - Mccauley
I do restore order; that's why I have the circular buffers. (In real life, by the way, it seems they NEVER reorder in my configuration; I was testing for several days :) - Baez
Re-ordering is fairly rare. It depends on where you sit in the network. I didn't think about your solution enough. The main thing is that the original problem needed to transform the seqnum. I think you have a 'job' queue (incoming seq) and a 'work' queue (packet processing) and you no longer key anything to seq. I changed my answer to 'community mode'. You can fix it or leave your own and I will delete this one. - Mccauley
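The two-buffer approach described in the comments might look roughly like the sketch below. It is a guess at the shape of that solution, not the asker's actual code: `FeedRing` and `MergeOnce` are hypothetical names, each ring is assumed to have exactly one receiver thread pushing and one merging thread reading, each feed is assumed to arrive in increasing order, and only sequence numbers are stored. Gap detection, waiting for late packets and recovery are left out.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-producer/single-consumer ring; one instance per feed.
class FeedRing
{
public:
    // Called only by the feed's receiver thread. Returns false when full.
    bool Push(uint32_t seq)
    {
        const std::size_t t = tail.load(std::memory_order_relaxed);
        if (t - head.load(std::memory_order_acquire) == SIZE) {
            return false;
        }
        items[t & (SIZE - 1)] = seq;
        tail.store(t + 1, std::memory_order_release);
        return true;
    }

    // Called only by the merging thread. Returns false when empty.
    bool Peek(uint32_t& seq) const
    {
        const std::size_t h = head.load(std::memory_order_relaxed);
        if (h == tail.load(std::memory_order_acquire)) {
            return false;
        }
        seq = items[h & (SIZE - 1)];
        return true;
    }

    // Called only by the merging thread, after a successful Peek.
    void Advance()
    {
        head.store(head.load(std::memory_order_relaxed) + 1,
                   std::memory_order_release);
    }

private:
    static constexpr std::size_t SIZE = 1024; // power of two
    uint32_t items[SIZE] = {};
    std::atomic<std::size_t> head{0};
    std::atomic<std::size_t> tail{0};
};

// One pass of the merging thread: repeatedly take the smaller head of the
// two rings and keep only the first copy of each sequence number.
// It does not wait for a gap to be filled; a real merger would need that.
void MergeOnce(FeedRing& a, FeedRing& b, uint32_t& lastSeq,
               std::vector<uint32_t>& out)
{
    for (;;) {
        uint32_t seqA = 0, seqB = 0;
        const bool hasA = a.Peek(seqA);
        const bool hasB = b.Peek(seqB);
        if (!hasA && !hasB) {
            return;
        }
        const bool takeA = hasA && (!hasB || seqA <= seqB);
        const uint32_t seq = takeA ? seqA : seqB;
        (takeA ? a : b).Advance();
        if (seq > lastSeq) { // first copy; otherwise a duplicate from the other feed
            lastSeq = seq;
            out.push_back(seq);
        }
    }
}
```

Because only the merging thread decides what is processed, nothing is keyed to the sequence number across threads, which is the point made in the last comment.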
