Tworzenie kolejki blokującej in.NET?

Mam scenariusz, w którym mam wiele wątków dodawanych do kolejki i wiele wątków czytanych z tej samej kolejki. Jeśli kolejka osiągnie określony rozmiar wszystkie wątki, które wypełniają kolejkę, zostaną zablokowane na add, dopóki element nie zostanie usunięty z kolejki.

Poniższe rozwiązanie jest tym, czego teraz używam i moje pytanie brzmi: jak można to poprawić? Czy istnieje obiekt, który już włącza to zachowanie w BCL, którego powinienem używać?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Author: Eric Schoonover, 2009-02-10

9 answers

To wygląda bardzo niebezpiecznie (bardzo mała synchronizacja); może coś w stylu:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edytuj)

W rzeczywistości, chcesz sposób, aby zamknąć kolejkę tak, że czytelnicy zaczynają opuszczać czysto - być może coś w rodzaju flagi bool-jeśli jest ustawiona, pusta kolejka po prostu powraca (zamiast blokować): {]}

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
 192
Author: Marc Gravell,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-02-09 23:50:28

Użyj. Net 4 BlockingCollection, do enqueue użyj Add(), do dequeue użyj Take (). Wewnętrznie używa nieblokującego współbieżności. Więcej informacji tutaj Fast and Best Producer/consumer queue technique BlockingCollection vs concurrent Queue

 50
Author: xhafan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:13

" Jak można to poprawić?"

Cóż, musisz przyjrzeć się każdej metodzie w swojej klasie i zastanowić się, co by się stało, gdyby inny wątek wywoływał jednocześnie tę metodę lub jakąkolwiek inną metodę. Na przykład w metodzie Usuń umieszcza się blokadę, ale nie w metodzie Dodaj. Co się stanie, jeśli jeden wątek dodaje się w tym samym czasie, co inny wątek usuwa? Złe rzeczy.

Należy również wziąć pod uwagę, że metoda może zwrócić drugi obiekt, który zapewnia dostęp do pierwszego obiektu dane wewnętrzne-na przykład GetEnumerator. Wyobraź sobie, że jeden wątek przechodzi przez ten enumerator, inny wątek modyfikuje listę w tym samym czasie. Niedobrze.

Dobrą zasadą jest, aby to uprościć, zmniejszając liczbę metod w klasie do absolutnego minimum.

W szczególności, nie Dziedzicz innej klasy kontenera, ponieważ ujawnisz wszystkie metody tej klasy, zapewniając sposób, w jaki wywołujący może uszkodzić wewnętrzne dane lub aby zobaczyć częściowo kompletne zmiany danych(tak samo źle, ponieważ dane wydają się uszkodzone w tym momencie). Ukryj wszystkie szczegóły i bądź całkowicie bezwzględny w kwestii tego, jak zezwalasz na dostęp do nich.

Zdecydowanie odradzam korzystanie z gotowych rozwiązań - Kup książkę o gwintowaniu lub skorzystaj z biblioteki stron trzecich. W przeciwnym razie, biorąc pod uwagę to, co próbujesz, będziesz debugować swój kod przez długi czas.

Również, czy nie byłoby bardziej sensowne, aby usunąć, aby zwrócić przedmiot (powiedzmy, ten, który został dodany jako pierwszy, ponieważ jest to kolejka), a nie dzwoniący wybierając konkretny element? A gdy kolejka jest pusta, być może Usuń powinien również zablokować.

Aktualizacja: odpowiedź marca faktycznie realizuje wszystkie te sugestie! :) Ale zostawię to tutaj, ponieważ może być pomocne, aby zrozumieć, dlaczego jego wersja jest taka poprawa.

 14
Author: Daniel Earwicker,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-02-09 22:56:36

Możesz użyć BlockingCollectioni ConcurrentQueue w systemie.Kolekcje.Współbieżna Przestrzeń Nazw

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
 7
Author: Andreas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-02-06 10:39:38

Właśnie zapukałem to używając Reactive Extensions i przypomniałem sobie to pytanie:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}
Niekoniecznie całkowicie bezpieczne, ale bardzo proste.
 5
Author: Mark Rendle,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-05-07 14:14:36

To jest to, co przyszedłem op dla wątku Bezpieczny blokowania kolejki.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
 4
Author: Kevin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-29 06:19:58

Nie do końca zbadałem TPL , ale mogą mieć coś, co pasuje do Twoich potrzeb, lub przynajmniej jakiś materiał odblaskowy, z którego możesz czerpać inspirację.

Mam nadzieję, że to pomoże.

 2
Author: TheMissingLINQ,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-02-09 22:11:23

Możesz spojrzeć na klasę. Poza tym-nie, musisz zrobić to sam. AFAIK nie ma takiej wbudowanej kolekcji.

 0
Author: Vilx-,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-02-09 22:05:22

Jeśli chcesz uzyskać maksymalną przepustowość, pozwalającą na odczyt wielu czytników i pisanie tylko jednego pisarza, BCL ma coś o nazwie ReaderWriterLockSlim, co powinno pomóc w odchudzeniu kodu...

 -1
Author: DavidN,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-02-09 22:04:32