Simple Java name based locks?

2019-01-02 20:40发布

MySQL has a handy function:

SELECT GET_LOCK("SomeName")

This can be used to create simple, but very specific, name based locks for an application. However, it requires a database connection.

I have many situations like:

someMethod() {
    // do stuff to user A for their data for feature X
}

It doesn't make sense to simply synchronize this method, because, for example, if this method is called for user B in the meantime, user B does not need to wait for user A to finish before it starts, only operations for the user A and feature X combination need to wait.

With the MySql lock I could do something like:

someMethod() {
    executeQuery("SELECT GET_LOCK('userA-featureX')")
    // only locked for user A for their data for feature X
    executeQuery("SELECT RELEASE_LOCK('userA-featureX')")
}

Since Java locking is based on objects, it seems like I would need to create a new object to represent the situation for this lock and then put it in a static cache somewhere so all the threads can see it. Subsequent requests to lock for that situation would then locate the lock object in the cache and acquire its lock. I tried to create something like this, but then the lock cache itself needs synchronization. Also, it is difficult to detect when a lock object is no longer being used so that it can be removed from the cache.

I have looked at the Java concurrent packages, but nothing stands out as being able to handle something like this. Is there an easy way to implement this, or am I looking at this from the wrong perspective?

Edit:

To clarify, I am not looking to create a predefined pool of locks ahead of time, I would like to create them on demand. Some pseudo code for what I am thinking is:

LockManager.acquireLock(String name) {
    Lock lock;  

    synchronized (map) {
        lock = map.get(name);

        // doesn't exist yet - create and store
        if(lock == null) {
            lock = new Lock();
            map.put(name, lock);
        }
    }

    lock.lock();
}

LockManager.releaseLock(String name) {
    // unlock
    // if this was the last hold on the lock, remove it from the cache
}

标签: java locking
22条回答
君临天下
2楼-- · 2019-01-02 20:47

TreeMap because in HashMap size of inner array can only increase

public class Locker<T> {
    private final Object lock = new Object();
    private final Map<T, Value> map = new TreeMap<T, Value>();

    public Value<T> lock(T id) {
        Value r;
        synchronized (lock) {
            if (!map.containsKey(id)) {
                Value value = new Value();
                value.id = id;
                value.count = 0;
                value.lock = new ReentrantLock();
                map.put(id, value);
            }
            r = map.get(id);
            r.count++;
        }
        r.lock.lock();
        return r;
    }

    public void unlock(Value<T> r) {
        r.lock.unlock();
        synchronized (lock) {
            r.count--;
            if (r.count == 0)
                map.remove(r.id);
        }
    }

    public static class Value<T> {

        private Lock lock;
        private long count;
        private T id;
    }
}
查看更多
梦该遗忘
3楼-- · 2019-01-02 20:50

I've created a tokenProvider based on the IdMutexProvider of McDowell. The manager uses a WeakHashMap which takes care of cleaning up unused locks.

TokenManager:

/**
 * Token provider used to get a {@link Mutex} object which is used to get exclusive access to a given TOKEN.
 * Because WeakHashMap is internally used, Mutex administration is automatically cleaned up when
 * the Mutex is no longer is use by any thread.
 *
 * <pre>
 * Usage:
 * private final TokenMutexProvider&lt;String&gt; myTokenProvider = new TokenMutexProvider&lt;String&gt;();
 *
 * Mutex mutex = myTokenProvider.getMutex("123456");
 * synchronized (mutex) {
 *  // your code here
 * }
 * </pre>
 *
 * Class inspired by McDowell.
 * url: http://illegalargumentexception.blogspot.nl/2008/04/java-synchronizing-on-transient-id.html
 *
 * @param <TOKEN> type of token. It is important that the equals method of that Object return true
 * for objects of different instances but with the same 'identity'. (see {@link WeakHashMap}).<br>
 * E.g.
 * <pre>
 *  String key1 = "1";
 *  String key1b = new String("1");
 *  key1.equals(key1b) == true;
 *
 *  or
 *  Integer key1 = 1;
 *  Integer key1b = new Integer(1);
 *  key1.equals(key1b) == true;
 * </pre>
 */
public class TokenMutexProvider<TOKEN> {

