I am using Redis with StackExchange.Redis. I have multiple threads that will at some point access and edit the value of the same key, so I need to synchronize the manipulation of the data.
Looking at the available functions, I see that there are two functions, LockTake and LockRelease. However, these functions take both a key and a value parameter rather than the expected single key to be locked. The IntelliSense documentation and the source on GitHub don't explain how to use the LockTake and LockRelease functions or what to pass in for the key and value parameters.
Q: What is the correct usage of LockTake and LockRelease in StackExchange.Redis?
Pseudocode example of what I'm aiming to do:
//Add Items Before Parallel Execution
redis.StringSet("myJSONKey", myJSON);
//Parallel Execution
Parallel.For(0, 100, i =>
{
//Some work here
//....
//Lock
redis.LockTake("myJSONKey");
//Manipulate
var myJSONObject = redis.StringGet("myJSONKey");
myJSONObject.Total++;
Console.WriteLine(myJSONObject.Total);
redis.StringSet("myJSONKey", myNewJSON);
//Unlock
redis.LockRelease("myJSONKey");
//More work here
//...
});
There are 3 parts to a lock:
- the key (the unique name of the lock in the database)
- the value (a caller-defined token which can be used both to indicate who "owns" the lock, and to check that releasing and extending the lock is being done correctly)
- the duration (a lock is intentionally finite in duration, so it expires by itself if the owner never releases it)
If no other value comes to mind, a GUID might make a suitable "value". We tend to use the machine name (or a munged version of the machine name if multiple processes could be competing on the same machine).
Also, note that taking a lock is speculative, not blocking. It is entirely possible that you fail to obtain the lock, and hence you may need to test for this and perhaps add some retry logic.
A typical example might be:
RedisValue token = Environment.MachineName;
if (db.LockTake(key, token, duration)) {
    try {
        // you have the lock; do the work
    } finally {
        // release only succeeds if the token still matches
        db.LockRelease(key, token);
    }
}
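The example above makes a single attempt. Since taking the lock is non-blocking, retry logic could look roughly like the sketch below. The `LockRetry` class, its `TryAcquire` method and the `tryTake` delegate are hypothetical names for illustration, not part of StackExchange.Redis; the delegate is assumed to wrap one `LockTake` call.

```csharp
using System;
using System.Threading;

public static class LockRetry
{
    // tryTake is a hypothetical delegate that performs ONE lock attempt,
    // e.g. () => db.LockTake(key, token, duration).
    // Returns true as soon as an attempt succeeds, false once all attempts fail.
    public static bool TryAcquire(Func<bool> tryTake, int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (tryTake())
            {
                return true; // lock obtained; the caller must release it later
            }
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay); // wait before the next attempt
            }
        }
        return false; // gave up; the caller does NOT hold the lock
    }
}
```

With StackExchange.Redis this might be called as `LockRetry.TryAcquire(() => db.LockTake(key, token, duration), 20, TimeSpan.FromMilliseconds(100))`, followed by the same try/finally with `LockRelease` when it returns true. The attempt count and delay are arbitrary and would need tuning.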
Note that if the work is lengthy (a loop, in particular), you may want to add some occasional LockExtend calls in the middle, again remembering to check for success (in case it timed out).
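As a rough sketch of that idea, the loop below renews the lock between steps and stops as soon as a renewal fails. `LeasedWork`, `Run` and the `extendLock` delegate are hypothetical names for illustration; the delegate is assumed to wrap a `LockExtend` call with the same key and token that were used to take the lock.

```csharp
using System;
using System.Collections.Generic;

public static class LeasedWork
{
    // Runs the steps in order while the caller already holds the lock.
    // extendLock is a hypothetical delegate that renews the lock once,
    // e.g. () => db.LockExtend(key, token, duration).
    // Returns the number of steps that actually ran.
    public static int Run(IReadOnlyList<Action> steps, Func<bool> extendLock)
    {
        int completed = 0;
        foreach (Action step in steps)
        {
            // Renew before every step after the first; if the renewal fails,
            // the lock has expired or been lost, so stop touching shared data.
            if (completed > 0 && !extendLock())
            {
                break;
            }
            step();
            completed++;
        }
        return completed;
    }
}
```

If fewer steps ran than were supplied, the lock was lost part-way through and the caller has to decide whether to retry or roll back.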
Note also that all individual redis commands are atomic, so you don't need to worry about two discrete operations competing. For more complex multi-operation units, transactions and scripting are options.
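For a plain numeric counter, a single atomic command such as `db.StringIncrement(key)` avoids the lock entirely. For a read-modify-write on a string value, a conditional transaction is one lock-free option. The following is a minimal sketch, assuming the key already exists and holds a string; `OptimisticUpdate`, `TryUpdate` and the `modify` delegate are hypothetical names, and the caller is assumed to retry when it returns false.

```csharp
using System;
using StackExchange.Redis;

public static class OptimisticUpdate
{
    // Reads the value, computes a new one locally, and writes it back only
    // if the stored value is still what was read. Returns false when the key
    // is missing or another client changed it in the meantime.
    public static bool TryUpdate(IDatabase db, RedisKey key, Func<string, string> modify)
    {
        RedisValue current = db.StringGet(key);
        if (current.IsNull)
        {
            return false; // assumption: the key is created elsewhere
        }

        string updated = modify(current.ToString());

        ITransaction tran = db.CreateTransaction();
        // The transaction is only applied if the value is unchanged.
        tran.AddCondition(Condition.StringEqual(key, current));
        // Queued now, sent to the server when Execute() runs.
        // Note: a plain SET like this does not preserve an existing expiry.
        _ = tran.StringSetAsync(key, updated);
        return tran.Execute();
    }
}
```

This trades the lock for a retry loop in the caller, which tends to work well when conflicts are rare.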
Here is my code for the lock -> get -> modify (if required) -> unlock sequence, with comments.
public static T GetCachedAndModifyWithLock<T>(string key, Func<T> retrieveDataFunc, TimeSpan timeExpiration, Func<T, bool> modifyEntityFunc,
TimeSpan? lockTimeout = null, bool isSlidingExpiration=false) where T : class
{
int lockCounter = 0;//for logging in case when too many locks per key
Exception logException = null;
var cache = Connection.GetDatabase();
var lockToken = Guid.NewGuid().ToString(); //unique token for current part of code
var lockName = key + "_lock"; //unique lock name. key-relative.
T tResult = null;
while ( lockCounter < 20)
{
//check for access to cache object, trying to lock it
if (!cache.LockTake(lockName, lockToken, lockTimeout ?? TimeSpan.FromSeconds(10)))
{
lockCounter++;
Thread.Sleep(100); //sleep for 100 milliseconds for next lock try. you can play with that
continue;
}
try
{
RedisValue result = RedisValue.Null;
if (isSlidingExpiration)
{
//in case of sliding expiration - get object with expiry time
var exp = cache.StringGetWithExpiry(key);
//check ttl.
if (exp.Expiry.HasValue && exp.Expiry.Value.TotalSeconds >= 0)
{
//get only if not expired
result = exp.Value;
}
}
else //in absolute expiration case simply get
{
result = cache.StringGet(key);
}
//"REDIS_NULL" is a sentinel for the case where retrieveDataFunc returned null (Redis cannot store null, but it can store a predefined string)
if (result.HasValue && result == "REDIS_NULL") return null;
//the cache is empty
if (!result.HasValue)
{
//retrieve the data from the caller's function (from the database, for example)
tResult = retrieveDataFunc();
if (tResult != null)
{
//trying to modify that entity. if caller modifyEntityFunc returns true, it means that caller wants to resave modified entity.
if (modifyEntityFunc(tResult))
{
//json serialization
var json = JsonConvert.SerializeObject(tResult);
cache.StringSet(key, json, timeExpiration);
}
}
else
{
//save pre-defined string in case if source-value is null.
cache.StringSet(key, "REDIS_NULL", timeExpiration);
}
}
else
{
//retrieve from cache and deserialize to the required type
tResult = JsonConvert.DeserializeObject<T>(result);
//trying to modify
if (modifyEntityFunc(tResult))
{
//and save if required
var json = JsonConvert.SerializeObject(tResult);
cache.StringSet(key, json, timeExpiration);
}
}
//refresh the expiration when the sliding expiration flag is set
if(isSlidingExpiration)
cache.KeyExpire(key, timeExpiration);
}
catch (Exception ex)
{
logException = ex;
}
finally
{
cache.LockRelease(lockName, lockToken);
}
break;
}
if (lockCounter >= 20 || logException!=null)
{
//log it
}
return tResult;
}
And usage:
public class User
{
public int ViewCount { get; set; }
}
var cachedAndModifiedItem = GetCachedAndModifyWithLock<User>( "MyAwesomeKey", () =>
{
//return from db or kind of that
return new User() { ViewCount = 0 };
}, TimeSpan.FromMinutes(10), user=>
{
if (user.ViewCount< 3)
{
user.ViewCount++;
return true; //save it to cache
}
return false; //do not update it in cache
}, TimeSpan.FromSeconds(10),true);
That code can be improved (for example, you could use transactions to reduce the number of calls to the cache), but I hope it will be helpful for you.