Zmniejsz parę klucz-wartość do pary klucz-Lista za pomocą Apache Spark

Piszę aplikację Spark i chcę połączyć zestaw par klucz-wartość (K, V1), (K, V2), ..., (K, Vn) w jedną parę klucz-Multivalue (K, [V1, V2, ..., Vn]). Czuję, że powinienem być w stanie to zrobić za pomocą funkcji reduceByKey z czymś o smaku:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

Błąd, który dostaję, gdy to nastąpi to:

Obiekt'NoneType' nie posiada atrybucji 'append'.

Moje klucze to liczby całkowite i wartości V1,..., Vn to krotki. Moim celem jest stworzenie jednej pary z kluczem i listy wartości (krotki).

Author: Community, 2014-11-18

9 answers

Mapa i ReduceByKey

Typ wejścia i typ wyjścia reduce muszą być takie same, dlatego jeśli chcesz połączyć listę, musisz map wejść do listy. Następnie łączysz listy w jedną listę.

Listy łączące

Będziesz potrzebował metody łączenia list w jedną listę. Phyton dostarcza kilka metod do łączenia list .

append modyfikuje pierwszą listę i zawsze powróci None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend robi to samo, ale rozpakowuje listy:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

Obie metody zwracają None, ale będziesz potrzebował metody, która zwraca połączoną listę, dlatego po prostu użyj znaku plus.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

Spark

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
         .map(lambda actor: (actor.split(",")[0], actor)) \ 

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)

CombineByKey

Możliwe jest również rozwiązanie tego za pomocą combineByKey, który jest używany wewnętrznie do implementacji reduceByKey, ale jest bardziej złożony i "korzystanie z jednego z wyspecjalizowanych kombinatorów na klucze w Spark może być znacznie szybciej". Twój przypadek użycia jest wystarczająco prosty dla górnego rozwiązania.

GroupByKey

Możliwe jest również rozwiązanie tego za pomocą groupByKey, ale zmniejsza równoległość i dlatego może być znacznie wolniejsza dla dużych zbiorów danych.

 41
Author: Christian Strempfer,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:26:10

Jestem trochę spóźniony na rozmowę, ale oto moja sugestia:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
 13
Author: alreich,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-01-28 21:20:01

Tl; dr Jeśli naprawdę potrzebujesz takiej operacji użyj groupByKey zgodnie z sugestią by @ MariusIon. Każde inne rozwiązanie proponowane tutaj jest albo jawnie nieefektywne są co najmniej nieoptymalne w porównaniu do bezpośredniego grupowania.

reduceByKey z listą konkatenacja nie jest rozwiązaniem akceptowalnym, ponieważ:

  • wymaga inicjalizacji list O(N).
  • każde zastosowanie + do pary list wymaga pełnej kopii obu list (O (N)) skutecznie zwiększając ogólną złożoność do O (N2).
  • nie rozwiązuje żadnego z problemów wprowadzonych przez groupByKey. Ilość danych, które mają zostać przetasowane, jak również rozmiar ostatecznej struktury są takie same.
  • W przeciwieństwie do sugerowanego przez jedną z odpowiedzi nie ma różnicy w poziomie równoległości między implementacją przy użyciu reduceByKey i groupByKey.

combineByKey z list.extend jest nieoptymalnym roztworem ponieważ:

  • tworzy O(N) listę obiektów w MergeValue (można to zoptymalizować używając list.append bezpośrednio na nowym elemencie).
  • jeśli jest zoptymalizowana za pomocą list.append jest dokładnie równoważna starej (Spark groupByKey i ignoruje wszystkie optymalizacje wprowadzone przez SPARK-3074, która umożliwia zewnętrzne (na dysku) grupowanie struktur większych niż pamięć.
 12
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:18:17

Możesz użyć metody RDD groupByKey.

Input:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()

wyjście:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
 11
Author: Marius Ion,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-04-24 10:42:41

Jeśli chcesz wykonać reduceByKey, gdzie typ w zredukowanych parach KV jest inny niż typ w oryginalnych parach KV, możesz użyć funkcji combineByKey. Funkcja przyjmuje pary KV i łączy je (według klucza) w pary KC, gdzie C jest innym typem niż V.

Jeden określa 3 Funkcje, createCombiner, mergevalue, mergecombiners. Pierwszy określa, jak przekształcić typ V W Typ C, drugi opisuje, jak połączyć Typ C z typem V, A last określa jak połączyć Typ C z innym typem C. Mój kod tworzy pary K-V:

Zdefiniuj 3 Funkcje w następujący sposób:

def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

Then, My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)

