Spark-repartition () vs coalesce()
Według nauki Spark
Należy pamiętać, że repartycjonowanie danych jest dość kosztowną operacją. Spark ma również zoptymalizowaną wersję repartition() o nazwie coalesce (), która pozwala uniknąć przepływu danych, ale tylko wtedy, gdy zmniejszasz liczbę partycji RDD.
Jedna różnica jest taka, że z repartition() liczba partycji może być zwiększona/zmniejszona, ale z coalesce() liczba partycji może być tylko zmniejszona.
Jeśli partycje są rozłożone na wiele maszyn i coalesce () jest uruchomione, jak może uniknąć przenoszenia danych?
7 answers
Pozwala uniknąć pełnego przetasowania. Jeśli wiadomo, że liczba maleje, to executor może bezpiecznie przechowywać dane na minimalnej liczbie partycji, przenosząc dane tylko z dodatkowych węzłów, na węzły, które przechowujemy.
Więc, to pójdzie coś takiego:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
Następnie coalesce
do 2 partycji:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Zauważ, że Node 1 i Node 3 nie wymagały przeniesienia swoich oryginalnych danych.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-24 14:13:21
Odpowiedź Justina jest niesamowita, a ta odpowiedź sięga głębiej.
Algorytm repartition
wykonuje pełne przetasowanie i tworzy nowe partycje z danymi, które są równomiernie rozłożone. Stwórzmy ramkę z liczbami od 1 do 12.
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf
zawiera 4 partycje na moim komputerze.
numbersDf.rdd.partitions.size // => 4
Oto jak dane są dzielone na partycje:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
Zróbmy pełne przetasowanie metodą repartition
i uzyskajmy te dane na dwóch węzły.
val numbersDfR = numbersDf.repartition(2)
Oto jak numbersDfR
Dane są partycjonowane na mojej maszynie:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
Metoda repartition
tworzy nowe partycje i równomiernie rozprowadza dane w nowych partycjach (dystrybucja danych jest bardziej równomierna dla większych zbiorów danych).
Różnica między coalesce
i repartition
coalesce
używa istniejących partycji, aby zminimalizować ilość przetasowanych danych. repartition
tworzy nowe partycje i wykonuje pełne przetasowanie. coalesce
powoduje, że partycje z różne ilości danych (czasami partycje, które mają znacznie różne rozmiary) i repartition
skutkują w przybliżeniu równymi rozmiarami partycji.
Czy coalesce
czy repartition
jest szybszy?
coalesce
mogą działać szybciej niż repartition
, ale partycje o różnej wielkości są na ogół wolniejsze w pracy niż partycje o równej wielkości. Zwykle po przefiltrowaniu dużego zestawu danych konieczne będzie ponowne podzielenie zbiorów danych. Znalazłem repartition
, aby być szybszym ogólnie, ponieważ Spark jest zbudowany do pracy z równym rozmiarem partycje.
Przeczytaj ten wpis na blogu , jeśli chcesz jeszcze więcej szczegółów.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-05 20:54:49
Dodatkowy punkt, aby zauważyć tutaj jest to, że, jako podstawowa zasada iskry RDD jest niezmienność. Repartition lub coalesce utworzy nowe RDD. Baza RDD będzie nadal istnieć z pierwotną liczbą partycji. W przypadku, gdy use case wymaga utrzymania RDD w pamięci podręcznej, to samo musi być zrobione dla nowo utworzonego RDD.
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-21 15:44:49
Wszystkie odpowiedzi dodają sporo wiedzy do tego bardzo często zadawanego pytania.
Więc zgodnie z tradycją tego pytania, oto moje 2 centy.Okazało się, że repartycja jest szybsza niż coalesce , w bardzo konkretnym przypadku.
W moim wniosku, gdy liczba plików, które szacujemy jest niższa niż określony próg, repartition działa szybciej.
Oto co mam na myśli
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
W powyższym fragmencie, gdyby moje pliki były mniej niż 20, coalesce zajęło wieczność, aby zakończyć, podczas gdy repartycja była znacznie szybsza i tak powyższy kod.
Oczywiście liczba ta (20) zależy od liczby pracowników i ilości danych.
Mam nadzieję, że to pomoże.Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-24 10:00:36
W prosty sposób COALESCE : - jest tylko dla zmniejszenia liczby partycji, bez tasowania danych to po prostu kompresuje partycje
REPARTITION: - jest zarówno dla zwiększenia i zmniejszenia liczby partycji , ale tasowanie odbywa się
Przykład:-
val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)
Oba działa dobrze
Ale ogólnie chodzi o te dwie rzeczy, kiedy musimy zobaczyć wyjście w jednej klastrze, idziemy z tym.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-24 06:46:50
Ale również należy upewnić się, że dane, które nadchodzi węzły koalesce powinny mieć wysoko skonfigurowane, jeśli masz do czynienia z ogromnymi danymi. Ponieważ wszystkie dane zostaną załadowane do tych węzłów, może prowadzić wyjątek pamięci. Chociaż zadośćuczynienie jest kosztowne, wolę go używać. Ponieważ tasuje i rozprowadza dane równomiernie.
Bądź mądry, aby wybrać między coalesce i repartition.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-30 13:10:23
Repartition - zaleca się użycie repartition przy jednoczesnym zwiększeniu liczby partycji, ponieważ wiąże się to z tasowaniem wszystkich danych.
Coalesce-zaleca się stosowanie coalesce przy jednoczesnym zmniejszeniu liczby partycji. Na przykład, jeśli masz 3 partycje i chcesz zredukować je do 2 partycji, Coalesce przeniesie dane trzeciej partycji do partycji 1 i 2. Partycja 1 i 2 pozostaną w tym samym kontenerze.ale zmiana spowoduje przetasowanie danych we wszystkich partycjach, więc użycie sieci między executor będzie wysoki i to wpływa na wydajność.
Wydajność mądrze łączy wydajność lepiej niż repartycji przy jednoczesnym zmniejszeniu liczby partycji.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-31 07:14:07