Creating a standard map that is thread safe
In my current scenario, speed is essential. I have a map that is only read by multiple threads, and this works fine. Now a requirement has come up that may require writing to the static map once in a while, while it is being read by other threads. I believe this is a game changer, since I would now need to lock the map for thread safety. That poses a problem: I have 10-12 threads that are going to be reading the map. If one thread takes a lock on the map (even just to read, since something might be written concurrently), then the other threads lose the parallel read access they had before. Is there any way I can circumvent this issue?

Injudicious answered 2/10, 2013 at 17:37 Comment(3)
You should search for multiple-readers single-writer locks.Unwholesome
Any suggestions? Does Boost have them?Injudicious
You can find questions on this site where people are using Boost for that.Unwholesome

You can use a shared_mutex beside your map to acquire shared or unique access. Generally, a write operation will require unique access, while read operations will require shared access.

Any number of threads can acquire shared access, as long as no threads are holding unique access. If a thread attempts to acquire unique access, it waits until all shared access is released.

The standard library and Boost provide shared_lock<T> and unique_lock<T> for scope-bounded acquisition of a shared_mutex.

Beware that some people claim shared_mutex performs poorly, though I haven't seen any evidence or strong analysis to support these claims. It may be worth looking into, if it matters to you.

Corotto answered 2/10, 2013 at 17:43 Comment(2)
Marked as answer after timer. ThanksInjudicious
Watch out for iterators... a thread might have gotten hold of an unprotected iterator into that map, pointing at memory locations that can be modified (as a result of adding or erasing) from seemingly safe mutex-protected code, so overall it would still not be thread-safe. This is why in Java this use case is covered by the dreaded ConcurrentModificationException, which is thrown when iterating over a Map that is being modified by another thread.Unsex

Just for your C++ pleasure, read this book; you'll find it worth WAY more than the money spent, and your concurrency world will open wide:
C++ Concurrency in Action: Practical Multithreading
The book deals with all sorts of issues and practical solutions for sharing data between threads: how to wake threads, thread pool creation, and more... more... and more.

Here is an example of sharing data between threads without using atomics or shared locks:

#include <mutex>
#include <queue>
#include <condition_variable>

template<class T>
class TaskQueue
{
public:
    TaskQueue() {}
    TaskQueue& operator=(const TaskQueue&) = delete;

    void Push(T value) {
        std::lock_guard<std::mutex> lk(mut);
        data.push(value);
        condition.notify_one(); // if many threads are waiting for data, this wakes exactly one of them
    }

    void Get(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        condition.wait(lk, [this]{ return !data.empty(); }); // blocks while the queue is empty; remove this line if waiting is not needed
        value = data.front();
        data.pop();
    }

private:
    std::mutex mut;
    std::queue<T> data; // in your case change this to a std::map
    std::condition_variable condition;
};
Moire answered 2/10, 2013 at 17:50 Comment(1)
Ill definitely have a look at thisInjudicious

One possible solution is to keep a pointer to the map and, when you need to modify it, make a copy, modify that copy, and then atomically swap the pointer to the new instance. This solution consumes more memory, but can be more efficient if you have many reading threads, as the read path is lock-free.

In the example below, only one thread can modify the map. This doesn't mean one thread at a time; it means one and the same thread for the lifetime of the data structure. Otherwise, modification would need to be done while holding a mutex that protects the entire body of updateMap. The reader threads can access theData as usual, without any locking.

#include <atomic>
#include <map>

typedef std::map<...> Data;

std::atomic<Data *> theData{ new Data };

