Jak przerwać BlockingQueue czyli blocking on take()?

Mam klasę, która pobiera obiekty z BlockingQueue i przetwarza je przez wywołanie take() W pętli ciągłej. W pewnym momencie wiem, że do kolejki nie zostaną dodane żadne obiekty. Jak przerwać metodę take(), aby przestała blokować?

Oto klasa, która przetwarza obiekty:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

A oto metoda, która używa tej klasy do przetwarzania obiektów:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
Author: MCS, 2009-05-01

5 answers

Jeśli przerwanie wątku nie jest opcją, inną jest umieszczenie w kolejce obiektu "marker" lub "command", który zostanie rozpoznany jako taki przez MyObjHandler i wyłamanie się z pętli.

 62
Author: Chris Thornhill,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-01 17:29:24
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Jednakże, jeśli to zrobisz, wątek może zostać przerwany, gdy w kolejce są jeszcze elementy oczekujące na przetworzenie. Warto rozważyć użycie poll zamiast take, co pozwoli wątkowi przetwarzającemu na timeout i zakończenie, gdy odczekał jakiś czas bez nowego wejścia.

 12
Author: erickson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-01 17:27:09

Bardzo późno, ale mam nadzieję, że to pomoże innym, ponieważ spotkałem się z podobnym problemem i zastosowałem podejście poll zasugerowane przez erickson powyżej z niewielkimi zmianami,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

To rozwiązało oba problemy

  1. oznaczył BlockingQueue aby wiedział, że nie musi dłużej czekać na elementy
  2. nie przerwano pomiędzy nimi, tak aby bloki przetwarzania kończyły się tylko wtedy, gdy wszystkie elementy w kolejce są przetwarzane i nie ma elementów do wysłania dodano
 12
Author: dbw,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:30

Przerwij wątek:

thread.interrupt()
 1
Author: stepancheg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-01 17:28:00

Albo nie przerywaj, to paskudne.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. gdy producent umieszcza obiekt w kolejce, wywołuje queue.notify(), jeśli się kończy, wywołuje queue.done()
  2. pętla while (!KolejkaisDone ()//!KolejkaisEmpty ())
  3. test take() Zwraca wartość null
 0
Author: tomasb,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-09-13 02:53:19