Jak napisać do CSV w Spark
Próbuję znaleźć skuteczny sposób na zapisanie wyniku mojej pracy Spark jako pliku csv. Używam Sparka z Hadoop i do tej pory wszystkie moje pliki są zapisywane jako part-00000
.
Jakieś pomysły, jak sprawić, by moje zapisywanie spark do pliku o określonej nazwie pliku?
6 answers
Ponieważ Spark używa API systemu plików Hadoop do zapisu danych do plików, jest to nieuniknione. If you do
rdd.saveAsTextFile("foo")
Zostanie zapisany jako "foo/part-XXXXX
" z jedną częścią - * pliku każdej partycji w RDD, którą próbujesz zapisać. Powodem, dla którego każda partycja w RDD jest zapisywana jako oddzielny plik, jest tolerancja błędów. Jeśli zadanie zapisujące trzecią partycję (np. do part-00002
) nie powiedzie się, Spark po prostu uruchamia ponownie zadanie i zastępuje częściowo napisane/uszkodzone part-00002
, bez wpływu na inne części. Jeśli wszystkie napisał do tego samego pliku, to jest o wiele trudniej odzyskać pojedyncze zadanie dla awarii.
Pliki part-XXXXX
zwykle nie stanowią problemu, jeśli zamierzasz je ponownie wykorzystać w frameworkach opartych na Spark / Hadoop, ponieważ ponieważ wszystkie używają interfejsu HDFS API, jeśli poprosisz je o przeczytanie "foo", wszystkie będą również czytać wszystkie part-XXXXX
pliki wewnątrz foo.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-15 13:29:16
Proponuję zrobić to w ten sposób (przykład Javy):
theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
fs, new Path(textFileName),
fs, new Path(textFileNameDestiny),
true, fs.getConf(), null);
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-05 04:19:13
Istnieje inne podejście oparte na systemie plików Hadoop Ops.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-02 10:49:23
Rozszerzenie Tathagata Das odpowiedź na Spark 2.x i Scala 2.11
Używając Spark SQL możemy to zrobić w jednym linerze
//implicits for magic functions like .toDf
import spark.implicits._
val df = Seq(
("first", 2.0),
("choose", 7.0),
("test", 1.5)
).toDF("name", "vals")
//write DataFrame/DataSet to external storage
df.write
.format("csv")
.save("csv/file/location")
Następnie możesz iść do głowy i kontynuować adoalonso 's odpowiedź.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-09-05 11:46:45
Mam pomysł, ale nie gotowy fragment kodu. Wewnętrznie (jak sama nazwa wskazuje) Spark używa formatu wyjściowego Hadoop. (jak również InputFormat
podczas odczytu z HDFS).
W hadoop ' s FileOutputFormat
znajduje się protected member setOutputFormat
, który można wywołać z odziedziczonej klasy, aby ustawić inną nazwę bazową.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-15 13:28:07
To nie jest tak naprawdę czyste rozwiązanie, ale wewnątrz foreachRDD
() możesz w zasadzie robić co chcesz, także tworzyć nowy plik.
W moim rozwiązaniu robię to: zapisuję wyjście na HDFS( ze względu na tolerancję błędów), a wewnątrz foreachRDD
tworzę również plik TSV ze statystykami w folderze lokalnym.
Http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-15 13:26:04