Aktualizacja kolumny ramki danych w spark
Patrząc na nowe API Spark dataframe, nie jest jasne, czy możliwe jest modyfikowanie kolumn dataframe.
Jak zmienić wartość w wierszu x
kolumna y
ramki danych?
W pandas
to będzie df.ix[x,y] = new_value
Edit: konsolidując to, co zostało powiedziane poniżej, nie możesz modyfikować istniejącej ramki danych, ponieważ jest ona niezmienna, ale możesz zwrócić nową ramkę danych z żądanymi modyfikacjami.
Jeśli chcesz tylko zastąpić wartość w kolumnie opartej na warunek, np. np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Jeśli chcesz wykonać jakąś operację na kolumnie i utworzyć nową kolumnę, która zostanie dodana do ramki danych:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
Jeśli chcesz, aby nowa kolumna miała taką samą nazwę jak stara kolumna, możesz dodać dodatkowy krok:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
4 answers
Chociaż nie można modyfikować kolumny jako takiej, można operować na kolumnie i zwracać nową ramkę danych odzwierciedlającą tę zmianę. W tym celu należy najpierw utworzyć UserDefinedFunction
implementującą operację do zastosowania, a następnie selektywnie zastosować tę funkcję tylko do docelowej kolumny. W Pythonie:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
teraz ma taki sam schemat jak old_df
(zakładając, że {[4] } był również typu StringType
), ale wszystkie wartości w kolumnie target_column
będą new_value
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-02-21 22:02:49
Często podczas aktualizacji kolumny chcemy zmapować starą wartość do nowej wartości. Oto sposób na to, aby zrobić to w pyspark bez UDF:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-12-21 22:23:26
DataFrames
są oparte na RDD. RDD są strukturami niezmiennymi i nie pozwalają na aktualizację elementów na miejscu. Aby zmienić wartości, musisz utworzyć nową ramkę danych, przekształcając oryginalną za pomocą podobnych do SQL operacji DSL lub RDD, takich jak map
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-24 21:56:18
Tak jak maasg mówi, że można utworzyć nowy DataFrame na podstawie mapy zastosowanej do starego DataFrame. Przykład dla danego DataFrame df
z dwoma wierszami:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
Zauważ, że jeśli typy kolumn się zmieniają, musisz podać prawidłowy schemat zamiast df.schema
. Sprawdź api org.apache.spark.sql.Row
dla dostępnych metod: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[Update] lub używając UDFs w Scali:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
Oraz jeśli nazwa kolumny musi pozostać taka sama, możesz zmienić jej nazwę z powrotem:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:33:15