Jak przerwać BlockingQueue czyli blocking on take()?
Mam klasę, która pobiera obiekty z BlockingQueue
i przetwarza je przez wywołanie take()
W pętli ciągłej. W pewnym momencie wiem, że do kolejki nie zostaną dodane żadne obiekty. Jak przerwać metodę take()
, aby przestała blokować?
Oto klasa, która przetwarza obiekty:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
A oto metoda, która używa tej klasy do przetwarzania obiektów:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
5 answers
Jeśli przerwanie wątku nie jest opcją, inną jest umieszczenie w kolejce obiektu "marker" lub "command", który zostanie rozpoznany jako taki przez MyObjHandler i wyłamanie się z pętli.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-01 17:29:24
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
Jednakże, jeśli to zrobisz, wątek może zostać przerwany, gdy w kolejce są jeszcze elementy oczekujące na przetworzenie. Warto rozważyć użycie poll
zamiast take
, co pozwoli wątkowi przetwarzającemu na timeout i zakończenie, gdy odczekał jakiś czas bez nowego wejścia.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-01 17:27:09
Bardzo późno, ale mam nadzieję, że to pomoże innym, ponieważ spotkałem się z podobnym problemem i zastosowałem podejście poll
zasugerowane przez erickson powyżej z niewielkimi zmianami,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
To rozwiązało oba problemy
- oznaczył
BlockingQueue
aby wiedział, że nie musi dłużej czekać na elementy - nie przerwano pomiędzy nimi, tak aby bloki przetwarzania kończyły się tylko wtedy, gdy wszystkie elementy w kolejce są przetwarzane i nie ma elementów do wysłania dodano
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:30
Przerwij wątek:
thread.interrupt()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-01 17:28:00
Albo nie przerywaj, to paskudne.
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- gdy producent umieszcza obiekt w kolejce, wywołuje
queue.notify()
, jeśli się kończy, wywołujequeue.done()
- pętla while (!KolejkaisDone ()//!KolejkaisEmpty ())
- test take() Zwraca wartość null
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-09-13 02:53:19