Spark-repartition () vs coalesce()

Według nauki Spark

Należy pamiętać, że repartycjonowanie danych jest dość kosztowną operacją. Spark ma również zoptymalizowaną wersję repartition() o nazwie coalesce (), która pozwala uniknąć przepływu danych, ale tylko wtedy, gdy zmniejszasz liczbę partycji RDD.

Jedna różnica jest taka, że z repartition() liczba partycji może być zwiększona/zmniejszona, ale z coalesce() liczba partycji może być tylko zmniejszona.

Jeśli partycje są rozłożone na wiele maszyn i coalesce () jest uruchomione, jak może uniknąć przenoszenia danych?

Author: gsamaras, 2015-07-24

7 answers

Pozwala uniknąć pełnego przetasowania. Jeśli wiadomo, że liczba maleje, to executor może bezpiecznie przechowywać dane na minimalnej liczbie partycji, przenosząc dane tylko z dodatkowych węzłów, na węzły, które przechowujemy.

Więc, to pójdzie coś takiego:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Następnie coalesce do 2 partycji:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Zauważ, że Node 1 i Node 3 nie wymagały przeniesienia swoich oryginalnych danych.

 197
Author: Justin Pihony,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-24 14:13:21

Odpowiedź Justina jest niesamowita, a ta odpowiedź sięga głębiej.

Algorytm repartition wykonuje pełne przetasowanie i tworzy nowe partycje z danymi, które są równomiernie rozłożone. Stwórzmy ramkę z liczbami od 1 do 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf zawiera 4 partycje na moim komputerze.

numbersDf.rdd.partitions.size // => 4

Oto jak dane są dzielone na partycje:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Zróbmy pełne przetasowanie metodą repartition i uzyskajmy te dane na dwóch węzły.

val numbersDfR = numbersDf.repartition(2)

Oto jak numbersDfR Dane są partycjonowane na mojej maszynie:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

Metoda repartition tworzy nowe partycje i równomiernie rozprowadza dane w nowych partycjach (dystrybucja danych jest bardziej równomierna dla większych zbiorów danych).

Różnica między coalesce i repartition

coalesce używa istniejących partycji, aby zminimalizować ilość przetasowanych danych. repartition tworzy nowe partycje i wykonuje pełne przetasowanie. coalesce powoduje, że partycje z różne ilości danych (czasami partycje, które mają znacznie różne rozmiary) i repartition skutkują w przybliżeniu równymi rozmiarami partycji.

Czy coalesce czy repartition jest szybszy?

coalesce mogą działać szybciej niż repartition, ale partycje o różnej wielkości są na ogół wolniejsze w pracy niż partycje o równej wielkości. Zwykle po przefiltrowaniu dużego zestawu danych konieczne będzie ponowne podzielenie zbiorów danych. Znalazłem repartition, aby być szybszym ogólnie, ponieważ Spark jest zbudowany do pracy z równym rozmiarem partycje.

Przeczytaj ten wpis na blogu , jeśli chcesz jeszcze więcej szczegółów.

 84
Author: Powers,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-05 20:54:49

Dodatkowy punkt, aby zauważyć tutaj jest to, że, jako podstawowa zasada iskry RDD jest niezmienność. Repartition lub coalesce utworzy nowe RDD. Baza RDD będzie nadal istnieć z pierwotną liczbą partycji. W przypadku, gdy use case wymaga utrzymania RDD w pamięci podręcznej, to samo musi być zrobione dla nowo utworzonego RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2
 13
Author: Harikrishnan Ck,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-21 15:44:49

Wszystkie odpowiedzi dodają sporo wiedzy do tego bardzo często zadawanego pytania.

Więc zgodnie z tradycją tego pytania, oto moje 2 centy.

Okazało się, że repartycja jest szybsza niż coalesce , w bardzo konkretnym przypadku.

W moim wniosku, gdy liczba plików, które szacujemy jest niższa niż określony próg, repartition działa szybciej.

Oto co mam na myśli

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

W powyższym fragmencie, gdyby moje pliki były mniej niż 20, coalesce zajęło wieczność, aby zakończyć, podczas gdy repartycja była znacznie szybsza i tak powyższy kod.

Oczywiście liczba ta (20) zależy od liczby pracowników i ilości danych.

Mam nadzieję, że to pomoże.
 4
Author: Abhishek,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-24 10:00:36

W prosty sposób COALESCE : - jest tylko dla zmniejszenia liczby partycji, bez tasowania danych to po prostu kompresuje partycje

REPARTITION: - jest zarówno dla zwiększenia i zmniejszenia liczby partycji , ale tasowanie odbywa się

Przykład:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Oba działa dobrze

Ale ogólnie chodzi o te dwie rzeczy, kiedy musimy zobaczyć wyjście w jednej klastrze, idziemy z tym.

 0
Author: Bujuti Niranjan Reddy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-24 06:46:50

Ale również należy upewnić się, że dane, które nadchodzi węzły koalesce powinny mieć wysoko skonfigurowane, jeśli masz do czynienia z ogromnymi danymi. Ponieważ wszystkie dane zostaną załadowane do tych węzłów, może prowadzić wyjątek pamięci. Chociaż zadośćuczynienie jest kosztowne, wolę go używać. Ponieważ tasuje i rozprowadza dane równomiernie.

Bądź mądry, aby wybrać między coalesce i repartition.

 0
Author: Arun Goudar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-30 13:10:23

Repartition - zaleca się użycie repartition przy jednoczesnym zwiększeniu liczby partycji, ponieważ wiąże się to z tasowaniem wszystkich danych.

Coalesce-zaleca się stosowanie coalesce przy jednoczesnym zmniejszeniu liczby partycji. Na przykład, jeśli masz 3 partycje i chcesz zredukować je do 2 partycji, Coalesce przeniesie dane trzeciej partycji do partycji 1 i 2. Partycja 1 i 2 pozostaną w tym samym kontenerze.ale zmiana spowoduje przetasowanie danych we wszystkich partycjach, więc użycie sieci między executor będzie wysoki i to wpływa na wydajność.

Wydajność mądrze łączy wydajność lepiej niż repartycji przy jednoczesnym zmniejszeniu liczby partycji.

 0
Author: Kamalesan C,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-31 07:14:07