Różnica między CompletableFuture, Future I RxJava ' s Observable

Chciałbym poznać różnicę między CompletableFuture,Future oraz Observable RxJava.

Wiem, że wszystkie są asynchroniczne, ale

Future.get() blokuje wątek

CompletableFuture podaje metody wywołania zwrotnego

RxJava Observable --- podobne do CompletableFuture z innymi korzyściami (nie jestem pewien)

Na przykład: jeśli klient musi wykonać wiele wywołań usługi i gdy użyjemy Futures (Java) Future.get() będą wykonywane sekwencyjnie...chciałbym wiedzieć, jak jej lepiej w RxJava..

I dokumentacja http://reactivex.io/intro.html mówi

Trudno jest używać kontraktów Futures do optymalnego komponowania warunkowych przepływów asynchronicznych (lub jest to niemożliwe, ponieważ opóźnienia każdego żądania różnią się w czasie wykonywania). Można to oczywiście zrobić, ale szybko staje się skomplikowane (a tym samym podatne na błędy) lub przedwcześnie blokuje przyszłość.get(), która eliminuje korzyści z asynchronicznego wykonywania.

Naprawdę zainteresowany wiedz, jak rozwiązać ten problem. Trudno mi było to zrozumieć z dokumentacji.

Author: Hash, 2016-02-11

5 answers

Futures

Futures zostały wprowadzone w Java 5 (2004). Są one w zasadzie zastępcze dla wyniku operacji, która nie została jeszcze zakończona. Po zakończeniu operacji Future będzie zawierał ten wynik. Na przykład operacja może być instancją Runnable lub Callable, która jest przesyłana do ExecutorService. Zgłaszający operację może użyć obiektu Future do sprawdzenia, czy operacja isDone () , czy wait aby zakończyć korzystanie z metody blocking get () .

Przykład:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures zostały wprowadzone w Java 8 (2014). Są one w rzeczywistości ewolucją regularnych Futures, inspirowane przez Google ' a Listenable Futures, część Guava biblioteki. Są to Futures, które pozwalają również na ciąg zadań razem w łańcuchu. Możesz ich użyć, aby powiedzieć niektórym wątkom roboczym, aby " wykonali jakieś zadanie X, a kiedy jesteś skończony, idź zrobić tę drugą rzecz używając wyniku X". Korzystając z CompletableFutures, możesz zrobić coś z wynikiem operacji bez blokowania wątku, aby czekać na wynik. Oto prosty przykład:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava to cała biblioteka dla programowania reaktywnego stworzona w Netflix. Na pierwszy rzut oka wydaje się, że jest podobny do strumieni Java 8 . Jest, ale jest o wiele potężniejsza.

Podobnie w Futures, RxJava może być używany do ciągnięcia razem kilku synchronicznych lub asynchronicznych akcji w celu utworzenia potoku przetwarzania. W przeciwieństwie do kontraktów Futures, które są jednorazowe, RxJava działa na strumieniach zawierających zero lub więcej elementów. W tym niekończące się strumienie z nieskończoną liczbą przedmiotów. Jest również znacznie bardziej elastyczny i wydajny dzięki niewiarygodnie bogatemu zestawowi operatorów .

W przeciwieństwie do strumieni Java 8, RxJava posiada również mechanizm backpressure, który pozwala do obsługi przypadków, w których różne części rurociągu przetwarzania działają w różnych wątkach, z różną szybkością .

Minusem RxJava jest to, że pomimo solidnej dokumentacji, jest to trudna biblioteka do nauki ze względu na zmianę paradygmatu. Kod Rx może być również koszmarem do debugowania, zwłaszcza jeśli zaangażowanych jest wiele wątków, a co gorsza - jeśli potrzebne jest przeciwciśnienie.

Jeśli chcesz się do niego dostać, jest cała Strona z różnych tutoriale na oficjalnej stronie internetowej, plus oficjalna dokumentacja iJavadoc . Możesz również rzucić okiem na niektóre filmy, takie jak ten , który daje krótkie wprowadzenie do Rx, a także mówi o różnicach między Rx i Futures.

Bonus: Java 9 Reactive Streams

Java 9 ' s Reactive Streams aka Flow API to zestaw interfejsów zaimplementowanych przez różne biblioteki reactive streams, takie jak RxJava 2, strumienie Akka i Vertx. Pozwalają one na wzajemne połączenie tych reaktywnych bibliotek, zachowując wszystkie ważne przeciwciśnienie.

 314
Author: Malt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-07-12 10:22:05

Pracuję z Rx Java od 0.9, teraz w 1.3.2 i wkrótce migracja do 2.x używam tego w prywatnym projekcie, nad którym pracuję już 8 lat.

Nie programowałbym już bez tej biblioteki. Na początku byłem sceptyczny, ale to jest zupełnie inny stan umysłu, który musisz stworzyć. Quiete trudne na początku. Czasami godzinami patrzyłem na kulki.. lol

To tylko kwestia praktyki i naprawdę poznania przepływu (aka kontrakt obserwatorów i obserwatorów), gdy już tam dotrzesz, nie będziesz chciał robić tego inaczej.

Dla mnie nie ma żadnego minusa w tej bibliotece.

