Jak wyeksportować dane z Spark SQL do CSV

To polecenie działa z HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

Ale ze Spark SQL dostaję błąd ze śladem stosu org.apache.spark.sql.hive.HiveQl:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Proszę poprowadź mnie do napisania funkcji export to CSV w Spark SQL.

Author: Daniel Darabos, 2015-08-11

6 answers

Możesz użyć poniższej instrukcji, aby zapisać zawartość dataframe w formacie CSV df.write.csv("/data/home/csv")

Jeśli chcesz zapisać cały dataframe do jednego pliku CSV, użyj df.coalesce(1).write.csv("/data/home/sample.csv")

Dla iskry 1.x, możesz użyć spark-csv aby zapisać wyniki do plików CSV

Poniżej Scala snippet pomoże

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

Aby zapisać zawartość do jednego pliku

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
 60
Author: sag,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-29 09:26:46

Od Spark 2.X spark-csv jest zintegrowany jako natywne źródło danych . Dlatego konieczne polecenie upraszcza do (windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

Lub UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")
 32
Author: Boern,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-09 09:10:32

Powyższa odpowiedź w spark-csv jest poprawna, ale jest problem-biblioteka tworzy kilka plików w oparciu o partycjonowanie ramki danych. A to nie jest to, czego zwykle potrzebujemy. Możesz więc połączyć wszystkie partycje w jedną:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

I zmienić nazwę wyjścia lib (nazwa "part-00000") na pożądaną nazwę pliku.

Ten wpis na blogu zawiera więcej szczegółów: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

 22
Author: Dmitry Petrov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-07 22:54:41

Najprostszym sposobem jest mapowanie na RDD ramki danych i użycie mkstringu:

  df.rdd.map(x=>x.mkString(","))

Od Spark 1.5 (a nawet wcześniej) df.map(r=>r.mkString(",")) zrobiłby to samo jeśli chcesz ucieczki CSV możesz użyć Apache commons lang do tego. np. oto kod, którego używamy

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
 9
Author: Arnon Rotem-Gal-Oz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-06-16 18:42:16

Komunikat o błędzie sugeruje, że nie jest to obsługiwana funkcja w języku zapytań. Ale możesz zapisać ramkę danych w dowolnym formacie, jak zwykle poprzez interfejs RDD (df.rdd.saveAsTextFile). Lub możesz sprawdzić https://github.com/databricks/spark-csv .

 1
Author: Daniel Darabos,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-08-11 10:45:04

Za pomocą spark-csv możemy zapisać plik CSV.

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
 1
Author: Uva Prakash P,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-01-15 15:41:19