Protocol errors, “no more data” errors, “Zero leng

Posted 2019-07-21 15:51

Question:

It would really help if someone could tell me whether there are known issues with PooledRedisClientManager under high-volume scenarios.

I am using a singleton client manager whose GetClient() is called by multiple WCF threads thousands of times per minute, and each thread can read, update, or insert into a Redis collection (I am using a Redis hash collection).

Intermittently I see the errors below; they usually go away on retry.

All GetClient() calls are inside using statements.

Thanks Rb

Here are the errors I see from the logs:

Error 1

ServiceStack.Redis.RedisResponseException: Unknown reply on integer response:         123"Key":"7c3699524bcc457ab377ad1af17eb046","Value":"9527cb78-2e32-4695-ad33-7991f92eb3a2"}, sPort: 64493, LastCommand: HEXISTS urn:xxxxxxxxxxxxxxxx "118fdc26117244819eb712a82b8e86fd"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.HashContainsEntry(String hashId, String key)
at ServiceStack.Redis.Generic.RedisTypedClient`1.HashContainsEntry[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.ContainsKey(TKey key)

Error 2
ServiceStack.Redis.RedisResponseException: No more data, sPort: 65005, LastCommand: HSET urn:xxxxxxxxxxxxxxxxx "9ced6120a876405faccf5cb043e70807" {"ID":"9ced6120a87...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)

Error 3

ServiceStack.Redis.RedisResponseException: Protocol error: expected '$', got ' ', sPort: 64993, LastCommand: HGET urn:xxxxxxxxxxxxxxxxxxxx "705befa18af74f61aafff50b4282de19"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ParseSingleLine(String r)
at ServiceStack.Redis.Generic.RedisTypedClient`1.GetValueFromHash[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.get_Item(TKey key)

Error 4

ServiceStack.Redis.RedisResponseException: Protocol error: invalid multibulk length, sPort: 65154, LastCommand: HSET urn:xxxxxxxxxxxxxx "39a5023eee374b28acbe5f63561c6211" {"ID":"39a5023eee3...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)

Code:

Basically, I created a wrapper, RedisCachedCollection, around RedisHash. This is to support existing code that was using .NET Lists and Dictionaries.

   public class RedisCachedCollection<TKey, TValue> : CacheCollectionBase<TKey, TValue>, IEnumerable<TValue>
  {
    private string _collectionKey;
    private string _collectionLock;
    private IRedisTypedClient<TValue> _redisTypedClient = null;
    private int _locktimeout;
    private Func<TValue, TKey> _idAction;

    public RedisCachedCollection(string collectionKey, int locktimeoutsecs = 5)
    {
        _collectionKey = string.Format("urn:{0}:{1}", "XXXXX", collectionKey);
        _collectionLock = string.Format("{0}+lock", _collectionKey);
        _locktimeout = locktimeoutsecs;
    }


    private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
    {
        _redisTypedClient = redis.As<TValue>();
        return _redisTypedClient.GetHash<TKey>(_collectionKey);
    }
    public override void Add(TValue obj)
    {
        TKey Id = GetUniqueIdAction(obj);

        RetryAction((redis) =>
        {
            GetCollection(redis).Add(Id, obj);
        });
    }

    public override bool Remove(TValue obj)
    {
        TKey Id = GetUniqueIdAction(obj);
        TKey defaultv = default(TKey);

        return RetryAction<bool>((redis) =>
        {
            if (!Id.Equals(defaultv))
            {
                {
                    return GetCollection(redis).Remove(Id);
                }
            }
            return false;
        });

    }

    public override TValue this[TKey id]
    {
        get
        {
            return RetryAction<TValue>((redis) =>
            {
                if (GetCollection(redis).ContainsKey(id))
                    return GetCollection(redis)[id];
                return default(TValue);
            });                
        }
        set
        {
            RetryAction((redis) =>
            {
                GetCollection(redis)[id] = value;
            });                
        }
    }
    public override int Count
    {
        get
        {
            return RetryAction<int>((redis) =>
            {
                return GetCollection(redis).Count;
            });
        }
    }

    public IEnumerable<TValue> Where(Func<TValue, bool> predicate)
    {
        return RetryAction<IEnumerable<TValue>>((redis) =>
        {
            return GetCollection(redis).Values.Where(predicate);
        });
    }

    public bool Any(Func<TValue, bool> predicate)
    {
        return RetryAction<bool>((redis) =>
        {
            return GetCollection(redis).Values.Any(predicate);
        });
    }


    public override IEnumerator<TValue> GetEnumerator()
    {
        return RetryAction<IEnumerator<TValue>>((redis) =>
        {
            return GetCollection(redis).Values.GetEnumerator();
        });
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return RetryAction<System.Collections.IEnumerator>((redis) =>
        {
            return ((System.Collections.IEnumerable)GetCollection(redis).Values).GetEnumerator();
        });           

    }


    public override void Clear()
    {
        RetryAction((redis) =>
        {
            GetCollection(redis).Clear();
        });
    }

    public override bool Contains(TValue obj)
    {
        TKey Id = GetUniqueIdAction(obj);
        return RetryAction<bool>((redis) =>
        {
            return GetCollection(redis).ContainsKey(Id);
        });
    }

    public override bool ContainsKey(TKey obj)
    {
        return RetryAction<bool>((redis) =>
        {
            return GetCollection(redis).ContainsKey(obj);
        });
    }



    public override void CopyTo(TValue[] array, int arrayIndex)
    {
        RetryAction((redis) =>
        {
            GetCollection(redis).Values.CopyTo(array, arrayIndex);
        });
    }

    public override bool IsReadOnly
    {
        get 
        {
            return RetryAction<bool>((redis) =>
            {
                return GetCollection(redis).IsReadOnly;
            });            
        }
    }

    public override Func<TValue, TKey> GetUniqueIdAction
    {
        get
        {
            return _idAction;
        }
        set
        {
            _idAction = value;
        }
    }
    private object _synclock = new object();

    public override IDisposable Lock
    {
        get
        {
            lock (_synclock)
            {
                try
                {
                    return new CacheTransaction(_collectionLock, _locktimeout);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    throw;
                }
            }


        }
    }
    private Dictionary<int, IRedisClient> _redisconnectionpool = new Dictionary<int, IRedisClient>();

    public IRedisClient RedisConnection
    {
        get
        {
                return RedisClientManager.Instance.GetClient();
        }
    }
    private void RetryAction(Action<IRedisClient> action)
    {
        int i = 0;

        while (true)
        {
            try
            {
                using (var redis = RedisConnection)
                {
                    action(redis);
                    return;
                }
            }
            catch (Exception ex)
            {

                if (i++ < 3)
                {

                    continue;
                }
                throw;
            }
        }
    }

    private TOut RetryAction<TOut>(Func<IRedisClient, TOut> action)
    {
        int i = 0;

        while (true)
        {
            try
            {
                using (var redis = RedisConnection)
                {
                    TOut result = action(redis);
                    return result;
                }
            }
            catch (Exception ex)
            {

                if (i++ < 3)
                {

                    continue;
                }

                throw;
            }
        }
    }
}

}

Answer 1:

I've added a stress test based on your HashCollection code above, using as much of it as I could get to compile, and ran the API calls shown in the stack traces above concurrently on 64 threads:

clientsManager = new PooledRedisClientManager(ipAddress);
redisCollection = new RedisCachedCollection<string, string>(
    clientsManager, "Thread: " + Thread.CurrentThread.ManagedThreadId);

var StartedAt = DateTime.UtcNow;
Interlocked.Increment(ref running);

"Starting HashCollectionStressTests with {0} threads".Print(noOfThreads);
var threads = new List<Thread>();
for (int i = 0; i < noOfThreads; i++)
{
    threads.Add(new Thread(WorkerLoop));
}
threads.ForEach(t => t.Start());

"Press Enter to Stop...".Print();
Console.ReadLine();

Interlocked.Decrement(ref running);

"Writes: {0}, Reads: {1}".Print(writeCount, readCount);
"{0} EndedAt: {1}".Print(GetType().Name, DateTime.UtcNow.ToLongTimeString());
"{0} TimeTaken: {1}s".Print(GetType().Name,(DateTime.UtcNow-StartedAt).TotalSeconds);

Here's the WorkerLoop:

public void WorkerLoop()
{
    while (Interlocked.CompareExchange(ref running, 0, 0) > 0)
    {
        redisCollection.ContainsKey("key");
        Interlocked.Increment(ref readCount);

        redisCollection["key"] = "value " + readCount;
        Interlocked.Increment(ref writeCount);

        var value = redisCollection["key"];
        Interlocked.Increment(ref readCount);

        if (value == null)
            Console.WriteLine("value == null");
    }
}

I've also modified your RetryAction API to immediately log and throw so I can detect the first exception thrown:

private void RetryAction(Action<IRedisClient> action)
{
    try
    {
        using (var redis = RedisConnection)
        {
            action(redis);
            return;
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        throw;
    }
}
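The original wrapper retries three times without recording anything, which hides how often these errors actually occur. As a middle ground between that and the log-and-throw version above, here is a minimal sketch of a retry helper that reports every failed attempt before retrying. `RetryHelper`, `Run`, `maxAttempts` and `onFailure` are hypothetical names chosen for illustration; none of them is part of ServiceStack.Redis.

```csharp
using System;

// Hypothetical helper for illustration; not part of ServiceStack.Redis.
public static class RetryHelper
{
    // Runs action up to maxAttempts times. Each failure is passed to
    // onFailure together with the 1-based attempt number before the next
    // try; the final failure is rethrown to the caller.
    public static TOut Run<TOut>(
        Func<TOut> action,
        int maxAttempts = 4, // the original loop allowed 1 try + 3 retries
        Action<Exception, int> onFailure = null)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (Exception ex)
            {
                // Report the failure so intermittent errors stay visible.
                if (onFailure != null) onFailure(ex, attempt);

                // Out of attempts: rethrow with the original stack trace.
                if (attempt >= maxAttempts) throw;
            }
        }
    }
}
```

Inside the collection class, the body passed to `Run` would acquire the client in a using block and invoke the Redis call, so each attempt gets a fresh client from the pool, as the original RetryAction does.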

I've run this stress test against both a local and a networked redis-server instance and have yet to see an Exception. The last run, which I left going for nearly 40 minutes, produced this output:

Starting HashCollectionStressTests with 64 threads
Press Enter to Stop...

Writes: 876755, Reads: 1753518
HashCollectionStressTests EndedAt: 2:10:01 AM
HashCollectionStressTests TimeTaken: 2292.985048s

This shows it executed over 2.6M Hash Collection API calls concurrently without a single Exception.

Unfortunately I can't determine what issue you're running into without being able to reproduce it. I did find it strange that you're keeping a non-thread-safe instance reference of the _redisTypedClient around:

private IRedisTypedClient<TValue> _redisTypedClient = null;

Which gets populated here:

private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
{
    _redisTypedClient = redis.As<TValue>();
    return _redisTypedClient.GetHash<TKey>(_collectionKey);
}

This isn't necessary, as it could have been a local variable. Because the code provided was incomplete (i.e. it doesn't compile), I can't tell whether this instance is also used by other API calls running on multiple threads.
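For illustration, a minimal sketch of the same lookup with the typed client held in a local variable, so nothing obtained from one pooled client is stored where another thread can reach it. The class name `HashAccessor` and its constructor are assumptions made to keep the example self-contained; the ServiceStack.Redis calls (`GetClient`, `As<T>`, `GetHash<TKey>`, `ContainsKey`) are the ones already used in the question. Whether the shared field is the actual cause of the errors is not established here.

```csharp
using ServiceStack.Redis;
using ServiceStack.Redis.Generic;

// Hypothetical, trimmed-down stand-in for RedisCachedCollection.
public class HashAccessor<TKey, TValue>
{
    private readonly IRedisClientsManager _manager;
    private readonly string _collectionKey;

    public HashAccessor(IRedisClientsManager manager, string collectionKey)
    {
        _manager = manager;
        _collectionKey = collectionKey;
    }

    private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
    {
        // Local variable: the typed client lives only as long as this call
        // and is never visible to other threads through a shared field.
        var typedClient = redis.As<TValue>();
        return typedClient.GetHash<TKey>(_collectionKey);
    }

    public bool ContainsKey(TKey key)
    {
        // One pooled client per operation, returned to the pool on dispose.
        using (var redis = _manager.GetClient())
        {
            return GetCollection(redis).ContainsKey(key);
        }
    }
}
```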

If you could put together a repro that shows the problem, that would help in identifying it. A stand-alone example would also make it easier to see how the code is used.