Załaduj plik CSV za pomocą Spark

Jestem nowy w Spark i próbuję odczytać dane CSV z pliku z Spark. Oto co robię:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Spodziewałbym się, że to wywołanie da mi listę dwóch pierwszych kolumn mojego pliku, ale dostaję ten błąd:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

Chociaż mój plik CSV jako więcej niż jedna kolumna.

Author: Kernael, 2015-02-28

10 answers

Czy jesteś pewien, żewszystkie linie mają co najmniej 2 kolumny? Możesz spróbować czegoś takiego, żeby sprawdzić?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternatywnie możesz wydrukować winowajcę (jeśli istnieje):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
 45
Author: G Quintana,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-30 18:25:57

Iskra 2.0.0+

Możesz użyć wbudowanego źródła danych csv bezpośrednio:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

Lub

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

Bez uwzględniania zewnętrznych zależności.

Iskra :

Zamiast ręcznego parsowania, które nie jest trywialne w ogólnym przypadku, polecam spark-csv:

Upewnij się, że Spark CSV jest zawarty w ścieżce (--packages, --jars, --driver-class-path)

I załaduj swoje dane w następujący sposób:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Może obsługiwać wczytywanie, wnioskowanie schematu, upuszczanie zniekształconych linii i nie wymaga przekazywania danych z Pythona do JVM.

Uwaga:

Jeśli znasz schemat, lepiej unikać wnioskowania schematu i przekazać go do DataFrameReader. Zakładając, że masz trzy kolumny-integer, double i string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
 116
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-30 08:33:21

Po prostu dzielenie przecinkami również podzieli przecinki znajdujące się w polach (np. a,b,"1,2,3",c), więc nie jest to zalecane. odpowiedź zero323 jest dobra, jeśli chcesz użyć API DataFrames, ale jeśli chcesz trzymać się base Spark, możesz parsować CSV w Pythonie base za pomocą modułu csv :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: jak @muon wspomniał w komentarzach, będzie to traktować nagłówek jak każdy inny wiersz, więc będziesz musiał wyodrębnić go ręcznie. Na przykład, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (Upewnij się, aby nie modyfikować header przed filtr ocenia). Ale w tym momencie prawdopodobnie lepiej będzie użyć wbudowanego parsera csv.

 10
Author: Galen Long,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:34:37

I jeszcze jedna opcja polegająca na odczytaniu pliku CSV za pomocą pandy, a następnie zaimportowaniu ramki danych pandy do Spark.

Na przykład:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
 9
Author: JP Mercier,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-14 00:39:52
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");

print(df.collect())
 9
Author: y durga prasad,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-30 19:09:48

Jest to zgodne z tym, co JP Mercier początkowo zasugerował o używaniu pand, ale z dużą modyfikacją: jeśli czytasz dane do pand w kawałkach, powinno to być bardziej plastyczne. Oznacza to, że można przetworzyć znacznie większy plik niż Pandy mogą faktycznie obsługiwać jako pojedynczy kawałek i przekazać go do Spark w mniejszych rozmiarach. (To również odpowiada na komentarz o tym, dlaczego ktoś chciałby używać Spark, jeśli może załadować wszystko do pand i tak.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
 3
Author: abby sobh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:03:05

Teraz jest jeszcze jedna opcja dla dowolnego ogólnego pliku csv: https://github.com/seahboonsiew/pyspark-csv następująco:

Załóżmy, że mamy następujący kontekst

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Po pierwsze, rozpowszechniaj pyspark-csv.py to executors using SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Odczyt danych csv za pomocą SparkContext i konwersja ich na DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
 3
Author: optimist,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-01 09:09:29

Jeśli Twoje dane csv nie zawierają nowych linii w żadnym z pól, możesz załadować dane za pomocą textFile() i przetworzyć je

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
 2
Author: iec2011007,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-23 05:02:48

Jeśli chcesz załadować plik csv jako ramkę danych, możesz wykonać następujące czynności:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file
Dla mnie zadziałało.
 1
Author: Jeril,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-09 10:09:02
import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")
 -4
Author: hey kay,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-31 18:01:43