How to acquire a lock by a key
P

7

39

What is the best way to prevent concurrent updates of one record in a key-value set without locking the entire set? Semantically, I'm looking for some kind of locking by key (ideally a Java implementation, but not necessarily):

interface LockByKey {
   void lock(String key); // acquire an exclusive lock for a key   
   void unlock(String key); // release lock for a key
}

This lock is intended to synchronize access to a remote store, so a synchronized Java collection is not an option.

Potherb answered 20/6, 2012 at 17:2 Comment(3)
This has a lot to do with how you synchronize access to your remote store. Not sure if this question can be answered without knowing more about how you manage concurrency remotely. - Subcartilaginous
Have a look at https://mcmap.net/q/282624/-java-synchronizing-based-on-a-parameter-named-mutex-lock. - Tacnaarica
You could use: https://mcmap.net/q/280666/-simple-java-name-based-locks - Coronal
U
54

Guava has something like this being released in 13.0; you can get it out of HEAD if you like.

Striped<Lock> more or less allocates a specific number of locks, and then assigns strings to locks based on their hash code. The API looks more or less like

Striped<Lock> locks = Striped.lock(stripes);
Lock l = locks.get(string);
l.lock();
try {
  // do stuff 
} finally {
  l.unlock();
}

The controllable number of stripes lets you trade concurrency against memory usage, because allocating a full lock for each string key can get expensive; essentially, two different keys only contend for the same lock when they hash to the same stripe, which is (predictably) rare.
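
To make the striping idea concrete, here is a minimal sketch that uses only the JDK. It is not Guava's implementation: the class name `SimpleStripedLocks` and its methods are hypothetical, and the hash mixing step is an assumption chosen for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of lock striping; not Guava's Striped class.
final class SimpleStripedLocks {
    private final Lock[] stripes;

    SimpleStripedLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        // All locks are created eagerly, so memory use is fixed up front.
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // Maps a key to a stripe index; equal keys always map to the same index.
    int indexFor(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16); // fold the high bits into the low bits
        return Math.floorMod(h, stripes.length);
    }

    Lock get(Object key) {
        return stripes[indexFor(key)];
    }
}

final class SimpleStripedLocksDemo {
    public static void main(String[] args) {
        SimpleStripedLocks locks = new SimpleStripedLocks(64);
        Lock l = locks.get("record-42");
        l.lock();
        try {
            System.out.println("updating record-42 under stripe " + locks.indexFor("record-42"));
        } finally {
            l.unlock();
        }
    }
}
```

Unrelated keys that land on the same index simply share a lock, which costs some concurrency but never correctness.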

(Disclosure: I contribute to Guava.)

Ursulina answered 20/6, 2012 at 18:11 Comment(4)
Does the memory associated with old locks get cleaned up, like it would if they were kept in a WeakHashMap? - Staphylo
It does if you use the factory method to generate weak locks. - Ursulina
@Jose Martinez: no, it does not clean anything up; it does not have to, because it operates on a different principle. Take for example Striped.lock(1024): that creates a simple Lock[1024] array, eagerly initialized with 1024 pregenerated Lock objects; see Striped.CompactStriped. You can have an application with billions of unique IDs, but your lock pool always stays at the same 1024 Locks. Striped relies on the statistically VERY low probability of two or more IDs generating the same hash and trying to access the mutex at the same time. - Gooden
@Gooden the locks are weak if you call Striped.lazyWeakLock instead of Striped.lock. - Saretta
E
7

I've written a class that can lock on any key dynamically. It uses a static ConcurrentHashMap, but the map is empty while no lock is in use. The syntax can be confusing, as a new object is created based on the key. On unlock, it cleans up the lock if it is no longer used. Any two DynamicKeyLock instances created from keys that are equal (per equals/hashCode) are guaranteed to lock each other out.

See the implementations for Java 8 and Java 6, and a small test.

Java 8:

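import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DynamicKeyLock<T> implements Lock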
{
    private final static ConcurrentHashMap<Object, LockAndCounter> locksMap = new ConcurrentHashMap<>();

    private final T key;

    public DynamicKeyLock(T lockKey)
    {
        this.key = lockKey;
    }

    private static class LockAndCounter
    {
        private final Lock lock = new ReentrantLock();
        private final AtomicInteger counter = new AtomicInteger(0);
    }

    private LockAndCounter getLock()
    {
        return locksMap.compute(key, (key, lockAndCounterInner) ->
        {
            if (lockAndCounterInner == null) {
                lockAndCounterInner = new LockAndCounter();
            }
            lockAndCounterInner.counter.incrementAndGet();
            return lockAndCounterInner;
        });
    }

    private void cleanupLock(LockAndCounter lockAndCounterOuter)
    {
        if (lockAndCounterOuter.counter.decrementAndGet() == 0)
        {
            locksMap.compute(key, (key, lockAndCounterInner) ->
            {
                if (lockAndCounterInner == null || lockAndCounterInner.counter.get() == 0) {
                    return null;
                }
                return lockAndCounterInner;
            });
        }
    }

    @Override
    public void lock()
    {
        LockAndCounter lockAndCounter = getLock();

        lockAndCounter.lock.lock();
    }

    @Override
    public void unlock()
    {
        LockAndCounter lockAndCounter = locksMap.get(key);
        lockAndCounter.lock.unlock();

        cleanupLock(lockAndCounter);
    }


    @Override
    public void lockInterruptibly() throws InterruptedException
    {
        LockAndCounter lockAndCounter = getLock();

        try
        {
            lockAndCounter.lock.lockInterruptibly();
        }
        catch (InterruptedException e)
        {
            cleanupLock(lockAndCounter);
            throw e;
        }
    }

    @Override
    public boolean tryLock()
    {
        LockAndCounter lockAndCounter = getLock();

        boolean acquired = lockAndCounter.lock.tryLock();

        if (!acquired)
        {
            cleanupLock(lockAndCounter);
        }

        return acquired;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
    {
        LockAndCounter lockAndCounter = getLock();

        boolean acquired;
        try
        {
            acquired = lockAndCounter.lock.tryLock(time, unit);
        }
        catch (InterruptedException e)
        {
            cleanupLock(lockAndCounter);
            throw e;
        }

        if (!acquired)
        {
            cleanupLock(lockAndCounter);
        }

        return acquired;
    }

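    @Override
    public Condition newCondition()
    {
        // If no thread currently holds or waits for this key, the map has no
        // entry and the lookup below returns null, so call this only while locked.
        LockAndCounter lockAndCounter = locksMap.get(key);

        return lockAndCounter.lock.newCondition();
    }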
}

Java 6:

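import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DynamicKeyLock<T> implements Lock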
{
    private final static ConcurrentHashMap<Object, LockAndCounter> locksMap = new ConcurrentHashMap<Object, LockAndCounter>();
    private final T key;

    public DynamicKeyLock(T lockKey) {
        this.key = lockKey;
    }

    private static class LockAndCounter {
        private final Lock lock = new ReentrantLock();
        private final AtomicInteger counter = new AtomicInteger(0);
    }

    private LockAndCounter getLock()
    {
        while (true) // Try to init lock
        {
            LockAndCounter lockAndCounter = locksMap.get(key);

            if (lockAndCounter == null)
            {
                LockAndCounter newLock = new LockAndCounter();
                lockAndCounter = locksMap.putIfAbsent(key, newLock);

                if (lockAndCounter == null)
                {
                    lockAndCounter = newLock;
                }
            }

            lockAndCounter.counter.incrementAndGet();

            synchronized (lockAndCounter)
            {
                LockAndCounter lastLockAndCounter = locksMap.get(key);
                if (lockAndCounter == lastLockAndCounter)
                {
                    return lockAndCounter;
                }
                // else some other thread beat us to it, thus try again.
            }
        }
    }

    private void cleanupLock(LockAndCounter lockAndCounter)
    {
        if (lockAndCounter.counter.decrementAndGet() == 0)
        {
            synchronized (lockAndCounter)
            {
                if (lockAndCounter.counter.get() == 0)
                {
                    locksMap.remove(key);
                }
            }
        }
    }

    @Override
    public void lock()
    {
        LockAndCounter lockAndCounter = getLock();

        lockAndCounter.lock.lock();
    }

    @Override
    public void unlock()
    {
        LockAndCounter lockAndCounter = locksMap.get(key);
        lockAndCounter.lock.unlock();

        cleanupLock(lockAndCounter);
    }


    @Override
    public void lockInterruptibly() throws InterruptedException
    {
        LockAndCounter lockAndCounter = getLock();

        try
        {
            lockAndCounter.lock.lockInterruptibly();
        }
        catch (InterruptedException e)
        {
            cleanupLock(lockAndCounter);
            throw e;
        }
    }

    @Override
    public boolean tryLock()
    {
        LockAndCounter lockAndCounter = getLock();

        boolean acquired = lockAndCounter.lock.tryLock();

        if (!acquired)
        {
            cleanupLock(lockAndCounter);
        }

        return acquired;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
    {
        LockAndCounter lockAndCounter = getLock();

        boolean acquired;
        try
        {
            acquired = lockAndCounter.lock.tryLock(time, unit);
        }
        catch (InterruptedException e)
        {
            cleanupLock(lockAndCounter);
            throw e;
        }

        if (!acquired)
        {
            cleanupLock(lockAndCounter);
        }

        return acquired;
    }

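    @Override
    public Condition newCondition()
    {
        // If no thread currently holds or waits for this key, the map has no
        // entry and the lookup below returns null, so call this only while locked.
        LockAndCounter lockAndCounter = locksMap.get(key);

        return lockAndCounter.lock.newCondition();
    }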
}

Test:

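// Assumes JUnit 4 (org.junit.Test and org.junit.Assert).
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Test;

public class DynamicKeyLockTest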
{
    @Test
    public void testDifferentKeysDontLock() throws InterruptedException
    {
        DynamicKeyLock<Object> lock = new DynamicKeyLock<>(new Object());
        lock.lock();
        AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
        try
        {
            new Thread(() ->
            {
                DynamicKeyLock<Object> anotherLock = new DynamicKeyLock<>(new Object());
                anotherLock.lock();
                try
                {
                    anotherThreadWasExecuted.set(true);
                }
                finally
                {
                    anotherLock.unlock();
                }
            }).start();
            Thread.sleep(100);
        }
        finally
        {
            Assert.assertTrue(anotherThreadWasExecuted.get());
            lock.unlock();
        }
    }

    @Test
    public void testSameKeysLock() throws InterruptedException
    {
        Object key = new Object();
        DynamicKeyLock<Object> lock = new DynamicKeyLock<>(key);
        lock.lock();
        AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
        try
        {
            new Thread(() ->
            {
                DynamicKeyLock<Object> anotherLock = new DynamicKeyLock<>(key);
                anotherLock.lock();
                try
                {
                    anotherThreadWasExecuted.set(true);
                }
                finally
                {
                    anotherLock.unlock();
                }
            }).start();
            Thread.sleep(100);
        }
        finally
        {
            Assert.assertFalse(anotherThreadWasExecuted.get());
            lock.unlock();
        }
    }
}
Esparza answered 27/5, 2018 at 7:30 Comment(3)
Hi. I couldn't find your contact info other than Google Hangouts, so could you please contact me via email ( filip(dot)malczak(at)gmail.com )? What you've written here is really reusable, and I'd like to publish it as a Maven module to JFrog OSS Artifactory, but I don't want to sign my name under your work. If you'd just create a GitHub repo with a basic Gradle and license setup, I'd take care of a pull request introducing deployment. Hope to hear from you. - Keilakeily
bitbucket.org/kilaka/kuava/commits/tag/1.0.0 - MIT license. Enjoy :) @FilipMalczak - Esparza
jcenter.bintray.com/kilaka/kilaka/kuava/1.0.0 and dl.bintray.com/kilaka/kilaka/kilaka/kilaka/kuava/1.0.0 - Esparza
N
7
private static final Set<String> lockedKeys = new HashSet<>();

private void lock(String key) throws InterruptedException {
    synchronized (lockedKeys) {
        while (!lockedKeys.add(key)) {
            lockedKeys.wait();
        }
    }
}

private void unlock(String key) {
    synchronized (lockedKeys) {
        lockedKeys.remove(key);
        lockedKeys.notifyAll();
    }
}

public void doSynchronously(String key) throws InterruptedException {
    // Acquire the lock before entering try: if lock() is interrupted,
    // the finally block must not remove a key that another thread holds.
    lock(key);
    try {
        //Do what you need with your key.
        //For different keys this part is executed in parallel.
        //For equal keys this part is executed synchronously.
    } finally {
        unlock(key);
    }
}

The try-finally is very important: you must guarantee that the key is unlocked and the waiting threads are notified after your operation, even if the operation throws an exception.
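
To check the behaviour of the set-based lock, the following self-contained sketch wraps the same lock/unlock pair in a class and measures how many threads are inside the critical section for one key at the same time. The class names `KeySetLock` and `KeySetLockDemo`, the key `"same-key"`, and the thread and iteration counts are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around the set-based lock from this answer.
final class KeySetLock {
    private final Set<String> lockedKeys = new HashSet<>();

    void lock(String key) throws InterruptedException {
        synchronized (lockedKeys) {
            // add() returns false while another thread holds the key;
            // wait() releases the monitor so that unlock() can run.
            while (!lockedKeys.add(key)) {
                lockedKeys.wait();
            }
        }
    }

    void unlock(String key) {
        synchronized (lockedKeys) {
            lockedKeys.remove(key);
            lockedKeys.notifyAll();
        }
    }
}

final class KeySetLockDemo {
    // Runs workers that all lock the same key and returns the highest number
    // of workers observed inside the critical section at once.
    static int maxConcurrentHolders(int threads, int iterations) {
        KeySetLock locks = new KeySetLock();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    try {
                        locks.lock("same-key"); // acquired before the try block
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                        inside.decrementAndGet();
                    } finally {
                        locks.unlock("same-key");
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max holders: " + maxConcurrentHolders(4, 1000));
    }
}
```

Because every waiter is woken on each unlock, this approach is simple but can become noisy when many threads wait on many different keys.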

Nawab answered 12/2, 2019 at 19:2 Comment(2)
Doesn't the sync block of unlock make it wait for the lock block, which in turn waits to acquire a lock? I might be wrong, but to me this looks like a possible deadlock? - Archaeology
@Archaeology deadlock is not possible here. I'll try to explain. Yes, you are right that lock and unlock can happen simultaneously and can wait for each other because they are synchronized on the same object, lockedKeys. But this waiting will be very short, because adding and removing elements in the hash set are very lightweight and quick operations. You should also understand exactly how the wait/notify/notifyAll operations work. Wait/notify/notifyAll can only be called inside synchronized, and when wait is called, it releases the synchronized context in which it was called. - Nawab
R
0

If the "record" you mention is a mutable object and "update" means that the object's internal state is modified without disturbing the structure of the container, then you can accomplish what you want just by locking the record object.

If, however, "update" means removing the record object from the container and replacing it, then you must lock the entire container to prevent other threads from seeing it in an inconsistent state.

In either case, you should be looking at the classes in the java.util.concurrent package.
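
A minimal sketch of the first case, assuming the records are mutable objects kept in a `ConcurrentHashMap` and are never removed or replaced: the map is not locked as a whole, and each update synchronizes only on the record it touches. The `Account` type, the `RecordLocking` class and the key `"alice"` are hypothetical names, and this guards in-process state only, not a remote store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical mutable record; its monitor doubles as the per-record lock.
final class Account {
    long balance; // guarded by synchronized (this Account instance)
}

final class RecordLocking {
    private final ConcurrentMap<String, Account> accounts = new ConcurrentHashMap<>();

    void deposit(String key, long amount) {
        // computeIfAbsent guarantees a single Account instance per key.
        Account account = accounts.computeIfAbsent(key, k -> new Account());
        synchronized (account) { // locks this record only
            account.balance += amount;
        }
    }

    long balanceOf(String key) {
        Account account = accounts.get(key);
        if (account == null) {
            return 0L;
        }
        synchronized (account) {
            return account.balance;
        }
    }
}

final class RecordLockingDemo {
    // Starts workers that each deposit 1 into the same record `iterations`
    // times, then returns the final balance of that record.
    static long run(int threads, int iterations) {
        RecordLocking store = new RecordLocking();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    store.deposit("alice", 1);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return store.balanceOf("alice");
    }

    public static void main(String[] args) {
        System.out.println("final balance: " + run(4, 1000));
    }
}
```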

Regardant answered 20/6, 2012 at 17:21 Comment(0)
W
0

A simple implementation:

public final class ReentrantKeyLock<K> {
    private final ConcurrentHashMap<K, LockWrapper> locks = new ConcurrentHashMap<>();

    public void lock(@NotNull K key) {
        locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.incrementLock()).lock.lock();
    }

    public void unlock(@NotNull K key) {
        locks.computeIfPresent(key, (k, v) -> v.isFreeAfterUnlock() ? null : v);
    }

    private static final class LockWrapper {
        private final ReentrantLock lock = new ReentrantLock();
        private int lockCount = 1;

        private LockWrapper incrementLock() {
            lockCount++;
            return this;
        }

        private boolean isFreeAfterUnlock() {
            lock.unlock();
            return --lockCount == 0;
        }
    }
}
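
The following sketch exercises the same reference-counting idea end to end. It repeats the class in compact form under the hypothetical name `CountingKeyLock` (without the `@NotNull` annotation) so that the example is self-contained, and it adds an `activeKeys()` accessor, which is not part of the answer, to show that the map is empty again once every lock has been released.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical compact variant of the class above, for demonstration only.
final class CountingKeyLock<K> {
    private final ConcurrentHashMap<K, Entry> locks = new ConcurrentHashMap<>();

    void lock(K key) {
        // Register interest atomically, then block outside the map operation.
        locks.compute(key, (k, e) -> e == null ? new Entry() : e.retain()).lock.lock();
    }

    void unlock(K key) {
        // Release, and drop the entry once no thread holds or awaits it.
        locks.computeIfPresent(key, (k, e) -> e.release() ? null : e);
    }

    int activeKeys() { // assumption: helper added for the demo
        return locks.size();
    }

    private static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        int users = 1; // only modified inside compute(), which is atomic per key

        Entry retain() { users++; return this; }
        boolean release() { lock.unlock(); return --users == 0; }
    }
}

final class CountingKeyLockDemo {
    // Returns {max threads seen inside the critical section, keys left in the map}.
    static int[] run(int threads, int iterations) {
        CountingKeyLock<String> locks = new CountingKeyLock<>();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    locks.lock("k");
                    try {
                        max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                        inside.decrementAndGet();
                    } finally {
                        locks.unlock("k");
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { max.get(), locks.activeKeys() };
    }

    public static void main(String[] args) {
        int[] r = run(4, 1000);
        System.out.println("max holders: " + r[0] + ", keys left: " + r[1]);
    }
}
```

An entry stays in the map for as long as any thread holds or waits for its key, so all contenders for a key see the same lock object.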
Welton answered 17/1, 2024 at 2:45 Comment(0)
B
-1

Keep a mutex/lock per bucket. This will ensure that only collisions wait on that mutex.

Burchfield answered 20/6, 2012 at 17:4 Comment(0)
R
-1

This is how I did it. And yes, I agree that two different strings that share the same hash code will end up acquiring the same lock.

class LockByKey {
    ObjectForString objHolder = new ObjectForString(100);
    public void lockThenWorkForKey (String key) {
        synchronized(objHolder.valueOf(key)){
            //DoSomeWork
        }
    }
}

public final class ObjectForString {

    private final Object[] cache;
    private final int cacheSize;
    final int mask;

    public ObjectForString(int size) {
        // Find power-of-two sizes best matching arguments
        int ssize = 1;
        while (ssize < size) {
            ssize <<= 1;
        }

        mask = ssize - 1;
        cache = new Object[ssize];
        cacheSize = ssize;
        //build the Cache
        for (int i = 0; i < cacheSize; i++) {
            this.cache[i] = new Object();
        }
    }

    public Object valueOf(String key) {
        int index = key.hashCode();
        return cache[index & mask];
    }
}
Ricciardi answered 17/3, 2016 at 8:17 Comment(2)
Not only strings with the same hash code: with your code and size 100, "0", "°" and so on share the lock. - Pimp
Even with size 10000 (which means you allocate 10kb for nothing), you will have collisions too often. - Pimp

© 2022 - 2025 — McMap. All rights reserved.