Jak wyeksportować dane z Spark SQL do CSV
To polecenie działa z HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
Ale ze Spark SQL dostaję błąd ze śladem stosu org.apache.spark.sql.hive.HiveQl
:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
Proszę poprowadź mnie do napisania funkcji export to CSV w Spark SQL.
6 answers
Możesz użyć poniższej instrukcji, aby zapisać zawartość dataframe w formacie CSV
df.write.csv("/data/home/csv")
Jeśli chcesz zapisać cały dataframe do jednego pliku CSV, użyj
df.coalesce(1).write.csv("/data/home/sample.csv")
Dla iskry 1.x, możesz użyć spark-csv aby zapisać wyniki do plików CSV
Poniżej Scala snippet pomoże
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
Aby zapisać zawartość do jednego pliku
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-29 09:26:46
Od Spark 2.X
spark-csv
jest zintegrowany jako natywne źródło danych . Dlatego konieczne polecenie upraszcza do (windows)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
Lub UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-09 09:10:32
Powyższa odpowiedź w spark-csv jest poprawna, ale jest problem-biblioteka tworzy kilka plików w oparciu o partycjonowanie ramki danych. A to nie jest to, czego zwykle potrzebujemy. Możesz więc połączyć wszystkie partycje w jedną:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
I zmienić nazwę wyjścia lib (nazwa "part-00000") na pożądaną nazwę pliku.
Ten wpis na blogu zawiera więcej szczegółów: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-07 22:54:41
Najprostszym sposobem jest mapowanie na RDD ramki danych i użycie mkstringu:
df.rdd.map(x=>x.mkString(","))
Od Spark 1.5 (a nawet wcześniej)
df.map(r=>r.mkString(","))
zrobiłby to samo
jeśli chcesz ucieczki CSV możesz użyć Apache commons lang do tego. np. oto kod, którego używamy
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-06-16 18:42:16
Komunikat o błędzie sugeruje, że nie jest to obsługiwana funkcja w języku zapytań. Ale możesz zapisać ramkę danych w dowolnym formacie, jak zwykle poprzez interfejs RDD (df.rdd.saveAsTextFile
). Lub możesz sprawdzić https://github.com/databricks/spark-csv .
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-08-11 10:45:04
Za pomocą spark-csv możemy zapisać plik CSV.
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-01-15 15:41:19