    private final Map<Mutex, WeakReference<Mutex>> mutexMap = new WeakHashMap<Mutex, WeakReference<Mutex>>();

    /**
     * Get a {@link Mutex} for the given (non-null) token.
     */
    public Mutex getMutex(TOKEN token) {
        if (token==null) {
            throw new NullPointerException();
        }

        Mutex key = new MutexImpl(token);
        synchronized (mutexMap) {
            WeakReference<Mutex> ref = mutexMap.get(key);
            if (ref==null) {
                mutexMap.put(key, new WeakReference<Mutex>(key));
                return key;
            }
            Mutex mutex = ref.get();
            if (mutex==null) {
                mutexMap.put(key, new WeakReference<Mutex>(key));
                return key;
            }
            return mutex;
        }
    }

    public int size() {
        synchronized (mutexMap) {
            return mutexMap.size();
        }
    }

    /**
     * Mutex for acquiring exclusive access to a token.
     */
    public static interface Mutex {}

    private class MutexImpl implements Mutex {
        private final TOKEN token;

        protected MutexImpl(TOKEN token) {
            this.token = token;
        }

        @Override
        public boolean equals(Object other) {
            if (other==null) {
                return false;
            }
            if (getClass()==other.getClass()) {
                TOKEN otherToken = ((MutexImpl)other).token;
                return token.equals(otherToken);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return token.hashCode();
        }
    }
}

Usage:

private final TokenMutexManager<String> myTokenManager = new TokenMutexManager<String>();

Mutex mutex = myTokenManager.getMutex("UUID_123456");
synchronized(mutex) {
    // your code here
}

or rather use Integers?

private final TokenMutexManager<Integer> myTokenManager = new TokenMutexManager<Integer>();

Mutex mutex = myTokenManager.getMutex(123456);
synchronized(mutex) {
    // your code here
}
查看更多
梦该遗忘
4楼-- · 2019-01-02 20:51

A generic solution using java.util.concurrent

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class LockByName<L> {

    ConcurrentHashMap<String, L> mapStringLock;

    public LockByName(){
        mapStringLock = new ConcurrentHashMap<String, L>();
    }

    public LockByName(ConcurrentHashMap<String, L> mapStringLock){
        this.mapStringLock = mapStringLock;
    }

    @SuppressWarnings("unchecked")
    public L getLock(String key) {
        L initValue = (L) createIntanceLock();
        L lock = mapStringLock.putIfAbsent(key, initValue);
        if (lock == null) {
            lock = initValue;
        }
        return lock;
    }

    protected Object createIntanceLock() {
        return new ReentrantLock();
    }

    public static void main(String[] args) {

        LockByName<ReentrantLock> reentrantLocker = new LockByName<ReentrantLock>();

        ReentrantLock reentrantLock1 = reentrantLocker.getLock("pepe");

        try {
            reentrantLock1.lock();
            //DO WORK

        }finally{
            reentrantLock1.unlock();

        }


    }

}
查看更多
美炸的是我
5楼-- · 2019-01-02 20:51

(4 years later...) My answer is similar to user2878608's but I think there are some missing edge cases in that logic. I also thought Semaphore was for locking multiple resources at once (though I suppose using it for counting lockers like that is fine too), so I used a generic POJO lock object instead. I ran one test on it which demonstrated each of the edge cases existed IMO and will be using it on my project at work. Hope it helps someone. :)

class Lock
{
    int c;  // count threads that require this lock so you don't release and acquire needlessly
}

ConcurrentHashMap<SomeKey, Lock> map = new ConcurrentHashMap<SomeKey, Lock>();

LockManager.acquireLock(String name) {
    Lock lock = new Lock();  // creating a new one pre-emptively or checking for null first depends on which scenario is more common in your use case
    lock.c = 0;

    while( true )
    {
        Lock prevLock = map.putIfAbsent(name, lock);
        if( prevLock != null )
            lock = prevLock;

        synchronized (lock)
        {
            Lock newLock = map.get(name);
            if( newLock == null )
                continue;  // handles the edge case where the lock got removed while someone was still waiting on it
            if( lock != newLock )
            {
                lock = newLock;  // re-use the latest lock
                continue;  // handles the edge case where a new lock was acquired and the critical section was entered immediately after releasing the lock but before the current locker entered the sync block
            }

            // if we already have a lock
            if( lock.c > 0 )
            {
                // increase the count of threads that need an offline director lock
                ++lock.c;
                return true;  // success
            }
            else
            {
                // safely acquire lock for user
                try
                {
                    perNameLockCollection.add(name);  // could be a ConcurrentHashMap or other synchronized set, or even an external global cluster lock
                    // success
                    lock.c = 1;
                    return true;
                }
                catch( Exception e )
                {
                    // failed to acquire
                    lock.c = 0;  // this must be set in case any concurrent threads are waiting
                    map.remove(name);  // NOTE: this must be the last critical thing that happens in the sync block!
                }
            }
        }
    }
}

LockManager.releaseLock(String name) {
    // unlock
    // if this was the last hold on the lock, remove it from the cache

    Lock lock = null;  // creating a new one pre-emptively or checking for null first depends on which scenario is more common in your use case

    while( true )
    {
        lock = map.get(name);
        if( lock == null )
        {
            // SHOULD never happen
            log.Error("found missing lock! perhaps a releaseLock call without corresponding acquireLock call?! name:"+name);
            lock = new Lock();
            lock.c = 1;
            Lock prevLock = map.putIfAbsent(name, lock);
            if( prevLock != null )
                lock = prevLock;
        }

        synchronized (lock)
        {
            Lock newLock = map.get(name);
            if( newLock == null )
                continue;  // handles the edge case where the lock got removed while someone was still waiting on it
            if( lock != newLock )
            {
                lock = newLock;  // re-use the latest lock
                continue;  // handles the edge case where a new lock was acquired and the critical section was entered immediately after releasing the lock but before the current locker entered the sync block
            }

            // if we are not the last locker
            if( lock.c > 1 )
            {
                // decrease the count of threads that need an offline director lock
                --lock.c;
                return true;  // success
            }
            else
            {
                // safely release lock for user
                try
                {
                    perNameLockCollection.remove(name);  // could be a ConcurrentHashMap or other synchronized set, or even an external global cluster lock
                    // success
                    lock.c = 0;  // this must be set in case any concurrent threads are waiting
                    map.remove(name);  // NOTE: this must be the last critical thing that happens in the sync block!
                    return true;
                }
                catch( Exception e )
                {
                    // failed to release
                    log.Error("unable to release lock! name:"+name);
                    lock.c = 1;
                    return false;
                }
            }
        }
    }

}
查看更多
美炸的是我
6楼-- · 2019-01-02 20:52

Maybe something like that:

public class ReentrantNamedLock {

private class RefCounterLock {

