Implementation of a concurrent queue + map in C++
I am not very good at data structures, so this might be a very silly question. I am looking for a way to implement a hybrid of a queue and a map.

I am currently using tbb::concurrent_bounded_queue (documented at Intel's developer zone) from www.threadingbuildingblocks.org in a multithreaded single-producer, single-consumer process. The queue holds market data quote objects, and the producer side of the process is highly time sensitive. What I need is a queue that is keyed on a market data identifier such as USDCAD or EURUSD. The value points (through a unique_ptr) to the latest market data quote that I received for that key.

So, let us say my queue has 5 elements for 5 unique identifiers, and suddenly we get an updated market data quote for the identifier at the 3rd position in the queue. In that case I just store the latest value and discard the value I previously had. Essentially, I just move my unique_ptr to the new market data quote for this key, and the key keeps its position in the queue.

It is similar to concurrent_bounded_queue<pair<string, unique_ptr<Quote>>>, but keyed on the first element of the pair.

I am not sure whether this is already available in a third-party library (maybe TBB itself), or what it is called if it is a standard data structure.

I would highly appreciate any help or guidance on this.

Thanks.

Deception answered 30/7, 2014 at 1:14 Comment(1)
An illustration of the data structure would be helpful to understand the task. - Trickster

First, observe that we can easily write...

int idn_to_index(idn); // map from identifier to contiguous number sequence

...it doesn't matter much if that uses a std::map or std::unordered_map, binary search in a sorted std::vector, your own character-by-character hardcoded parser....
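As a rough illustration of the std::unordered_map option, here is a minimal sketch. The class name IdnIndex and the "assign the next free index on first sight" policy are my own assumptions, not something the question specifies, and the sketch assumes only the producer thread calls it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical helper (name and policy are assumptions for illustration).
// Maps each identifier to a contiguous index: 0 for the first identifier
// seen, 1 for the second, and so on. Not thread-safe on its own.
class IdnIndex {
public:
    int idn_to_index(const std::string& idn) {
        assert(!idn.empty());  // precondition chosen for this sketch
        auto it = index_.find(idn);
        if (it != index_.end()) {
            return it->second;  // already known: same index as before
        }
        int next = static_cast<int>(index_.size());
        index_.emplace(idn, next);
        return next;
    }

    std::size_t size() const { return index_.size(); }

private:
    std::unordered_map<std::string, int> index_;
};
```

If the set of identifiers is known up front, the map can be filled once at startup so the lookup on the hot path never allocates.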

Then the producer could:

  1. update (using a mutex) a std::vector<unique_ptr<Quote>> at [idn_to_index(idn)]

  2. post the index to concurrent_bounded_queue<int>

The consumer:

  1. pop an index

  2. compare the pointer in std::vector<unique_ptr<Quote>> at [index] to its own array of last-seen pointers, and if they differ, process the quote

The idea here is not to avoid having duplicate identifier-specific indices in the queue, but to make sure that the stalest of those still triggers processing of the newest quote, and that less-stale queue entries are ignored harmlessly until the data's genuinely been updated again.
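The producer and consumer steps above could be sketched roughly as follows. Several things here are assumptions made for the sake of a self-contained example: the Quote fields and the LatestQuotes class are hypothetical, a std::queue guarded by the same mutex stands in for concurrent_bounded_queue<int>, and a per-slot version counter replaces the pointer comparison (a freed Quote's address can be reused by a later allocation, so comparing raw pointers alone could miss an update).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

struct Quote { double bid; double ask; };  // hypothetical quote type

// Table of latest quotes plus a queue of "slot changed" notifications.
// For brevity one mutex guards everything, and the consumer's last-seen
// bookkeeping lives in the same class.
class LatestQuotes {
public:
    explicit LatestQuotes(std::size_t n)
        : slots_(n), versions_(n, 0), seen_(n, 0) {}

    // Producer: overwrite the slot with the newest quote, then post its index.
    void publish(int index, std::unique_ptr<Quote> q) {
        assert(q != nullptr);
        assert(index >= 0 && static_cast<std::size_t>(index) < slots_.size());
        std::lock_guard<std::mutex> lock(mutex_);
        slots_[index] = std::move(q);
        ++versions_[index];
        pending_.push(index);
    }

    // Consumer: pop one index. Returns true and copies the quote into `out`
    // only if the slot changed since it was last processed; returns false
    // for a stale entry or an empty queue.
    bool consume(Quote& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pending_.empty()) return false;
        int index = pending_.front();
        pending_.pop();
        if (versions_[index] == seen_[index]) return false;  // stale entry
        seen_[index] = versions_[index];
        out = *slots_[index];
        return true;
    }

    std::size_t queued() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }

private:
    std::vector<std::unique_ptr<Quote>> slots_;
    std::vector<std::uint64_t> versions_;  // bumped on every publish
    std::vector<std::uint64_t> seen_;      // last version the consumer handled
    std::queue<int> pending_;
    mutable std::mutex mutex_;
};
```

With two quick updates to the same slot, the first popped index yields the newest quote and the second is skipped as stale, which is the behaviour described above.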

Iloilo answered 30/7, 2014 at 2:9 Comment(0)

TBB provides

  • concurrent_unordered_map: no concurrent erase, stable iterators, no element access protection;
  • concurrent_hash_map: has concurrent erase, concurrent operations invalidate iterators, per-element access management via 'accessors'

So, if by a concurrent_bounded_queue<pair<string, unique_ptr<Quote>>> that is "keyed on the first element of the pair" you mean a corresponding concurrent associative container, these two are at your service. Basically, you have to choose between the ability to erase identifiers concurrently (concurrent_hash_map) and the ability to traverse all the elements concurrently (concurrent_unordered_map). concurrent_hash_map also simplifies synchronizing access to individual elements, which looks useful in your case.

Trickster answered 30/7, 2014 at 16:38 Comment(2)
Yeah. I was looking at concurrent_hash_map and was trying to figure out how to connect the "new message received" callback to a corresponding insert into the concurrent_hash_map. The consumer side would basically get the latest Quote from the hash_map and erase it. Hence, if inserts are faster than erases, I will still have the latest quote available for processing. But this still doesn't have queue-like behavior, right? The hard part is integrating this hash_map with the incoming queue of Quotes. What do you think? - Deception
I'm still not sure I understand what you want to achieve. Doesn't Tony's answer address your question (just replacing the vector with a hash_map)? From what I understand, you don't need to erase quotes, just update them if the timestamp is newer than the one that was stored. - Trickster

I was able to solve this problem as below:

I use a queue and a hashmap, both from the TBB library. I now push my unique identifiers onto the queue rather than the Quotes. My hashmap has the unique identifier as the key and the Quote as the value.

So, when I receive a Quote, I iterate through the queue and check whether it already contains that identifier. If it does, I insert the corresponding Quote directly into the hashmap and do not add the identifier to the queue. If it does not, I push the identifier onto the queue and put the corresponding Quote in the hashmap. This ensures that my queue always has a unique set of identifiers and my hashmap has the latest Quote available for each identifier.

On the consumer side, I pop the queue to get my next identifier and get the Quote for that identifier from the hashmap.

This works pretty fast. Please let me know in case I am missing any hidden issues with this.
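For reference, the same idea can be sketched without iterating the queue at all, by letting map membership answer "is this identifier already queued?". This is only a minimal sketch under my own assumptions: ConflatingQueue and the Quote fields are hypothetical names, and a std::deque plus std::unordered_map behind one mutex stand in for the TBB containers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

struct Quote { double bid; double ask; };  // hypothetical quote type

// Invariant: a key is in order_ exactly when it is in latest_, so checking
// the map replaces a scan of the queue.
class ConflatingQueue {
public:
    // Producer: keep only the newest quote per key; enqueue the key once.
    void push(const std::string& key, const Quote& q) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = latest_.find(key);
        if (it == latest_.end()) {
            latest_.emplace(key, q);
            order_.push_back(key);  // first pending quote for this key
        } else {
            it->second = q;         // overwrite, keep the queue position
        }
    }

    // Consumer: take the oldest pending key together with its newest quote.
    // Returns false if nothing is pending.
    bool try_pop(std::string& key, Quote& q) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (order_.empty()) return false;
        key = std::move(order_.front());
        order_.pop_front();
        auto it = latest_.find(key);
        assert(it != latest_.end());  // guaranteed by the invariant
        q = it->second;
        latest_.erase(it);
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return order_.size();
    }

private:
    std::deque<std::string> order_;
    std::unordered_map<std::string, Quote> latest_;
    mutable std::mutex mutex_;
};
```

The single lock is the simplest way to keep the queue and the map consistent with each other; whether it is fast enough for a time-sensitive producer would need measuring.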

Deception answered 4/8, 2014 at 23:12 Comment(1)
Iterating through a concurrent_queue is not thread-safe. If that is somehow not an issue, why use concurrent_queue rather than std::queue? - Trickster
