Jak pominąć nagłówek z plików CSV w Spark?

Załóżmy, że daję trzy ścieżki plików do kontekstu Spark do odczytania i każdy plik ma schemat w pierwszym wierszu. Jak możemy pominąć linie schematu z nagłówków?

val rdd=sc.textFile("file1,file2,file3")

Jak możemy pominąć linie nagłówka z tego rdd?

Author: coldspeed, 2015-01-09

13 answers

Gdyby w pierwszym rekordzie była tylko jedna linia nagłówka, to najskuteczniejszym sposobem na jej odfiltrowanie byłoby:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

To nie pomaga, jeśli oczywiście istnieje wiele plików z wieloma liniami nagłówkowymi w środku. Można połączyć trzy RDDs, które można zrobić w ten sposób, rzeczywiście.

Możesz również po prostu napisać filter, która pasuje Tylko do linii, która może być nagłówkiem. Jest to dość proste, ale mniej efektywne.

Odpowiednik Pythona:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
 52
Author: Sean Owen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-27 11:56:31
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header)   #filter out header
 84
Author: Jimmy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-09 06:44:04

W Spark 2.0 wbudowany jest czytnik CSV, dzięki czemu można łatwo załadować plik CSV w następujący sposób:

spark.read.option("header","true").csv("filePath")
 45
Author: Sandeep Purohit,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-12 01:34:48

Od Spark 2.0 dalej możesz użyć SparkSession, Aby zrobić to jako jeden liner:

val spark = SparkSession.builder.config(conf).getOrCreate()

A potem jak powiedział @SandeepPurohit:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
Mam nadzieję, że to rozwiązało twoje pytanie !

P. S: SparkSession to nowy punkt wejścia wprowadzony w Spark 2.0 i można go znaleźć pod spark_sql pakiet

 11
Author: Shiv4nsh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-24 13:11:57

Można załadować każdy plik osobno, filtrować je za pomocą file.zipWithIndex().filter(_._2 > 0), a następnie połączyć wszystkie pliki RDD.

Jeśli liczba plików jest zbyt duża, Unia może rzucić StackOverflowExeption.

 5
Author: pzecevic,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-11 14:29:25

W PySpark możesz użyć ramki danych i ustawić nagłówek jako True:

df = spark.read.csv(dataPath, header=True)
 5
Author: hayj,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-22 19:13:56

Użyj metody filter() w PySpark, filtrując nazwę pierwszej kolumny, aby usunąć nagłówek:

# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)

# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)

# Check your result
for i in filterDD.take(5) : print (i)
 3
Author: kumara81205,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-12 01:35:38

Jest to opcja przekazywana read() komendzie:

context = new org.apache.spark.sql.SQLContext(sc)

var data = context.read.option("header","true").csv("<path>")
 1
Author: Sahan Jayasumana,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-16 17:24:56

Alternatywnie możesz użyć pakietu spark-csv (lub w Spark 2.0 jest on mniej lub bardziej dostępny natywnie jako CSV). Zauważ, że to oczekuje nagłówka na każdym Pliku (jak chcesz):

schema = StructType([
        StructField('lat',DoubleType(),True),
        StructField('lng',DoubleType(),True)])

df = sqlContext.read.format('com.databricks.spark.csv'). \
     options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED").load(input_file,schema=schema)
 0
Author: Adrian Bridgett,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-12 01:33:51

Praca w 2018 (Spark 2.3)

Python

df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

Scala

Val myDf = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

PD1: myManualSchema jest predefiniowanym schematem napisanym przeze mnie, możesz pominąć tę część kodu

 0
Author: Antonio Cachuan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-19 05:31:42

To powinno działać dobrze

def dropHeader(data: RDD[String]): RDD[String] = {

     data.filter(r => r!=data.first)
 }
 -1
Author: Abhishek Galoda,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-29 17:03:08
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
    case (fileName, stream)=>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
    if(iter.hasNext){
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        if(firstLine == fileNameHeaderBr.value.head._2)
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
    var isFirstIteration = true
    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null){
            true
        }
        else{
            iter.hasNext
        }
    }

    override def next(): String = {
        if (isFirstIteration){
            println(s"For the first time $firstLine")
            isFirstIteration = false
            if (firstLine != null){
                firstLine
            }
            else{
                println(s"Every time $firstLine")
                iter.next()
            }
        }
        else {
          iter.next()
        }
    }
}
 -1
Author: RockSolid,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-12 01:38:17

Dla programistów Pythona. Testowałem z spark2. 0. Załóżmy, że chcesz usunąć pierwsze 14 wierszy.

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

WithColumn jest funkcją df. Więc poniżej nie będzie działać w stylu RDD jak powyżej.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
 -1
Author: kartik,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-16 03:08:27