Niestandardowa Pula wątków w Java 8 parallel stream
Czy jest możliwe określenie własnej puli wątków dla Java 8 parallel stream ? Nie mogę go nigdzie znaleźć.
Wyobraź sobie, że mam aplikację serwerową i chciałbym używać parallel streams. Ale aplikacja jest duża i wielowątkowa, więc chcę ją podzielić. Nie chcę wolno działającego zadania w jednym module aplikacji blokuj zadania z innego modułu.
Jeśli nie mogę używać różnych puli wątków dla różnych modułów, oznacza to, że nie mogę bezpiecznie używaj równoległych strumieni w większości rzeczywistych sytuacji.
Wypróbuj poniższy przykład. Istnieją pewne zadania wymagające dużej ilości procesora wykonywane w oddzielnych wątkach. Zadania wykorzystują równoległe strumienie. Pierwsze zadanie jest przerwane, więc każdy krok zajmuje 1 sekundę (symulowany przez sen wątku). Problem polega na tym, że inne wątki utkną i czekają na ukończenie zepsutego zadania. Jest to wymyślony przykład, ale wyobraź sobie aplikację servleta i kogoś, kto wysyła długo działające zadanie do wspólnej puli połączeń fork.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
10 answers
Istnieje sztuczka, jak wykonać operację równoległą w określonej puli Fork-join. Jeśli wykonasz go jako zadanie w Puli Fork-join, pozostaje tam i nie używa wspólnego.
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
//parallel task here, for example
IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();
Sztuczka opiera się na ForkJoinTask.fork , który określa: "organizuje asynchronicznie wykonanie tego zadania w Puli, w której uruchomione jest bieżące zadanie, jeśli dotyczy, lub za pomocą ForkJoinPool.commonPool () if not inForkJoinPool () "
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-02-26 06:17:50
Parallel streams używa domyślnego ForkJoinPool.commonPool
, który domyślnie ma o jeden wątek mniej niż procesory , zwracane przez Runtime.getRuntime().availableProcessors()
(oznacza to, że parallel streams używają wszystkich procesorów, ponieważ również używają głównego wątku):
Dla aplikacji, które wymagają osobnych lub niestandardowych pul, ForkJoinPool może być skonstruowany z określonym poziomem równoległości docelowej; domyślnie równa liczbie dostępnych procesorów.
Oznacza to również, że jeśli zagnieżdżono parallel streams lub wiele równoległych strumieni uruchomionych jednocześnie, wszystkie będą dzielić tę samą pulę. Zaleta: nigdy nie użyjesz więcej niż domyślna (liczba dostępnych procesorów). Wada: możesz nie mieć "wszystkich procesorów" przypisanych do każdego zainicjowanego strumienia równoległego (jeśli zdarzy ci się, że masz więcej niż jeden). (Widocznie możesz użyć ManagedBlocker , aby to obejść.)
Aby zmienić sposób wykonywania równoległych strumieni, można albo
- prześlij wykonanie parallel stream do własnego ForkJoinPool:
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
lub - możesz zmienić rozmiar wspólnej puli używając właściwości systemu:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
dla równoległości docelowej 20 wątków.
Przykład tego ostatniego na moim komputerze, który ma 8 procesorów. Jeśli uruchamiam następujący program:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
Wyjście to:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
Więc ty widać, że parallel stream przetwarza 8 elementów na raz, tzn. używa 8 wątków. Jednak, jeśli odkomentuję linię komentarza, wyjście będzie następujące:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
Tym razem, parallel stream użył 20 wątków i wszystkie 20 elementów w strumieniu zostały przetworzone jednocześnie.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-24 09:35:22
Alternatywnie, aby uruchomić obliczenia równoległe wewnątrz własnego forkJoinPool, możesz również przekazać tę pulę do CompletableFuture.metoda supplyAsync jak w:
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
forkJoinPool
);
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-01-03 08:05:57
Używanie ForkJoinPool i submit dla równoległego strumienia nie wykorzystuje niezawodnie wszystkich wątków. Jeśli spojrzysz na to (Parallel stream z HashSet nie działa w parallel ) I this ( Dlaczego parallel stream nie używa wszystkich wątków ForkJoinPool?), zobaczysz rozumowanie.
Skrócona wersja: jeśli ForkJoinPool / submit nie działa dla ciebie, użyj
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:36
Do tej pory korzystałem z rozwiązań opisanych w odpowiedziach na to pytanie. Teraz wymyśliłem małą bibliotekę o nazwie Parallel Stream Support do tego:
ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
.filter(PrimesPrint::isPrime)
.collect(toList())
Ale jak zauważył @PabloMatiasGomez w komentarzach, istnieją wady dotyczące mechanizmu podziału równoległych strumieni, który w dużym stopniu zależy od wielkości wspólnej puli. Zobacz Parallel stream z HashSet nie działa równolegle.
Używam tego rozwiązania tylko po to, aby mieć oddzielne pule dla różnych rodzajów pracy, ale nie mogę ustawić wielkości wspólnej puli na 1, nawet jeśli nie używam go.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:36
Aby zmierzyć rzeczywistą liczbę użytych wątków, możesz sprawdzić Thread.activeCount()
:
Runnable r = () -> IntStream
.range(-42, +42)
.parallel()
.map(i -> Thread.activeCount())
.max()
.ifPresent(System.out::println);
ForkJoinPool.commonPool().submit(r).join();
new ForkJoinPool(42).submit(r).join();
To może produkować na 4-rdzeniowym procesorze wyjście takie jak:
5 // common pool
23 // custom pool
Bez .parallel()
daje:
3 // common pool
4 // custom pool
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-21 17:49:58
Idź po AbacusUtil . Numer wątku może być określony dla strumienia równoległego. Oto przykładowy kod:
LongStream.range(4, 1_000_000).parallel(threadNum)...
Ujawnienie: jestem twórcą AbacusUtil.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-02 03:26:08
Jeśli nie masz nic przeciwko użyciu biblioteki innej firmy, za pomocą cyclops-react Możesz mieszać sekwencyjne i równoległe strumienie w tym samym potoku i dostarczać niestandardowe ForkJoinPools. Na przykład
ReactiveSeq.range(1, 1_000_000)
.foldParallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
.max(Comparator.naturalOrder()));
Lub jeśli chcemy kontynuować przetwarzanie w sekwencyjnym strumieniu
ReactiveSeq.range(1, 1_000_000)
.parallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
.map(this::processSequentially)
.forEach(System.out::println);
[Disclosure I am the lead developer of cyclops-react]
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-03-10 12:04:19
Próbowałem custom ForkJoinPool w następujący sposób, aby dostosować rozmiar basenu:
private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
return () -> aList.parallelStream()
.peek((i) -> {
String threadName = Thread.currentThread().getName();
ThreadNameSet.add(threadName);
})
.reduce(0L, Long::sum);
}
private static void testForkJoinPool() {
final int parallelism = 10;
ForkJoinPool forkJoinPool = null;
Long result = 0L;
try {
forkJoinPool = new ForkJoinPool(parallelism);
result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown(); //always remember to shutdown the pool
}
}
out.println(result);
out.println(ThreadNameSet);
}
Oto wynik mówiący, że pula używa więcej wątków niż domyślne 4.
50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]
Ale faktycznie istnieje dziwak , kiedy próbowałem osiągnąć ten sam wynik używając ThreadPoolExecutor
następująco:
BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));
Ale zawiodłem.
Rozpocznie się tylko parallelStream w nowym wątku i wtedy wszystko inne będzie po prostu takie samo, coPonownie udowadnia że parallelStream
użyje forkjoinpool do uruchomienia swoich wątków potomnych.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-29 01:11:32
Uwaga: Wydaje się, że w JDK 10 zaimplementowano poprawkę, która zapewnia, że niestandardowa Pula wątków wykorzystuje oczekiwaną liczbę wątków.
Wykonywanie Parallel stream w ramach niestandardowego ForkJoinPool powinno być zgodne z równoległością https://bugs.openjdk.java.net/browse/JDK-8190974
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-06-13 20:09:32