void updateMap( ... )
{
   Data *newData = new Data( *theData );
   // modify newData here
   Data *old = theData.exchange( newData );
   delete old;
}
Cavein answered 2/10, 2013 at 17:48 Comment(16)
Then I would need to manage swapping and when that should take place. This would require a lot of management and testingInjudicious
@Injudicious no, if you use std::atomic or something similar that would be pretty simple. Put example to my answer.Cavein
It does double read lookup time though, from 1 pointer dereference to 2.Welltimed
@Welltimed explain pleaseCavein
Yes, it is "lock free" by duplicating the data, but that doesn't mean that it's lock free on real hardware. The highly parallel performance of this code would be abysmal because every write access would cause the entire cache of this data to be lost in all reader threads. The cache costs of one write are thus magnified by the number of readers. It's not a good solution unless the data structure is "small".Diba
@OllieB: In practice such cost is immaterial.Diba
@KubaOber please read the question carefully and notice that the map is going to be changed pretty seldom. So the price of missing the cache on a write will be minimal vs the price of locking a mutex every time a thread needs to read data.Cavein
@Slave: True, but modern mutexes are optimized for non-contended case, it's merely a CAS then. It's still more expensive than no CAS, of course. Yes, I did miss the seldomness of changes. Your solution is probably the best in such case.Diba
@KubaOber in the case of more frequent changes it is not clear when a read/write mutex would become more effective. I forgot to mention that all reading threads are blocked during map modification, which could be even more significant when a regular map is rebalanced or a hash map rehashed. So probably only testing would answer that.Cavein
@Slava: One can simply use lock-free concurrent data structures in such cases. They are designed to behave sanely, for example hash maps can do cooperative resizing by all accessing, etc.Diba
@Slava: Note that the reading threads are likely not blocked here. std::atomic will typically do a compare-and-swap of the pointer. It must be pointed out that this code only works when there's one modifying thread. This is important.Diba
I don't get how this is supposed to work. A thread could be reading the current map while the writer frees it.Acescent
@AurélienVallée you are right, the map has to be stored as shared_ptr or have a usage counter (ie shared_ptr implemented manually).Cavein
@Cavein Using a shared_ptr is not a solution here, since you have no way to atomically both retrieve the pointer behind the atomic, and the object behind the shared_ptr. That is, the object could now be destroyed in between the load() and the reference counting.Acescent
@AurélienVallée you do not need to retrieve the object behind the shared_ptr atomically, you only need to swap the shared_ptr atomically. According to this en.cppreference.com/w/cpp/memory/shared_ptr/atomic it is possible. Or you can implement that manually.Cavein
@Cavein Oooh i didn't know of these special atomic_* methods in shared_ptr. Made my day :) thank youAcescent

Here is my implementation of a thread-safe, generic, resizable hash map without using STL containers:

#pragma once

#include <iostream>
#include <iomanip>
#include <stdexcept>
#include <functional>
#include <mutex>
#include <condition_variable>

/*
*  wrapper for items stored in the map
*/
template<typename K, typename V>
class HashItem {
public:
    HashItem(K key, V value) {
        this->key = key;
        this->value = value;
        this->nextItem = nullptr;
    }

    /*
    * copy constructor
    */
    HashItem(const HashItem & item) {
        this->key = item.getKey();
        this->value = item.getValue();
        this->nextItem = nullptr;
    }

    void setNext(HashItem<K, V> * item) {
        this->nextItem = item;
    }

    HashItem * getNext() {
        return nextItem;
    }

    K getKey() {
        return key;
    }

    V getValue() {
        return value;
    }

    void setValue(V value) {
        this->value = value;
    }

private:
    K key;
    V value;
    HashItem * nextItem;

};

/*
* template HashMap for storing items
* default hash function HF = std::hash<K>
*/
template <typename K, typename V, typename HF = std::hash<K>>
class HashMap {
public:
    /*
    * constructor
* @mSize specifies the bucket size of the map
    */
    HashMap(std::size_t mSize) {
        // lock initialization for single thread
        std::lock_guard<std::mutex>lock(mtx);
        if (mSize < 1)
            throw std::runtime_error("Number of buckets must be greater than zero.");

        mapSize = mSize;
        numOfItems = 0;
        // initialize
        hMap = new HashItem<K, V> *[mapSize]();
    }

    /*
    * for simplicity no copy constructor
    * anyway we want to test how different threads
    * use the same instance of the map
    */
    HashMap(const HashMap & hmap) = delete;

    /*
    * inserts item
    * replaces old value with the new one when item already exists
    * @key key of the item
    * @value value of the item
    */
    void insert(const K & key, const V & value) {
        std::lock_guard<std::mutex>lock(mtx);
        insertHelper(this->hMap, this->mapSize, numOfItems, key, value);
        condVar.notify_all();
    }

