Java executors: jak być powiadamianym, bez blokowania, gdy zadanie zostanie ukończone?

Powiedzmy, że mam kolejkę pełną zadań, które muszę przesłać do usługi executora. Mają być przetwarzane pojedynczo. Najprostszym sposobem, jaki przychodzi mi do głowy jest:

  1. Weź zadanie z kolejki
  2. prześlij go do wykonawcy
  3. Zadzwoń .get on the returned Future and block until a result is available
  4. weź kolejne zadanie z kolejki...

Staram się jednak nie blokować całkowicie. Jeśli mam 10000 takich kolejek, które jeśli ich zadania będą przetwarzane pojedynczo, zabraknie mi miejsca na stosie, ponieważ większość z nich będzie trzymać zablokowane wątki.

Chciałbym złożyć zadanie i dostarczyć call-back, który jest wywoływany po zakończeniu zadania. Użyję tego powiadomienia oddzwaniającego jako flagi do wysłania następnego zadania. (functionaljava i jetlang najwyraźniej używają takich nieblokujących algorytmów, ale nie rozumiem ich kodu)

Jak mogę to zrobić używając Javy JDK.util./ align = "left" / pisanie własnej usługi executora?

(kolejka, która karmi mnie tymi zadaniami, może sama się zablokować, ale jest to problem do rozwiązania później)

Author: Michael Myers, 2009-05-05

11 answers

Zdefiniuj interfejs wywołania zwrotnego, aby odbierał dowolne parametry, które chcesz przekazać w powiadomieniu o zakończeniu. Następnie wywołaj go na końcu zadania.

Można nawet napisać Ogólne Opakowanie Dla zadań, które można uruchomić, i przesłać je do ExecutorService. Zobacz też mechanizm wbudowany w Javę 8 poniżej.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Z CompletableFuture, Java 8 zawiera bardziej rozbudowany sposób tworzenia potoków, w których procesy mogą być wykonywane asynchronicznie i warunkowo. Oto wymyślony, ale kompletny przykład powiadomienia.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
 150
Author: erickson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-17 16:57:52

W Javie 8 można użyć CompletableFuture. Oto przykład, który miałem w moim kodzie, gdzie używam go do pobierania użytkowników z mojej usługi użytkownika, mapowania ich do moich obiektów widoku, a następnie aktualizowania mojego widoku lub wyświetlania okna dialogowego błędu (jest to aplikacja GUI): {]}

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Uruchamia się asynchronicznie. Używam dwóch prywatnych metod: mapUsersToUserViews i updateView.

 53
Author: Matt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-29 14:20:02

Użyj guava ' s listenable future API i dodaj wywołanie zwrotne. Cf. ze strony:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
 49
Author: Pierre-Henri,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-30 11:15:02

Można rozszerzyć klasę FutureTask i nadpisać metodę done(), a następnie dodać obiekt FutureTask do ExecutorService, więc metoda done() wywoła się, gdy FutureTask zostanie zakończona natychmiast.

 25
Author: Auguste,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-12-28 19:38:13

ThreadPoolExecutor posiada również metody Hooka beforeExecute i afterExecute, które można nadpisać i wykorzystać. Oto opis z ThreadPoolExecutor's Javadocs .

Metody Hooka

Ta klasa zapewnia protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable) oraz afterExecute(java.lang.Runnable, java.lang.Throwable) metody, które są wywoływane przed i po wykonaniu każdego zadania. Mogą one być używane do manipulowania środowiskiem wykonawczym, na przykład do reinicjalizacji ThreadLocals, zbierania statystyk lub dodawania wpisów w dzienniku. Dodatkowo metoda terminated() może być nadpisany, aby wykonać dowolne specjalne przetwarzanie, które należy wykonać po całkowitym zakończeniu Executor. Jeśli metody hook lub callback rzucają wyjątki, wewnętrzne wątki robocze mogą z kolei zawieść i nagle się zakończyć.

 15
Author: Cem Catikkas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-21 17:27:29

Użyj CountDownLatch.

Pochodzi z java.util.concurrent i jest to dokładnie sposób oczekiwania na zakończenie kilku wątków przed kontynuacją.

Aby osiągnąć efekt wywołania zwrotnego, o który dbasz, wymaga to trochę dodatkowej pracy. Mianowicie, obsługa tego przez siebie w osobnym wątku, który używa CountDownLatch i czeka na to, a następnie idzie o powiadomienie cokolwiek to jest trzeba powiadomić. Nie ma natywnego wsparcia dla callback, lub cokolwiek podobne do tego efektu.


EDIT: Teraz, kiedy rozumiem twoje pytanie, myślę, że niepotrzebnie posuwasz się za daleko. Jeśli pacjent przyjmuje regularne SingleThreadExecutor, daj mu wszystkie zadania, a zrobi kolejkowanie natywnie.

 6
Author: Yuval Adam,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-05 18:32:26

Jeśli chcesz mieć pewność, że żadne zadania nie będą uruchamiane w tym samym czasie, użyj SingleThreadedExecutor. Zadania będą przetwarzane w kolejności, w jakiej zostały złożone. Nie musisz nawet trzymać zadań, po prostu prześlij je do exec.

 5
Author: basszero,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-05-05 18:29:00

Prosty kod do implementacji mechanizmu Callback za pomocą ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

Wyjście:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Uwagi kluczowe:

  1. jeśli chcesz, aby zadania procesowe były kolejno w kolejności FIFO, zastąp newFixedThreadPool(5) na newFixedThreadPool(1)
  2. Jeśli chcesz przetworzyć następne zadanie po przeanalizowaniu wyniku z callback poprzedniego zadania, po prostu anuluj komentarz poniżej linii

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Możesz zastąpić newFixedThreadPool() jednym z

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    W zależności od przypadku użycia.

  4. Jeśli chcesz obsługiwać metodę callback asynchronicznie

    A. podaj współdzielone ExecutorService or ThreadPoolExecutor do zadania wywołującego

    B. Konwersja metody Callable na Callable/Runnable Zadanie

    C. Push callback task to ExecutorService or ThreadPoolExecutor

 2
Author: Ravindra babu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-04-06 16:41:46

Jest to rozszerzenie do odpowiedzi Pache 'a za pomocą Guava' a ListenableFuture.

W szczególności, Futures.transform() zwraca ListenableFuture, więc może być używany do łańcuchowania wywołań asynchronicznych. Futures.addCallback() zwraca void, więc nie może być użyty do łączenia, ale jest dobry do obsługi sukcesu/porażki w asynchronicznym zakończeniu.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

Uwaga: oprócz łączenia zadań asynchronicznych, Futures.transform() pozwala również zaplanować każde zadanie na osobnym executorze (Nie pokazanym w tym przykładzie).

 2
Author: bcorso,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-01-29 05:56:01

Aby dodać do odpowiedzi Matta, która pomogła, oto bardziej rozbudowany przykład, aby pokazać korzystanie z połączenia zwrotnego.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Wyjście To:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito
 1
Author: Old Jack,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-08-29 12:39:58

Możesz użyć implementacji Callable takiej, że

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

Gdzie CallbackInterface jest czymś bardzo podstawowym jak

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

A teraz główna klasa będzie wyglądała tak

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
 1
Author: Deepika Anand,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-09 18:00:59