Najlepszy zasób jaki znalazłem przy użyciu tej funkcji to: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

Jak zauważyli inni, a.append(b) lub a.extend(b) return None. Tak więc reduceByKey(lambda a, b: a.append(b)) zwraca None na pierwszej parze par KV, a następnie zawodzi na drugiej parze bo nie ma.dołączenie (b) nie powiodło się. Można to obejść definiując oddzielną funkcję:

 def My_Extend(a,b):
      a.extend(b)
      return a

Następnie wywołanie reduceByKey(lambda a, b: My_Extend(a,b)) (użycie funkcji lambda tutaj może być niepotrzebne, ale nie testowałem tego przypadku.)

 3
Author: TravisJ,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-11-20 15:39:59

Ok. Mam nadzieję, że dobrze zrozumiałem. Twój wkład jest podobny do tego:

kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]

I chcesz dostać coś takiego:

kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]

To może zadziałać (zobacz tutaj):

d = dict()
for k, v in kv_input:
    d.setdefault(k, list()).append(v)
kmv_output = list(d.items())

Jeśli się mylę, proszę mi powiedzieć, więc mogę dostosować to do Twoich potrzeb.

P. S.: a.append([b]) zwraca zawsze None. Możesz obserwować albo [b] albo a, ale nie wynik append.

 1
Author: Dave J,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-11-19 10:38:45

Komunikat o błędzie wynika z Typu " a " w Twoim zamknięciu.

 My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

Niech pySpark jawnie oceni a jako listę. Na przykład,

My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))

W wielu przypadkach, reduceByKey będzie lepszy od groupByKey, odnoszą się do: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

 1
Author: Seung-Hwan Lim,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-23 17:52:21

Próbowałem z combineByKey ,oto moje kroki

combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])

combineddatardd.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y).collect()

Wyjście:

[('A', [3, 9, 12]), ('B', [4, 10, 11])]
  1. Zdefiniuj funkcję dla combiner, która ustawia accumulator na pierwszą parę wartości klucza, którą napotka wewnątrz partycji przekonwertuj wartość na Listę w tym kroku

  2. Zdefiniuj funkcję, która łączy nową wartość tego samego klucza z wartością akumulatora przechwyconą w kroku 1 Uwaga: - Przelicz wartość na Listę w tej funkcji, ponieważ wartość akumulatora została przekonwertowana na Listę w pierwszym Krok

  3. Zdefiniuj funkcję scalania wyjść kombinatorów poszczególnych partycji.

 1
Author: krishna rachur,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-18 23:45:11

Trafiłem na tę stronę szukając przykładu Javy dla tego samego problemu. (Jeśli Twoja sprawa jest podobna, oto mój przykład)

Sztuczka polega na tym, że musisz grupować klucze.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SparkMRExample {

    public static void main(String[] args) {
        // spark context initialisation
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        //input for testing;
        List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
                "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
                "It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
                "It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
        JavaRDD<String> inputRDD = context.parallelize(input);


        // the map phase of word count example
        JavaPairRDD<String, Integer> mappedRDD =
                inputRDD.flatMapToPair( line ->                      // for this input, each string is a line
                        Arrays.stream(line.split("\\s+"))            // splitting into words, converting into stream
                                .map(word -> new Tuple2<>(word, 1))  // each word is assigned with count 1
                                .collect(Collectors.toList()));      // stream to iterable

        // group the tuples by key
        // (String,Integer) -> (String, Iterable<Integer>)
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();

        // the reduce phase of word count example
        //(String, Iterable<Integer>) -> (String,Integer)
        JavaRDD<Tuple2<String, Integer>> resultRDD =
                groupedRDD.map(group ->                                      //input is a tuple (String, Iterable<Integer>)
                        new Tuple2<>(group._1,                              // the output key is same as input key
                        StreamSupport.stream(group._2.spliterator(), true)  // converting to stream
                                .reduce(0, (f, s) -> f + s)));              // the sum of counts
        //collecting the RRD so that we can print
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each tuple
        result.forEach(System.out::println);
    }
}
 0
Author: Thamme Gowda,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-04-06 08:31:34