Niestandardowa Pula wątków w Java 8 parallel stream

Czy jest możliwe określenie własnej puli wątków dla Java 8 parallel stream ? Nie mogę go nigdzie znaleźć.

Wyobraź sobie, że mam aplikację serwerową i chciałbym używać parallel streams. Ale aplikacja jest duża i wielowątkowa, więc chcę ją podzielić. Nie chcę wolno działającego zadania w jednym module aplikacji blokuj zadania z innego modułu.

Jeśli nie mogę używać różnych puli wątków dla różnych modułów, oznacza to, że nie mogę bezpiecznie używaj równoległych strumieni w większości rzeczywistych sytuacji.

Wypróbuj poniższy przykład. Istnieją pewne zadania wymagające dużej ilości procesora wykonywane w oddzielnych wątkach. Zadania wykorzystują równoległe strumienie. Pierwsze zadanie jest przerwane, więc każdy krok zajmuje 1 sekundę (symulowany przez sen wątku). Problem polega na tym, że inne wątki utkną i czekają na ukończenie zepsutego zadania. Jest to wymyślony przykład, ale wyobraź sobie aplikację servleta i kogoś, kto wysyła długo działające zadanie do wspólnej puli połączeń fork.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
Author: Stuart Marks, 2014-01-16

10 answers

Istnieje sztuczka, jak wykonać operację równoległą w określonej puli Fork-join. Jeśli wykonasz go jako zadanie w Puli Fork-join, pozostaje tam i nie używa wspólnego.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

Sztuczka opiera się na ForkJoinTask.fork , który określa: "organizuje asynchronicznie wykonanie tego zadania w Puli, w której uruchomione jest bieżące zadanie, jeśli dotyczy, lub za pomocą ForkJoinPool.commonPool () if not inForkJoinPool () "

 304
Author: Lukas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-02-26 06:17:50

Parallel streams używa domyślnego ForkJoinPool.commonPool, który domyślnie ma o jeden wątek mniej niż procesory , zwracane przez Runtime.getRuntime().availableProcessors() (oznacza to, że parallel streams używają wszystkich procesorów, ponieważ również używają głównego wątku):

Dla aplikacji, które wymagają osobnych lub niestandardowych pul, ForkJoinPool może być skonstruowany z określonym poziomem równoległości docelowej; domyślnie równa liczbie dostępnych procesorów.

Oznacza to również, że jeśli zagnieżdżono parallel streams lub wiele równoległych strumieni uruchomionych jednocześnie, wszystkie będą dzielić tę samą pulę. Zaleta: nigdy nie użyjesz więcej niż domyślna (liczba dostępnych procesorów). Wada: możesz nie mieć "wszystkich procesorów" przypisanych do każdego zainicjowanego strumienia równoległego (jeśli zdarzy ci się, że masz więcej niż jeden). (Widocznie możesz użyć ManagedBlocker , aby to obejść.)

Aby zmienić sposób wykonywania równoległych strumieni, można albo

  • prześlij wykonanie parallel stream do własnego ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); lub
  • możesz zmienić rozmiar wspólnej puli używając właściwości systemu: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") dla równoległości docelowej 20 wątków.

Przykład tego ostatniego na moim komputerze, który ma 8 procesorów. Jeśli uruchamiam następujący program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

Wyjście to:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Więc ty widać, że parallel stream przetwarza 8 elementów na raz, tzn. używa 8 wątków. Jednak, jeśli odkomentuję linię komentarza, wyjście będzie następujące:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Tym razem, parallel stream użył 20 wątków i wszystkie 20 elementów w strumieniu zostały przetworzone jednocześnie.

 153
Author: assylias,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-24 09:35:22

Alternatywnie, aby uruchomić obliczenia równoległe wewnątrz własnego forkJoinPool, możesz również przekazać tę pulę do CompletableFuture.metoda supplyAsync jak w:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);
 31
Author: Mario Fusco,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-01-03 08:05:57

Używanie ForkJoinPool i submit dla równoległego strumienia nie wykorzystuje niezawodnie wszystkich wątków. Jeśli spojrzysz na to (Parallel stream z HashSet nie działa w parallel ) I this ( Dlaczego parallel stream nie używa wszystkich wątków ForkJoinPool?), zobaczysz rozumowanie.

Skrócona wersja: jeśli ForkJoinPool / submit nie działa dla ciebie, użyj

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
 15
Author: Tod Casasent,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:36

Do tej pory korzystałem z rozwiązań opisanych w odpowiedziach na to pytanie. Teraz wymyśliłem małą bibliotekę o nazwie Parallel Stream Support do tego:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

Ale jak zauważył @PabloMatiasGomez w komentarzach, istnieją wady dotyczące mechanizmu podziału równoległych strumieni, który w dużym stopniu zależy od wielkości wspólnej puli. Zobacz Parallel stream z HashSet nie działa równolegle.

Używam tego rozwiązania tylko po to, aby mieć oddzielne pule dla różnych rodzajów pracy, ale nie mogę ustawić wielkości wspólnej puli na 1, nawet jeśli nie używam go.

 7
Author: Stefan Ferstl,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:36

Aby zmierzyć rzeczywistą liczbę użytych wątków, możesz sprawdzić Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

To może produkować na 4-rdzeniowym procesorze wyjście takie jak:

5 // common pool
23 // custom pool

Bez .parallel() daje:

3 // common pool
4 // custom pool
 5
Author: charlie,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-21 17:49:58

Idź po AbacusUtil . Numer wątku może być określony dla strumienia równoległego. Oto przykładowy kod:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Ujawnienie: jestem twórcą AbacusUtil.

 1
Author: user_3380739,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-02 03:26:08

Jeśli nie masz nic przeciwko użyciu biblioteki innej firmy, za pomocą cyclops-react Możesz mieszać sekwencyjne i równoległe strumienie w tym samym potoku i dostarczać niestandardowe ForkJoinPools. Na przykład

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Lub jeśli chcemy kontynuować przetwarzanie w sekwencyjnym strumieniu

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Disclosure I am the lead developer of cyclops-react]

 0
Author: John McClean,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-03-10 12:04:19

Próbowałem custom ForkJoinPool w następujący sposób, aby dostosować rozmiar basenu:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Oto wynik mówiący, że pula używa więcej wątków niż domyślne 4.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

Ale faktycznie istnieje dziwak , kiedy próbowałem osiągnąć ten sam wynik używając ThreadPoolExecutor następująco:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));
Ale zawiodłem.

Rozpocznie się tylko parallelStream w nowym wątku i wtedy wszystko inne będzie po prostu takie samo, coPonownie udowadnia że parallelStream użyje forkjoinpool do uruchomienia swoich wątków potomnych.

 0
Author: Hearen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-29 01:11:32

Uwaga: Wydaje się, że w JDK 10 zaimplementowano poprawkę, która zapewnia, że niestandardowa Pula wątków wykorzystuje oczekiwaną liczbę wątków.

Wykonywanie Parallel stream w ramach niestandardowego ForkJoinPool powinno być zgodne z równoległością https://bugs.openjdk.java.net/browse/JDK-8190974

 0
Author: Scott Langley,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-06-13 20:09:32