可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
I\'m using ConcurrentQueue
for a shared data structure which purpose is holding the last N objects passed to it (kind of history).
Assume we have a browser and we want to have the last 100 browsed Urls. I want a queue which automatically drop (dequeue) the oldest (first) entry upon new entry insertion (enqueue) when the capacity gets full (100 addresses in history).
How can I accomplish that using System.Collections
?
回答1:
I would write a wrapper class that on Enqueue would check the Count and then Dequeue when the count exceeds the limit.
public class FixedSizedQueue<T>
{
ConcurrentQueue<T> q = new ConcurrentQueue<T>();
private object lockObject = new object();
public int Limit { get; set; }
public void Enqueue(T obj)
{
q.Enqueue(obj);
lock (lockObject)
{
T overflow;
while (q.Count > Limit && q.TryDequeue(out overflow)) ;
}
}
}
回答2:
I\'d go for a slight variant... extend ConcurrentQueue so as to be able to use Linq extensions on FixedSizeQueue
public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
private readonly object syncObject = new object();
public int Size { get; private set; }
public FixedSizedQueue(int size)
{
Size = size;
}
public new void Enqueue(T obj)
{
base.Enqueue(obj);
lock (syncObject)
{
while (base.Count > Size)
{
T outObj;
base.TryDequeue(out outObj);
}
}
}
}
回答3:
For anyone who finds it useful, here is some working code based on Richard Schneider\'s answer above:
public class FixedSizedQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizedQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
queue.Enqueue(obj);
while (queue.Count > Size)
{
T outObj;
queue.TryDequeue(out outObj);
}
}
}
回答4:
For what its worth, here\'s a lightweight circular buffer with some methods marked for safe and unsafe use.
public class CircularBuffer<T> : IEnumerable<T>
{
readonly int size;
readonly object locker;
int count;
int head;
int rear;
T[] values;
public CircularBuffer(int max)
{
this.size = max;
locker = new object();
count = 0;
head = 0;
rear = 0;
values = new T[size];
}
static int Incr(int index, int size)
{
return (index + 1) % size;
}
private void UnsafeEnsureQueueNotEmpty()
{
if (count == 0)
throw new Exception(\"Empty queue\");
}
public int Size { get { return size; } }
public object SyncRoot { get { return locker; } }
#region Count
public int Count { get { return UnsafeCount; } }
public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
public int UnsafeCount { get { return count; } }
#endregion
#region Enqueue
public void Enqueue(T obj)
{
UnsafeEnqueue(obj);
}
public void SafeEnqueue(T obj)
{
lock (locker) { UnsafeEnqueue(obj); }
}
public void UnsafeEnqueue(T obj)
{
values[rear] = obj;
if (Count == Size)
head = Incr(head, Size);
rear = Incr(rear, Size);
count = Math.Min(count + 1, Size);
}
#endregion
#region Dequeue
public T Dequeue()
{
return UnsafeDequeue();
}
public T SafeDequeue()
{
lock (locker) { return UnsafeDequeue(); }
}
public T UnsafeDequeue()
{
UnsafeEnsureQueueNotEmpty();
T res = values[head];
values[head] = default(T);
head = Incr(head, Size);
count--;
return res;
}
#endregion
#region Peek
public T Peek()
{
return UnsafePeek();
}
public T SafePeek()
{
lock (locker) { return UnsafePeek(); }
}
public T UnsafePeek()
{
UnsafeEnsureQueueNotEmpty();
return values[head];
}
#endregion
#region GetEnumerator
public IEnumerator<T> GetEnumerator()
{
return UnsafeGetEnumerator();
}
public IEnumerator<T> SafeGetEnumerator()
{
lock (locker)
{
List<T> res = new List<T>(count);
var enumerator = UnsafeGetEnumerator();
while (enumerator.MoveNext())
res.Add(enumerator.Current);
return res.GetEnumerator();
}
}
public IEnumerator<T> UnsafeGetEnumerator()
{
int index = head;
for (int i = 0; i < count; i++)
{
yield return values[index];
index = Incr(index, size);
}
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
#endregion
}
I like to use the Foo()/SafeFoo()/UnsafeFoo()
convention:
Foo
methods call UnsafeFoo
as a default.
UnsafeFoo
methods modify state freely without a lock, they should only call other unsafe methods.
SafeFoo
methods call UnsafeFoo
methods inside a lock.
Its a little verbose, but it makes obvious errors, like calling unsafe methods outside a lock in a method which is supposed to be thread-safe, more apparent.
回答5:
Just for fun, here is another implementation that I believe addresses most of the commenters\' concerns. In particular, thread-safety is achieved without locking and the implementation is hidden by the wrapping class.
public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private int _count;
public int Limit { get; private set; }
public FixedSizeQueue(int limit)
{
this.Limit = limit;
}
public void Enqueue(T obj)
{
_queue.Enqueue(obj);
Interlocked.Increment(ref _count);
// Calculate the number of items to be removed by this thread in a thread safe manner
int currentCount;
int finalCount;
do
{
currentCount = _count;
finalCount = Math.Min(currentCount, this.Limit);
} while (currentCount !=
Interlocked.CompareExchange(ref _count, finalCount, currentCount));
T overflow;
while (currentCount > finalCount && _queue.TryDequeue(out overflow))
currentCount--;
}
public int Count
{
get { return _count; }
}
public IEnumerator<T> GetEnumerator()
{
return _queue.GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return _queue.GetEnumerator();
}
}
回答6:
My version is just a subclass of normal Queue
ones.. nothing special but seeing everyone participating and it still goes with the topic title I might as well put it here. It also returns the dequeued ones just in case.
public sealed class SizedQueue<T> : Queue<T>
{
public int FixedCapacity { get; }
public SizedQueue(int fixedCapacity)
{
this.FixedCapacity = fixedCapacity;
}
/// <summary>
/// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
/// </summary>
/// <returns>The dequeued value, if any.</returns>
public new T Enqueue(T item)
{
base.Enqueue(item);
if (base.Count > FixedCapacity)
{
return base.Dequeue();
}
return default;
}
}
回答7:
Well it depends upon the use I have noticed that some of above solution may exceed the size when used in multip-threaded environment. Anyway my use case was to display last 5 events and there are multiple threads writing events into the queue and one other thread reading from it and displaying it in a Winform Control. So this was my solution.
EDIT: Since we already using locking within our implementation we don\'t really need ConcurrentQueue it may improve the performance.
class FixedSizedConcurrentQueue<T>
{
readonly Queue<T> queue = new Queue<T>();
readonly object syncObject = new object();
public int MaxSize { get; private set; }
public FixedSizedConcurrentQueue(int maxSize)
{
MaxSize = maxSize;
}
public void Enqueue(T obj)
{
lock (syncObject)
{
queue.Enqueue(obj);
while (queue.Count > MaxSize)
{
queue.Dequeue();
}
}
}
public T[] ToArray()
{
T[] result = null;
lock (syncObject)
{
result = queue.ToArray();
}
return result;
}
public void Clear()
{
lock (syncObject)
{
queue.Clear();
}
}
}
EDIT: We don\'t really need syncObject
in above example and we can rather use queue
object since we are not re-initializing queue
in any function and its marked as readonly
anyway.
回答8:
For your coding pleasure I submit to you the \'ConcurrentDeck
\'
public class ConcurrentDeck<T>
{
private readonly int _size;
private readonly T[] _buffer;
private int _position = 0;
public ConcurrentDeck(int size)
{
_size = size;
_buffer = new T[size];
}
public void Push(T item)
{
lock (this)
{
_buffer[_position] = item;
_position++;
if (_position == _size) _position = 0;
}
}
public T[] ReadDeck()
{
lock (this)
{
return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
}
}
}
Example Usage:
void Main()
{
var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
var handle = new ManualResetEventSlim();
var task1 = Task.Factory.StartNew(()=>{
var timer = new System.Timers.Timer();
timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>(\"task1\",DateTime.Now));};
timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
timer.Enabled = true;
handle.Wait();
});
var task2 = Task.Factory.StartNew(()=>{
var timer = new System.Timers.Timer();
timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>(\"task2\",DateTime.Now));};
timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
timer.Enabled = true;
handle.Wait();
});
var task3 = Task.Factory.StartNew(()=>{
var timer = new System.Timers.Timer();
timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>(\"task3\",DateTime.Now));};
timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
timer.Enabled = true;
handle.Wait();
});
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
handle.Set();
var outputtime = DateTime.Now;
deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
回答9:
Let\'s add one more answer. Why this over others?
1) Simplicity. Trying to guarantee size is well and good but leads to unneeded complexity that can exhibit its own problems.
2) Implements IReadOnlyCollection, meaning you can use Linq on it and pass it into a variety of things that expect IEnumerable.
3) No locking. Many of the solutions above use locks, which is incorrect on a lockless collection.
4) Implements the same set of methods, properties, and interfaces ConcurrentQueue does, including IProducerConsumerCollection, which is important if you want to use the collection with BlockingCollection.
This implementation could potentially end up with more entries than expected if TryDequeue fails, but the frequency of that occurring doesn\'t seem worth specialized code that will inevitably hamper performance and cause its own unexpected problems.
If you absolutely want to guarantee a size, implementing a Prune() or similar method seems like the best idea. You could use a ReaderWriterLockSlim read lock in the other methods (including TryDequeue) and take a write lock only when pruning.
class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
readonly ConcurrentQueue<T> m_concurrentQueue;
readonly int m_maxSize;
public int Count => m_concurrentQueue.Count;
public bool IsEmpty => m_concurrentQueue.IsEmpty;
public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }
public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
if (initialCollection == null) {
throw new ArgumentNullException(nameof(initialCollection));
}
m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
m_maxSize = maxSize;
}
public void Enqueue (T item) {
m_concurrentQueue.Enqueue(item);
if (m_concurrentQueue.Count > m_maxSize) {
T result;
m_concurrentQueue.TryDequeue(out result);
}
}
public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);
public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
public T[] ToArray () => m_concurrentQueue.ToArray();
public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();
// Explicit ICollection implementations.
void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;
// Explicit IProducerConsumerCollection<T> implementations.
bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);
public override int GetHashCode () => m_concurrentQueue.GetHashCode();
public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
public override string ToString () => m_concurrentQueue.ToString();
}
回答10:
Here\'s my take on the fixed size Queue
It uses regular Queue, to avoid the synchronization overhead when the Count
property is used on ConcurrentQueue
. It also implements IReadOnlyCollection so that LINQ methods can be used. The rest is very similar to other answers here.
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
private readonly Queue<T> _queue = new Queue<T>();
private readonly object _lock = new object();
public int Count { get { lock (_lock) { return _queue.Count; } } }
public int Limit { get; }
public FixedSizedQueue(int limit)
{
Limit = limit;
}
public void Enqueue(T obj)
{
lock (_lock)
{
_queue.Enqueue(obj);
while (_queue.Count > Limit)
_queue.Dequeue();
}
}
public void Clear()
{
lock (_lock)
_queue.Clear();
}
public IEnumerator<T> GetEnumerator()
{
lock (_lock)
return new List<T>(_queue).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
回答11:
This is my version of the queue:
public class FixedSizedQueue<T> {
private object LOCK = new object();
ConcurrentQueue<T> queue;
public int MaxSize { get; set; }
public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
this.MaxSize = maxSize;
if (items == null) {
queue = new ConcurrentQueue<T>();
}
else {
queue = new ConcurrentQueue<T>(items);
EnsureLimitConstraint();
}
}
public void Enqueue(T obj) {
queue.Enqueue(obj);
EnsureLimitConstraint();
}
private void EnsureLimitConstraint() {
if (queue.Count > MaxSize) {
lock (LOCK) {
T overflow;
while (queue.Count > MaxSize) {
queue.TryDequeue(out overflow);
}
}
}
}
/// <summary>
/// returns the current snapshot of the queue
/// </summary>
/// <returns></returns>
public T[] GetSnapshot() {
return queue.ToArray();
}
}
I find it useful to have a constructor that is built upon an IEnumerable and I find it useful to have a GetSnapshot to have a multithread safe list (array in this case) of the items at the moment of the call, that doesn\'t rise errors if the underlaying collection changes.
The double Count check is to prevent the lock in some circumstances.