    public int counter;
    public ReentrantLock sem;

    public RefCounterLock() {
        counter = 0;
        sem = new ReentrantLock();
    }
}
private final ReentrantLock _lock = new ReentrantLock();
private final HashMap<String, RefCounterLock> _cache = new HashMap<String, RefCounterLock>();

public void lock(String key) {
    _lock.lock();
    RefCounterLock cur = null;
    try {
        if (!_cache.containsKey(key)) {
            cur = new RefCounterLock();
            _cache.put(key, cur);
        } else {
            cur = _cache.get(key);
        }
        cur.counter++;
    } finally {
        _lock.unlock();
    }
    cur.sem.lock();
}

public void unlock(String key) {
    _lock.lock();
    try {
        if (_cache.containsKey(key)) {
            RefCounterLock cur = _cache.get(key);
            cur.counter--;
            cur.sem.unlock();
            if (cur.counter == 0) { //last reference
                _cache.remove(key);
            }
            cur = null;
        }
    } finally {
        _lock.unlock();
    }
}}

I didn't test it though.

查看更多
闭嘴吧你
7楼-- · 2019-01-02 20:52

Many implementations but non similar to mine.

Called my Dynamic lock implementation as ProcessDynamicKeyLock because it's a single process lock, for any object as key (equals+hashcode for uniqueness).

TODO: Add a way to provide the actual lock, for example, ReentrantReadWriteLock instead of ReentrantLock.

Implementation:

public class ProcessDynamicKeyLock<T> implements Lock
{
    private final static ConcurrentHashMap<Object, LockAndCounter> locksMap = new ConcurrentHashMap<>();