    /*
    * erases the item with the given key when such an item exists
    * @key of item to erase
    */
    void erase(const K & key) {
        std::lock_guard<std::mutex>lock(mtx);
        // calculate the bucket where item must be inserted
        std::size_t hVal = hashFunc(key) % mapSize;
        HashItem<K, V> * prev = nullptr;
        HashItem<K, V> * item = hMap[hVal];

        while ((item != nullptr) && (item->getKey() != key)) {
            prev = item;
            item = item->getNext();
        }
        // no item found with the given key
        if (item == nullptr) {
            return;
        }
        else {
            if (prev == nullptr) {
                // item found is the first item in the bucket
                hMap[hVal] = item->getNext();
            }
            else {
                // item found in one of the entries in the bucket
                prev->setNext(item->getNext());
            }
            delete item;
            numOfItems--;
        }
        condVar.notify_all();
    }

    /*
    * get element with the given key by reference
    * @key is the key of item that has to be found
    * @value is the holder where the value of item with key will be copied
    */
    bool getItem(const K & key, V & value) const {
        std::lock_guard<std::mutex>lock(mtx);
        // calculate the bucket where item must be inserted
        std::size_t hVal = hashFunc(key) % mapSize;
        HashItem<K, V> * item = hMap[hVal];

        while ((item != nullptr) && (item->getKey() != key))
            item = item->getNext();
        // item not found
        if (item == nullptr) {
            return false;
        }

        value = item->getValue();
        return true;
    }


    /*
    * get element with the given key by reference
    * @key is the key of item that has to be found
    * shows an example of a thread waiting for some condition
    * @value is the holder where the value of item with key will be copied
    */
    bool getWithWait(const K & key, V & value) {
        // wait on the same mutex that protects the map, otherwise the
        // traversal below races with insert/erase/resize
        std::unique_lock<std::mutex> ulock(mtx);
        condVar.wait(ulock, [this] { return numOfItems != 0; }); // don't call empty() here: it locks mtx again
        // calculate the bucket where item must be inserted
        std::size_t hVal = hashFunc(key) % mapSize;
        HashItem<K, V> * item = hMap[hVal];

        while ((item != nullptr) && (item->getKey() != key))
            item = item->getNext();
        // item not found
        if (item == nullptr) {
            return false;
        }

        value = item->getValue();
        return true;
    }


    /*
    * resizes the map
    * creates new map on heap
    * copies the elements into new map
    * @newSize specifies new bucket size
    */
    void resize(std::size_t newSize) {
        std::lock_guard<std::mutex>lock(mtx);
        if (newSize < 1)
            throw std::runtime_error("Number of buckets must be greater than zero.");

        resizeHelper(newSize);
        condVar.notify_all();
    }

    /*
    * outputs all items of the map
    */
    void outputMap() const {
        std::lock_guard<std::mutex>lock(mtx);
        if (numOfItems == 0) {
            std::cout << "Map is empty." << std::endl << std::endl;
            return;
        }
        std::cout << "Map contains " << numOfItems << " items." << std::endl;
        for (std::size_t i = 0; i < mapSize; i++) {
            HashItem<K, V> * item = hMap[i];
            while (item != nullptr) {
                std::cout << "Bucket: " << std::setw(3) << i << ", key: " << std::setw(3) << item->getKey() << ", value:" << std::setw(3) << item->getValue() << std::endl;
                item = item->getNext();
            }
        }
        std::cout << std::endl;
    }

    /*
    * returns true when map has no items
    */
    bool empty() const {
        std::lock_guard<std::mutex>lock(mtx);
        return numOfItems == 0;
    }

    void clear() {
        std::lock_guard<std::mutex>lock(mtx);
        deleteMap(hMap, mapSize);
        numOfItems = 0;
        hMap = new HashItem<K, V> *[mapSize]();
    }

    /*
    * returns number of items stored in the map
    */
    std::size_t size() const {
        std::lock_guard<std::mutex>lock(mtx);
        return numOfItems;
    }

    /*
    * returns number of buckets
    */
    std::size_t bucket_count() const {
        std::lock_guard<std::mutex>lock(mtx);
        return mapSize;
    }

    /*
    * destructor
    */
    ~HashMap() {
        std::lock_guard<std::mutex>lock(mtx);
        deleteMap(hMap, mapSize);
    }

private:
    std::size_t mapSize;
    std::size_t numOfItems;
    HF hashFunc;
    HashItem<K, V> ** hMap;
    mutable std::mutex mtx;
    mutable std::mutex mtxForWait;
    std::condition_variable condVar;

