In my current scenario speed is essential. I have a static map that is only read by multiple threads, and this works fine. Now a requirement has come up that may require writing to the map once in a while, while it is being read by other threads. I believe this is a game changer, since I would need to lock the map for thread safety. This poses a problem because I have 10-12 threads reading the map. If one thread takes a lock on the map just to read it (which I believe is necessary, since something might be written at the same time), the other threads no longer have the parallel read access they had before. Is there any way to circumvent this issue?
You can use a shared_mutex beside your map to acquire shared or unique access. Generally, a write operation will require unique access, while read operations will require shared access.
Any number of threads can acquire shared access, as long as no threads are holding unique access. If a thread attempts to acquire unique access, it waits until all shared access is released.
The standard library and Boost provide shared_lock<T> and unique_lock<T> for scope-bounded acquisition of a shared_mutex.
Beware that some people claim shared_mutex performs poorly, though I haven't seen any evidence or strong analysis to support these claims. It may be worth looking into, if it matters to you.
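The shared/unique access described above could look roughly like the following. This is a minimal sketch assuming C++17 (std::shared_mutex); the SharedMap class, its find/set methods, and the string-to-int map are names and types made up for illustration, not something from the question.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical wrapper; the name SharedMap and its methods are illustrative only.
class SharedMap {
public:
    // Readers take shared access, so any number of them can run in parallel.
    std::optional<int> find(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        auto it = data_.find(key);
        if (it == data_.end()) return std::nullopt;
        return it->second;
    }

    // The occasional writer takes unique access and briefly blocks all readers.
    void set(const std::string& key, int value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        data_[key] = value;
    }

private:
    mutable std::shared_mutex mutex_;
    std::map<std::string, int> data_;
};

// Small demo: several reader threads run while one writer updates the map.
inline int sharedMapDemo() {
    SharedMap map;
    map.set("answer", 1);
    std::vector<std::thread> readers;
    for (int i = 0; i < 4; ++i) {
        readers.emplace_back([&map] {
            for (int n = 0; n < 1000; ++n) {
                auto v = map.find("answer");
                assert(v.has_value());  // the key is never erased
            }
        });
    }
    map.set("answer", 42);  // write while readers are active
    for (auto& t : readers) t.join();
    return *map.find("answer");
}
```

Returning a copy of the value (rather than a reference or iterator) keeps the lock scope-bounded: nothing that points into the map escapes after the shared_lock is released.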
ConcurrentModificationException that is thrown when iterating over a Map that is being modified by another thread – Unsex
Just for your C++ pleasure, read this book. You'll find it worth far more than the money spent, and it will open your concurrency world wide:
C++ Concurrency in Action: Practical Multithreading
The book deals with all sorts of issues and practical solutions: sharing data between threads, waking threads, creating thread pools, and more.
Here is an example of sharing data between threads without using atomics or shared locks:
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template<class T>
class TaskQueue
{
public:
    TaskQueue() = default;
    TaskQueue(const TaskQueue&) = delete;
    TaskQueue& operator=(const TaskQueue&) = delete;

    void Push(T value) {
        std::lock_guard<std::mutex> lk(mut);
        data.push(std::move(value));
        condition.notify_one(); // if many threads are waiting in Get(), this wakes only one of them
    }

    void Get(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        // waits while the queue is empty; without this wait, calling front() on an empty queue is undefined behavior
        condition.wait(lk, [this]{ return !data.empty(); });
        value = std::move(data.front());
        data.pop();
        // lk releases the mutex automatically when it goes out of scope
    }

private:
    std::mutex mut;
    std::queue<T> data; // in your case change this to a std::map
    std::condition_variable condition;
};
One solution could be to keep a pointer to the map and, when you need to modify it, make a copy, modify that copy, and then atomically swap the pointer to the new instance. This solution consumes more memory, but could be more efficient if you have many reading threads, because reads take no lock.
In the example below, only one thread can modify the map. This doesn't mean one thread at a time; it means one and the same thread for the life of the data structure. Otherwise, modification would need to be done while holding a mutex that protects the entire code in updateMap. The reader threads can access theData as usual, without any locking.
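#include <atomic>
#include <map>

typedef std::map<...> Data;
std::atomic<Data *> theData;

void updateMap( ... )
{
    Data *newData = new Data( *theData.load() ); // copy the current map
    // modify newData here
    Data *old = theData.exchange( newData );     // publish the new map atomically
    // Caution: a reader that loaded the old pointer before the exchange may still
    // be using it, so this delete is only safe once no reader can hold `old`.
    delete old;
}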
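One way to keep the copy-and-swap idea while letting old versions of the map stay alive until the last reader is done with them is to publish the map through a std::shared_ptr. The following is only a sketch under assumptions: the string-to-int map type, the global names g_data and g_writerMutex, and the snapshot helper are made up for illustration. It uses the std::atomic_load/std::atomic_store overloads for shared_ptr (available since C++11, deprecated in C++20 in favor of std::atomic<std::shared_ptr<T>>), which are not guaranteed to be lock-free.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

using Data = std::map<std::string, int>;  // example key/value types (assumed)

std::shared_ptr<const Data> g_data = std::make_shared<const Data>();
std::mutex g_writerMutex;  // serializes writers only; readers never take it

// Reader: grabs a snapshot. The snapshot stays alive as long as the
// returned shared_ptr exists, even if a writer publishes a newer map.
std::shared_ptr<const Data> snapshot() {
    return std::atomic_load(&g_data);
}

// Writer: copy the current map, modify the copy, then publish it atomically.
void updateMap(const std::string& key, int value) {
    std::lock_guard<std::mutex> lock(g_writerMutex);
    auto next = std::make_shared<Data>(*std::atomic_load(&g_data));
    (*next)[key] = value;
    std::atomic_store(&g_data, std::shared_ptr<const Data>(std::move(next)));
}

// Small demo: an old snapshot remains valid after a newer map is published.
int cowDemo() {
    updateMap("a", 1);
    auto old = snapshot();      // a reader keeps this version
    updateMap("a", 2);          // the writer publishes a new version
    assert(old->at("a") == 1);  // the old snapshot is unchanged and still alive
    return snapshot()->at("a");
}
```

Because the writer mutex is taken only in updateMap, more than one thread may modify the map here; readers pay for a reference-count update per snapshot instead of a lock on the map itself.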
std::atomic will typically do a compare-and-swap of the pointer. It must be pointed out that this code only works when there's one modifying thread. This is important. – Diba
atomic_* methods in shared_ptr. Made my day :) thank you – Acescent
Here is my implementation of a thread-safe, generic, resizable hash map without using STL containers:
#pragma once
#include <cstddef>
#include <iomanip>
#include <iostream>
#include <functional>
#include <exception>
#include <stdexcept>
#include <mutex>
#include <condition_variable>
/*
 * wrapper for items stored in the map
 */
template<typename K, typename V>
class HashItem {
public:
    HashItem(K key, V value) {
        this->key = key;
        this->value = value;
        this->nextItem = nullptr;
    }
    /*
     * copy constructor (copies key and value, but not the link to the next item)
     */
    HashItem(const HashItem & item) {
        this->key = item.key;
        this->value = item.value;
        this->nextItem = nullptr;
    }
    void setNext(HashItem<K, V> * item) {
        this->nextItem = item;
    }
    HashItem * getNext() {
        return nextItem;
    }
    K getKey() const {
        return key;
    }
    V getValue() const {
        return value;
    }
    void setValue(V value) {
        this->value = value;
    }
private:
    K key;
    V value;
    HashItem * nextItem;
};
/*
 * template HashMap for storing items
 * default hash function HF = std::hash<K>
 */
template <typename K, typename V, typename HF = std::hash<K>>
class HashMap {
public:
    /*
     * constructor
     * @mSize specifies the number of buckets of the map
     */
    HashMap(std::size_t mSize) {
        // lock initialization for single thread
        std::lock_guard<std::mutex> lock(mtx);
        if (mSize < 1)
            // std::exception has no portable string constructor; std::invalid_argument is declared in <stdexcept>
            throw std::invalid_argument("Number of buckets must be greater than zero.");
        mapSize = mSize;
        numOfItems = 0;
        // initialize all buckets to nullptr
        hMap = new HashItem<K, V> *[mapSize]();
    }
    /*
     * for simplicity no copy constructor
     * anyway we want to test how different threads
     * use the same instance of the map
     */
    HashMap(const HashMap & hmap) = delete;
    /*
     * inserts item
     * replaces old value with the new one when item already exists
     * @key key of the item
     * @value value of the item
     */
    void insert(const K & key, const V & value) {
        std::lock_guard<std::mutex> lock(mtx);
        insertHelper(this->hMap, this->mapSize, numOfItems, key, value);
        condVar.notify_all();
    }
    /*
     * erases item with key when such item exists
     * @key of item to erase
     */
    void erase(const K & key) {
        std::lock_guard<std::mutex> lock(mtx);
        // calculate the bucket where the item would be stored
        std::size_t hVal = hashFunc(key) % mapSize;
        HashItem<K, V> * prev = nullptr;
        HashItem<K, V> * item = hMap[hVal];
        while ((item != nullptr) && (item->getKey() != key)) {
            prev = item;
            item = item->getNext();
        }
        // no item found with the given key
        if (item == nullptr) {
            return;
        }
        else {
            if (prev == nullptr) {
                // item found is the first item in the bucket
                hMap[hVal] = item->getNext();
            }
            else {
                // item found in one of the later entries in the bucket
                prev->setNext(item->getNext());
            }
            delete item;
            numOfItems--;
        }
        condVar.notify_all();
    }
    /*
     * get element with the given key by reference
     * @key is the key of item that has to be found
     * @value is the holder where the value of item with key will be copied
     */
    bool getItem(const K & key, V & value) const {
        std::lock_guard<std::mutex> lock(mtx);
        // calculate the bucket where the item would be stored
        std::size_t hVal = hashFunc(key) % mapSize;
        HashItem<K, V> * item = hMap[hVal];
        while ((item != nullptr) && (item->getKey() != key))
            item = item->getNext();
        // item not found
        if (item == nullptr) {
            return false;
        }
        value = item->getValue();
        return true;
    }
    /*
     * get element with the given key by reference
     * @key is the key of item that has to be found
     * shows an example of a thread waiting for some condition
     * @value is the holder where the value of item with key will be copied
     * NOTE: this method waits on mtxForWait, while insert/erase/resize lock mtx,
     * so the lookup below is not synchronized with concurrent writers
     */
    bool getWithWait(const K & key, V & value) {
        std::unique_lock<std::mutex> ulock(mtxForWait);
        condVar.wait(ulock, [this] {return !this->empty(); });
        // calculate the bucket where the item would be stored
        std::size_t hVal = hashFunc(key) % mapSize;
        HashItem<K, V> * item = hMap[hVal];
        while ((item != nullptr) && (item->getKey() != key))
            item = item->getNext();
        // item not found
        if (item == nullptr) {
            return false;
        }
        value = item->getValue();
        return true;
    }
    /*
     * resizes the map
     * creates new map on heap
     * copies the elements into new map
     * @newSize specifies new number of buckets
     */
    void resize(std::size_t newSize) {
        std::lock_guard<std::mutex> lock(mtx);
        if (newSize < 1)
            // std::exception has no portable string constructor; std::invalid_argument is declared in <stdexcept>
            throw std::invalid_argument("Number of buckets must be greater than zero.");
        resizeHelper(newSize);
        condVar.notify_all();
    }
    /*
     * outputs all items of the map
     */
    void outputMap() const {
        std::lock_guard<std::mutex> lock(mtx);
        if (numOfItems == 0) {
            std::cout << "Map is empty." << std::endl << std::endl;
            return;
        }
        std::cout << "Map contains " << numOfItems << " items." << std::endl;
        for (std::size_t i = 0; i < mapSize; i++) {
            HashItem<K, V> * item = hMap[i];
            while (item != nullptr) {
                std::cout << "Bucket: " << std::setw(3) << i
                          << ", key: " << std::setw(3) << item->getKey()
                          << ", value:" << std::setw(3) << item->getValue() << std::endl;
                item = item->getNext();
            }
        }
        std::cout << std::endl;
    }
    /*
     * returns true when map has no items
     */
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx);
        return numOfItems == 0;
    }
    /*
     * removes all items, keeps the number of buckets
     */
    void clear() {
        std::lock_guard<std::mutex> lock(mtx);
        deleteMap(hMap, mapSize);
        numOfItems = 0;
        hMap = new HashItem<K, V> *[mapSize]();
    }
    /*
     * returns number of items stored in the map
     */
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return numOfItems;
    }
    /*
     * returns number of buckets
     */
    std::size_t bucket_count() const {
        std::lock_guard<std::mutex> lock(mtx);
        return mapSize;
    }
    /*
     * destructor
     */
    ~HashMap() {
        std::lock_guard<std::mutex> lock(mtx);
        deleteMap(hMap, mapSize);
    }
private:
    std::size_t mapSize;
    std::size_t numOfItems;
    HF hashFunc;
    HashItem<K, V> ** hMap;
    mutable std::mutex mtx;
    mutable std::mutex mtxForWait;
    std::condition_variable condVar;
    /*
     * helper method for inserting key, value item into the map hm
     * mapSize specifies the size of the map, items - the number
     * of stored items, will be incremented when insertion is completed
     * @hm HashMap
     * @mSize specifies number of buckets
     * @items holds the number of items in hm, will be incremented when insertion successful
     * @key - key of item to insert
     * @value - value of item to insert
     */
    void insertHelper(HashItem<K, V> ** hm, const std::size_t & mSize, std::size_t & items, const K & key, const V & value) {
        std::size_t hVal = hashFunc(key) % mSize;
        HashItem<K, V> * prev = nullptr;
        HashItem<K, V> * item = hm[hVal];
        while ((item != nullptr) && (item->getKey() != key)) {
            prev = item;
            item = item->getNext();
        }
        // inserting new item
        if (item == nullptr) {
            item = new HashItem<K, V>(key, value);
            items++;
            if (prev == nullptr) {
                // insert new value as first item in the bucket
                hm[hVal] = item;
            }
            else {
                // append new item after the previous one in the same bucket
                prev->setNext(item);
            }
        }
        else {
            // replace existing value
            item->setValue(value);
        }
    }
    /*
     * helper method to resize the map
     * @newSize specifies new number of buckets
     */
    void resizeHelper(std::size_t newSize) {
        HashItem<K, V> ** newMap = new HashItem<K, V> *[newSize]();
        std::size_t items = 0;
        for (std::size_t i = 0; i < mapSize; i++) {
            HashItem<K, V> * item = hMap[i];
            while (item != nullptr) {
                insertHelper(newMap, newSize, items, item->getKey(), item->getValue());
                item = item->getNext();
            }
        }
        deleteMap(hMap, mapSize);
        hMap = newMap;
        mapSize = newSize;
        numOfItems = items;
        newMap = nullptr;
    }
    /*
     * helper function for deleting the map hm
     * @hm HashMap
     * @mSize number of buckets in hm
     */
    void deleteMap(HashItem<K, V> ** hm, std::size_t mSize) {
        // delete all nodes
        for (std::size_t i = 0; i < mSize; ++i) {
            HashItem<K, V> * item = hm[i];
            while (item != nullptr) {
                HashItem<K, V> * prev = item;
                item = item->getNext();
                delete prev;
            }
            hm[i] = nullptr;
        }
        // delete the map
        delete[] hm;
    }
};
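In the implementation above, getWithWait locks mtxForWait while insert and erase lock mtx, so the waiting reader and the writers do not exclude each other. One possible way to close that gap is to wait on the same mutex that guards the data. The following is a simplified sketch, not a patch to the class above: WaitingMap is a hypothetical stand-in that stores its items in a std::map with int keys and values.

```cpp
#include <cassert>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>

// Hypothetical, simplified stand-in for the HashMap above.
class WaitingMap {
public:
    void insert(int key, int value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            data_[key] = value;
        }
        cond_.notify_all();  // wake waiters once the data is in place
    }

    // Blocks until the map is non-empty, then looks the key up
    // while still holding the same mutex the writers use.
    bool getWithWait(int key, int& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        // The predicate reads data_ directly: calling a member that locks
        // mtx_ again here would deadlock on a non-recursive mutex.
        cond_.wait(lock, [this] { return !data_.empty(); });
        auto it = data_.find(key);
        if (it == data_.end()) return false;
        value = it->second;
        return true;
    }

private:
    std::mutex mtx_;
    std::condition_variable cond_;
    std::map<int, int> data_;
};

// Small demo: the reader blocks until the writer has inserted the item.
int waitingMapDemo() {
    WaitingMap map;
    int result = -1;
    bool found = false;
    std::thread reader([&] { found = map.getWithWait(7, result); });
    map.insert(7, 99);
    reader.join();
    assert(found);
    return result;
}
```

With a single mutex, the predicate check, the wait, and the lookup all happen under the lock that insert takes, so a notification cannot slip in between the check and the wait.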
getWithWait is going to work fine? Imagine one thread calls it, the other calls, say, erase. Then different mutexes are locked, right? When called with the same key argument a race condition occurs, isn't it the case? – Palatalized
The other two answers are quite fine, but I thought I should add a little bit of colour:
Cliff Click wrote a lock-free concurrent hash map in Java. It would be nontrivial to adapt it to C++ (no GC, different memory model, etc.) but it's the best implementation of a lock-free data structure I've ever seen. If you can use Java instead of C++, this might be the way to go.
I'm not aware of any lock-free balanced binary tree structures. That doesn't mean they don't exist.
It's probably easiest to go with one of the two other answers (bulk copy/atomic swap/something like shared_ptr, or reader-writer locking) to control access to the map instead. One of the two will be faster depending on the relative quantities of reads and writes and the size of the map; you should benchmark to see which one you should use.
What you need is the equivalent of Java's ConcurrentHashMap, which allows concurrent reading and writing to the underlying hash table. This class is part of the java.util.concurrent package and provides for concurrent reading and writing (up to a concurrency level, which defaults to 16).
You can find more information in the javadoc. I am quoting the javadoc here:
A hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.
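For readers staying in C++, the idea of a "concurrency level" can be approximated by splitting the table into independently locked stripes. The sketch below is an illustration of that general idea only, assuming C++17; it is not how ConcurrentHashMap is implemented, and unlike the Java class its reads still take a (shared) lock. StripedMap and its get/put methods are hypothetical names.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical sketch of lock striping: each stripe has its own lock,
// so operations on keys in different stripes do not block each other.
template <typename K, typename V, std::size_t Stripes = 16>
class StripedMap {
public:
    std::optional<V> get(const K& key) const {
        const Stripe& s = stripeFor(key);
        std::shared_lock<std::shared_mutex> lock(s.mutex);
        auto it = s.data.find(key);
        if (it == s.data.end()) return std::nullopt;
        return it->second;
    }

    void put(const K& key, const V& value) {
        Stripe& s = stripeFor(key);
        std::unique_lock<std::shared_mutex> lock(s.mutex);
        s.data[key] = value;
    }

private:
    struct Stripe {
        mutable std::shared_mutex mutex;
        std::unordered_map<K, V> data;
    };

    // A key always maps to the same stripe, chosen by its hash.
    Stripe& stripeFor(const K& key) {
        return stripes_[std::hash<K>{}(key) % Stripes];
    }
    const Stripe& stripeFor(const K& key) const {
        return stripes_[std::hash<K>{}(key) % Stripes];
    }

    std::array<Stripe, Stripes> stripes_;
};

// Small single-threaded demo of the interface.
int stripedMapDemo() {
    StripedMap<std::string, int> map;
    map.put("a", 1);
    map.put("b", 2);
    assert(!map.get("missing").has_value());
    return *map.get("a") + *map.get("b");
}
```

A writer here blocks only the readers whose keys hash to the same stripe, which matters when writes are rare but the map is read by many threads.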