Apache Spark: dzielenie pary RDD na wiele RDD za pomocą klucza, aby zapisać wartości

Używam Spark 1.0.1 do przetwarzania dużej ilości danych. Każdy wiersz zawiera numer ID, niektóre z duplikatami identyfikatorów. Chcę zapisać wszystkie wiersze z tym samym numerem ID w tej samej lokalizacji, ale mam problemy z robieniem tego skutecznie. Tworzę RDD [(String, String)] par (ID number, data row):

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 

Sposobem, który działa, ale nie jest wydajny, jest zbieranie numerów ID, filtrowanie RDD dla każdego ID i zapisywanie RDD wartości o tym samym ID jako tekst plik.

val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})

Próbowałem również groupByKey lub reduceByKey, aby każda krotka w RDD zawierała unikalny numer ID jako klucz i ciąg połączonych wierszy danych oddzielonych nowymi liniami dla tego numeru ID. Chcę iterować przez RDD tylko raz używając foreach do zapisania danych, ale nie może podać wartości jako RDD

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})

Zasadniczo chcę podzielić RDD na wiele RDD przez numer ID i zapisać wartości dla tego numeru ID w ich własnej lokalizacji.

Author: smli, 2014-07-30

3 answers

Myślę, że ten problem jest podobny do zapis do wielu wyjść za pomocą klucza Spark-jedno zadanie Spark

Proszę skierować tam odpowiedź.

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

Właśnie widziałem podobną odpowiedź powyżej, ale w rzeczywistości nie potrzebujemy niestandardowych partycji. MultipleTextOutputFormat utworzy plik dla każdego klucza. Jest ok, że wiele rekordów z tymi samymi kluczami przypada na tę samą partycję.

New HashPartitioner (num), gdzie num jest wybranym numerem partycji. W przypadku, gdy masz duży liczba różnych kluczy, możesz ustawić numer na duży. W takim przypadku każda partycja nie otworzy zbyt wielu programów obsługi plików hdfs.

 13
Author: zhang zhan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:46:16

Możesz bezpośrednio wywołać saveAsTextFile na grupowanym RDD, tutaj zapisze dane na partycjach, to znaczy, jeśli masz 4 różne partycje i podałeś liczbę partycji groupedRDD jako 4, to spark przechowuje każdą partycję w jednym pliku (dzięki czemu możesz mieć tylko jeden identyfikator filepera) u możesz nawet zobaczyć dane jako iteraby każdego identyfikatora w systemie plików.

 0
Author: napster,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-08-11 12:27:14

To zapisze dane na ID użytkownika

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1),
x)}.groupByKey(numPartitions).saveAsObjectFile("file")

Jeśli chcesz odzyskać dane ponownie na podstawie identyfikatora użytkownika, możesz zrobić coś w stylu

val userIdLookupTable = sc.objectFile("file").cache() //could use persist() if data is to big for memory  
val data = userIdLookupTable.lookup(id) //note this returns a sequence, in this case you can just get the first one  

Zauważ, że nie ma szczególnego powodu, aby zapisać do pliku w tym przypadku zrobiłem to, ponieważ op poprosił o to, że mówiąc zapisanie do pliku pozwala załadować RDD w dowolnym momencie po początkowym grupowaniu zostało zrobione.

Ostatnia rzecz, lookup jest szybsza niż filtrowanie dostępu do identyfikatorów, ale jeśli chcesz aby wyłączyć pull request od spark możesz sprawdzić ta odpowiedź dla szybszego podejścia

 0
Author: aaronman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:53:59