    private final T key;

    public ProcessDynamicKeyLock(T lockKey)
    {
        this.key = lockKey;
    }

    private static class LockAndCounter
    {
        private final Lock lock = new ReentrantLock();
        private final AtomicInteger counter = new AtomicInteger(0);
    }

    private LockAndCounter getLock()
    {
        return locksMap.compute(key, (key, lockAndCounterInner) ->
        {
            if (lockAndCounterInner == null) {
                lockAndCounterInner = new LockAndCounter();
            }
            lockAndCounterInner.counter.incrementAndGet();
            return lockAndCounterInner;
        });
    }

    private void cleanupLock(LockAndCounter lockAndCounterOuter)
    {
        if (lockAndCounterOuter.counter.decrementAndGet() == 0)
        {
            locksMap.compute(key, (key, lockAndCounterInner) ->
            {
                if (lockAndCounterInner == null || lockAndCounterInner.counter.get() == 0) {
                    return null;
                }
                return lockAndCounterInner;
            });
        }
    }

    @Override
    public void lock()
    {
        LockAndCounter lockAndCounter = getLock();

        lockAndCounter.lock.lock();
    }

    @Override
    public void unlock()
    {
        LockAndCounter lockAndCounter = locksMap.get(key);
        lockAndCounter.lock.unlock();

        cleanupLock(lockAndCounter);
    }


    @Override
    public void lockInterruptibly() throws InterruptedException
    {
        LockAndCounter lockAndCounter = getLock();

        try
        {
            lockAndCounter.lock.lockInterruptibly();
        }
        catch (InterruptedException e)
        {
            cleanupLock(lockAndCounter);
            throw e;
        }
    }

    @Override
    public boolean tryLock()
    {
        LockAndCounter lockAndCounter = getLock();

        boolean acquired = lockAndCounter.lock.tryLock();

        if (!acquired)
        {
            cleanupLock(lockAndCounter);
        }

        return acquired;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
    {
        LockAndCounter lockAndCounter = getLock();

        boolean acquired;
        try
        {
            acquired = lockAndCounter.lock.tryLock(time, unit);
        }
        catch (InterruptedException e)
        {
            cleanupLock(lockAndCounter);
            throw e;
        }

        if (!acquired)
        {
            cleanupLock(lockAndCounter);
        }

        return acquired;
    }

    @Override
    public Condition newCondition()
    {
        LockAndCounter lockAndCounter = locksMap.get(key);

        return lockAndCounter.lock.newCondition();
    }
}

Simple test:

public class ProcessDynamicKeyLockTest
{
    @Test
    public void testDifferentKeysDontLock() throws InterruptedException
    {
        ProcessDynamicKeyLock<Object> lock = new ProcessDynamicKeyLock<>(new Object());
        lock.lock();
        AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
        try
        {
            new Thread(() ->
            {
                ProcessDynamicKeyLock<Object> anotherLock = new ProcessDynamicKeyLock<>(new Object());
                anotherLock.lock();
                try
                {
                    anotherThreadWasExecuted.set(true);
                }
                finally
                {
                    anotherLock.unlock();
                }
            }).start();
            Thread.sleep(100);
        }
        finally
        {
            Assert.assertTrue(anotherThreadWasExecuted.get());
            lock.unlock();
        }
    }

    @Test
    public void testSameKeysLock() throws InterruptedException
    {
        Object key = new Object();
        ProcessDynamicKeyLock<Object> lock = new ProcessDynamicKeyLock<>(key);
        lock.lock();
        AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
        try
        {
            new Thread(() ->
            {
                ProcessDynamicKeyLock<Object> anotherLock = new ProcessDynamicKeyLock<>(key);
                anotherLock.lock();
                try
                {
                    anotherThreadWasExecuted.set(true);
                }
                finally
                {
                    anotherLock.unlock();
                }
            }).start();
            Thread.sleep(100);
        }
        finally
        {
            Assert.assertFalse(anotherThreadWasExecuted.get());
            lock.unlock();
        }
    }
}
查看更多
登录 后发表回答