Przypadek użycia: Mam Widok monitora, który zawiera 9 wskaźników (cpu, mem, sieć itp...). Podczas uruchamiania widoku, widok zapisuje się do klasy monitora systemowego, która zwraca obserwowalny (interwał), który zawiera wszystkie dane dla 9 metrów. Popchnie co sekundę nowy wynik do widoku (więc nie sondowanie !!!). Że obserwowalny używa flatmapy do jednoczesnego (asynchronicznego!) pobiera dane z 9 różnych źródeł i zamienia wynik w nowy model, który twój widok otrzyma w onnext ().

Jak do cholery to zrobisz z futures, completables itp ... Powodzenia ! :)

RX Java rozwiązuje dla mnie wiele problemów w programowaniu i w pewien sposób ułatwia...

Zalety:

    Statelss !!! (ważna rzecz, o której warto wspomnieć, może najważniejsza)
  • Zarządzanie wątkami z pudełko
  • tworzenie sekwencji, które mają swój własny cykl życia
  • Wszystko jest obserwowalne, więc łączenie jest łatwe
  • mniej kodu do napisania
  • pojedynczy słoik na classpath (bardzo lekki)
  • Highly concurrent
  • No callback hell anymore
  • oparte na abonencie (ścisła umowa między konsumentem a producentem)
  • strategie przeciwciśnienia (circuit breaker a like)
  • wspaniały błąd obsługa i odzyskiwanie
  • Very nice documentation (marbles
  • Pełna kontrola
  • Wiele innych ...

Wady: - Trudny do przetestowania

 24
Author: Kristoffer,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-06-11 13:31:59

Główną zaletą CompletableFuture nad normalną przyszłością jest to, że CompletableFuture wykorzystuje niezwykle potężne API stream i daje narzędzia do obsługi oddzwaniania do łańcucha zadań, które są absolutnie nieobecne, jeśli używasz normal Future. CompletableFuture to nie tylko architektura asynchroniczna, ale także sposób na obsługę dużych zadań obliczeniowych-redukcję zadań bez martwienia się o wydajność aplikacji.

 3
Author: asmitB,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-08-30 06:48:35

Wszystkie trzy interfejsy służą do przesyłania wartości od producenta do konsumenta. Konsumenci mogą być 2 rodzajów:

  • synchronous: konsument wykonuje blokowanie wywołania, które powraca, gdy wartość jest gotowa
  • asynchroniczny: gdy wartość jest gotowa, wywołana jest metoda wywołania zwrotnego konsumenta

Również interfejsy komunikacyjne różnią się w inny sposób:

  • możliwość przeniesienia pojedynczej wartości wielu wartości
  • jeśli jest wiele wartości, można obsługiwać przeciwciśnienie lub nie

W wyniku:

  • Future transferuje pojedynczą wartość za pomocą interfejsu synchronicznego

  • CompletableFuture transferuje pojedynczą wartość za pomocą interfejsów synchronicznych i asynchronicznych

  • Rx transferuje wiele wartości za pomocą asynchronicznego interfejsu z przeciwciśnieniem

Ponadto wszystkie te urządzenia komunikacyjne wspierają przenoszenie WYJĄTKÓW. Nie zawsze tak jest. Na przykład BlockingQueue nie.
 2
Author: Alexei Kaigorodov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-08-30 16:04:55

Future Java jest symbolem zastępczym do przechowywania czegoś, co zostanie uzupełnione w przyszłości za pomocą blokującego API. Musisz użyć jej metody' isDone(), aby okresowo sprawdzać, czy zadanie zostało zakończone. Z pewnością możesz zaimplementować własny kod asynchroniczny do zarządzania logiką ankiety. Jednak wiąże się to z większą ilością kodu kotła i obciążeniem debugowania.

Java {[2] } jest innowacją przyszłości Scali. Prowadzi wewnętrzną metodę oddzwaniania. Po zakończeniu, metoda callback będzie być wyzwalane i powiedzieć wątkowi, że operacja powinna zostać wykonana. Dlatego posiada metodę thenApply do wykonywania dalszych operacji na obiekcie zawiniętym w CompletableFuture.

RxJava ' s Observable jest ulepszoną wersją CompletableFuture. Pozwala na obsługę przeciwciśnienia. W metodzie thenApply (i nawet w przypadku jej braci thenApplyAsync), o której wspomnialiśmy powyżej, taka sytuacja może się zdarzyć: metoda downstream chce wywołać zewnętrzną usługę, która może czasami stać się niedostępna. W tym przypadku CompleteableFuture zawiedzie całkowicie i będziesz musiał poradzić sobie z błędem samodzielnie. Jednak Observable pozwala na obsługę przeciwciśnienia i kontynuowanie wykonywania, gdy usługa zewnętrzna stanie się dostępna.

Ponadto istnieje podobny interfejs Observable: Flowable. Są one przeznaczone do różnych celów. Zazwyczaj Flowable jest dedykowany do obsługi operacji zimnych i nieterminowych, podczas gdy Observable jest dedykowany do obsługi egzekucji wymagających natychmiastowej reakcji. Zobacz oficjalne dokumenty tutaj: https://github.com/ReactiveX/RxJava#backpressure

 0
Author: RESSY VAN,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-12-29 21:41:52