    /*
    * help method for inserting key, value item into the map hm
    * mapSize specifies the size of the map, items - the number
    * of stored items, will be incremented when insertion is completed
    * @hm HashMap
    * @mSize specifies number of buckets
    * @items holds the number of items in hm, will be incremented when insertion successful
    * @key - key of item to insert
    * @value - value of item to insert
    */
    void insertHelper(HashItem<K, V> ** hm, const std::size_t & mSize, std::size_t & items, const K & key, const V & value) {
        std::size_t hVal = hashFunc(key) % mSize;
        HashItem<K, V> * prev = nullptr;
        HashItem<K, V> * item = hm[hVal];

        while ((item != nullptr) && (item->getKey() != key)) {
            prev = item;
            item = item->getNext();
        }

        // inserting new item
        if (item == nullptr) {
            item = new HashItem<K, V>(key, value);
            items++;
            if (prev == nullptr) {
                // insert new value as first item in the bucket
                hm[hVal] = item;
            }
            else {
                // append new item on previous in the same bucket
                prev->setNext(item);
            }
        }
        else {
            // replace existing value
            item->setValue(value);
        }
    }

    /*
    * help method to resize the map
    * @newSize specifies new number of buckets
    */
    void resizeHelper(std::size_t newSize) {
        HashItem<K, V> ** newMap = new HashItem<K, V> *[newSize]();
        std::size_t items = 0;
        for (std::size_t i = 0; i < mapSize; i++) {
            HashItem<K, V> * item = hMap[i];
            while (item != nullptr) {
                insertHelper(newMap, newSize, items, item->getKey(), item->getValue());
                item = item->getNext();
            }
        }

        deleteMap(hMap, mapSize);
        hMap = newMap;
        mapSize = newSize;
        numOfItems = items;
        newMap = nullptr;

    }

    /*
    * help function for deleting the map hm
    * @hm HashMap
    * @mSize number of buckets in hm
    */
    void deleteMap(HashItem<K, V> ** hm, std::size_t mSize) {
        // delete all nodes
        for (std::size_t i = 0; i < mSize; ++i) {
            HashItem<K, V> * item = hm[i];
            while (item != nullptr) {
                HashItem<K, V> * prev = item;
                item = item->getNext();
                delete prev;
            }
            hm[i] = nullptr;
        }
        // delete the map
        delete[] hm;
    }
};
Wollastonite answered 15/8, 2016 at 15:45 Comment(1)
Sorry for reviving a rather old thread... but are you sure getWithWait is going to work fine? Imagine one thread calls it while another calls, say, erase. Then different mutexes are locked, right? When both are called with the same key argument, a race condition occurs, doesn't it?Palatalized

The other two answers are quite fine, but I thought I should add a little bit of colour:

Cliff Click wrote a lock-free concurrent hash map in Java. It would be nontrivial to adapt it to C++ (no GC, different memory model, etc.), but it's the best implementation of a lock-free data structure I've ever seen. If you can use Java instead of C++, this might be the way to go.

I'm not aware of any lock-free balanced binary tree structures. That doesn't mean they don't exist.

It's probably easiest to go with one of the two other answers (bulk copy/atomic swap/something like shared_ptr or reader-writer locking) to control access to the map instead. One of the two will be faster depending on the relative quantities of reads and writes and the size of the map; you should benchmark to see which one you should use.

Panay answered 2/10, 2013 at 18:3 Comment(2)
This implementation is actually easy to translate to C++, but there's really no need to. Intel's TBB works great, and provides a couple other lock-free data structures as well. I haven't checked, though, whether Intel's implementation does cooperative resizing like Cliff Click's implementation. Now if only someone had a concurrent min-max heap...Diba
The link seems to be broken, here is a working link : web.stanford.edu/class/ee380/Abstracts/070221_LockFreeHash.pdfExclusion

What you need is the equivalent of a ConcurrentHashMap in Java, which allows concurrent reading and writing to the underlying hash table. This class is part of the java.util.concurrent package and provides for concurrent reading and writing (up to a concurrency level, which defaults to 16).

You can find more information in the javadoc. I am quoting the javadoc here:

A hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.

Whoop answered 2/10, 2013 at 17:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.