Jak działa Spark aggregate function-aggregateByKey?

Powiedzmy, że mam system dystrybucji na 3 węzłach i moje dane są dystrybuowane między tymi węzłami. na przykład, mam test.plik csv, który istnieje na wszystkich 3 węzłach i zawiera 2 kolumny:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

Następnie używam SparkContext.plik tekstowy do odczytu pliku jako rdd i tak dalej. O ile rozumiem, każdy węzeł Spark worker odczytuje część a Z pliku. Więc teraz powiedzmy, że każdy węzeł będzie przechowywał:

  • węzeł 1: rząd 1~4
  • węzeł 2: rząd 5~8
  • węzeł 3: wiersz 9~12

Moje pytanie jest takie, że powiedzmy, że chcę wykonać obliczenia na tych danych, i jest jeden krok, który muszę zgrupować klucz razem, więc para wartości klucza będzie [k1 [{k1 c1} {k1 c2} {k1 c3}]].. i tak dalej.

Istnieje funkcja o nazwie groupByKey(), która jest bardzo droga w użyciu i aggregateByKey() jest zalecana do użycia. Zastanawiam się, jak działa groupByKey() i aggregateByKey() pod maską? Czy ktoś może wykorzystać powyższy przykład do wyjaśnienia? Po przetasowaniu gdzie znajdują się wiersze na każdym węzeł?

Author: gsamaras, 2014-07-17

2 answers

aggregateByKey() jest prawie identyczne z reduceByKey() (oba wywołania combineByKey() Za kulisami), z tym, że podajesz wartość początkową dla aggregateByKey(). Większość ludzi zna reduceByKey(), więc użyję tego w wyjaśnieniu.

Powód reduceByKey() jest o wiele lepszy, ponieważ korzysta z funkcji MapReduce zwanej kombinatorem. Każda funkcja jak + lub * może być używana w ten sposób, ponieważ kolejność elementów, na których jest wywoływana, nie ma znaczenia. Dzięki temu Spark może zacząć " zmniejszać" wartości z tym samym kluczem, nawet jeśli nie wszystkie znajdują się jeszcze na tej samej partycji.

Z drugiej strony groupByKey() daje większą wszechstronność, ponieważ piszesz funkcję, która ma Iterowalną, co oznacza, że możesz nawet wyciągnąć wszystkie elementy do tablicy. Jest to jednak nieefektywne, ponieważ aby działało, pełny zestaw par (K,V,) musi znajdować się w jednej partycji.

Krok, który przesuwa dane wokół operacji redukcji typu, jest ogólnie nazywany shuffle , w bardzo najprostszym poziom dane są dzielone na partycje do każdego węzła (często z partycjonerem hash), a następnie sortowane na każdym węźle.

 43
Author: aaronman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-23 19:57:12

Agregatebykey() różni się od reduceByKey. Dzieje się tak, że reduceByKey jest rodzajem szczególnego przypadku aggregateByKey.

AggregateByKey() połączy wartości dla określonego klucza, a wynikiem takiej kombinacji może być dowolny obiekt, który określisz. Musisz określić, w jaki sposób wartości są łączone ("dodane") wewnątrz jednej partycji (która jest wykonywana w tym samym węźle) i w jaki sposób łączysz wynik z różnych partycji (które mogą znajdować się w różnych węzłach). reduceByKey jest szczególnym przypadkiem, w tym sensie, że wynik kombinacji (np. suma) jest tego samego typu Co wartości, a operacja po połączeniu z różnych partycji jest również taka sama jak operacja po połączeniu wartości wewnątrz partycji.

Przykład: Wyobraź sobie, że masz listę par. Można to porównać:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

Teraz chcesz "połączyć" je za pomocą klucza produkującego sumę. W tym przypadku reduceByKey i aggregateByKey są to samo:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Teraz wyobraź sobie, że chcesz, aby agregacja była zbiorem wartości, czyli innym typem niż wartości, które są liczbami całkowitymi (suma liczb całkowitych jest również liczbami całkowitymi):

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
 46
Author: Antoni,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-26 12:11:44