Aktualizacja kolumny ramki danych w spark

Patrząc na nowe API Spark dataframe, nie jest jasne, czy możliwe jest modyfikowanie kolumn dataframe.

Jak zmienić wartość w wierszu x kolumna y ramki danych?

W pandas to będzie df.ix[x,y] = new_value

Edit: konsolidując to, co zostało powiedziane poniżej, nie możesz modyfikować istniejącej ramki danych, ponieważ jest ona niezmienna, ale możesz zwrócić nową ramkę danych z żądanymi modyfikacjami.

Jeśli chcesz tylko zastąpić wartość w kolumnie opartej na warunek, np. np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Jeśli chcesz wykonać jakąś operację na kolumnie i utworzyć nową kolumnę, która zostanie dodana do ramki danych:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Jeśli chcesz, aby nowa kolumna miała taką samą nazwę jak stara kolumna, możesz dodać dodatkowy krok:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Author: Luke, 2015-03-18

4 answers

Chociaż nie można modyfikować kolumny jako takiej, można operować na kolumnie i zwracać nową ramkę danych odzwierciedlającą tę zmianę. W tym celu należy najpierw utworzyć UserDefinedFunction implementującą operację do zastosowania, a następnie selektywnie zastosować tę funkcję tylko do docelowej kolumny. W Pythonie:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df teraz ma taki sam schemat jak old_df (zakładając, że {[4] } był również typu StringType), ale wszystkie wartości w kolumnie target_column będą new_value.

 52
Author: karlson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-02-21 22:02:49

Często podczas aktualizacji kolumny chcemy zmapować starą wartość do nowej wartości. Oto sposób na to, aby zrobić to w pyspark bez UDF:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).
 29
Author: Paul,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-12-21 22:23:26

DataFrames są oparte na RDD. RDD są strukturami niezmiennymi i nie pozwalają na aktualizację elementów na miejscu. Aby zmienić wartości, musisz utworzyć nową ramkę danych, przekształcając oryginalną za pomocą podobnych do SQL operacji DSL lub RDD, takich jak map.

[[2]} wysoce zalecana talia slajdów: [[5]} Wprowadzenie ramek danych w Spark do wielkoskalowej Nauki o danych .
 12
Author: maasg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-24 21:56:18

Tak jak maasg mówi, że można utworzyć nowy DataFrame na podstawie mapy zastosowanej do starego DataFrame. Przykład dla danego DataFrame df z dwoma wierszami:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Zauważ, że jeśli typy kolumn się zmieniają, musisz podać prawidłowy schemat zamiast df.schema. Sprawdź api org.apache.spark.sql.Row dla dostępnych metod: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] lub używając UDFs w Scali:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

Oraz jeśli nazwa kolumny musi pozostać taka sama, możesz zmienić jej nazwę z powrotem:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
 11
Author: radek1st,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:33:15