Spark / Scala: forward fill z ostatnią obserwacją

Używanie Spark 1.4.0, Scala 2.10

Próbowałem znaleźć sposób, aby wypełnić wartości null ostatnią znaną obserwacją, ale nie widzę łatwego sposobu. Myślę, że jest to dość powszechna rzecz do zrobienia, ale nie mogę znaleźć przykładu pokazującego, jak to zrobić.

Widzę funkcje, które wypełniają NaN wartością, lub funkcje lag / lead, które wypełniają lub przesuwają dane przez przesunięcie, ale nic nie podnosi ostatniej znanej wartości.

Patrząc w Internecie, widzę wiele pytań na temat to samo w R, ale nie w Spark / Scali.

Myślałem o mapowaniu w zakresie dat, filtrowaniu Nan z wyników i wybraniu ostatniego elementu, ale chyba jestem zdezorientowany co do składni.

Używając DataFrames staram się coś w stylu

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
Ale to do niczego nie prowadzi.

Część filtra nie działa; funkcja map zwraca sekwencję spark.sql.Kolumny, ale funkcja filter spodziewa się zwrócić Boolean, więc muszę uzyskać wartość z Kolumny do testowania, ale wydaje się, że są tylko metody kolumn, które zwracają kolumnę.

Czy Jest jakiś sposób, aby zrobić to bardziej "po prostu" na Spark?

Thanks for your input

EDIT:

Prosty przykład Przykładowe wejście:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

Oczekiwany wynik:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

Uwaga: 1) mam wiele kolumn, z których wiele ma ten brakujący wzorzec danych, ale nie w tym samym dniu / czasie. W razie potrzeby zrobię transformację jednej kolumny na czas.

EDIT :

Po odpowiedzi @zero323 próbowałem w ten sposób:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

Zmienna broadcast kończy się jako lista wartości bez null. To jest postęp, ale nadal nie mogę uruchomić mapowania. ale nic nie dostaję, ponieważ indeks i w nie mapuje do oryginalnych danych, mapuje do podzbioru bez null.

Co mi umyka?

Edycja i rozwiązanie (jak wynika z odpowiedzi @zero323):

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

Zobacz odpowiedź zero323 poniżej, aby uzyskać więcej opcji, jeśli używasz RDD zamiast ramek danych. Powyższe rozwiązanie może nie jest najskuteczniejsze, ale działa dla mnie. Jeśli chcesz zoptymalizować, sprawdź rozwiązanie RDD.

Author: MrE, 2015-11-10

1 answers

Odpowiedź początkowa (założenie pojedynczego szeregu czasowego):

Przede wszystkim unikaj funkcji okien, jeśli nie możesz podać klauzuli PARTITION BY. Przenosi dane na pojedynczą partycję, więc w większości przypadków jest to po prostu niewykonalne.

To, co możesz zrobić, to wypełnić luki na RDD za pomocą mapPartitionsWithIndex. Ponieważ nie podałeś przykładowych danych lub oczekiwanego wyjścia, Uznaj to za pseudokod, a nie prawdziwy program Scala: {]}

  • Najpierw zamów DataFrame według daty i przekonwertuj na RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • Next lets find the last not null observation per partition

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • I przekonwertować to Map na broadcast

    val toCarryBd = sc.broadcast(toCarry)
    
  • Wreszcie Mapa nad partycjami po raz kolejny wypełniając luki:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • W końcu przekonwertować z powrotem do DataFrame

Edit (partycjonowane / szeregi czasowe na dane grupy):

Diabeł tkwi w szczegółach. Jeśli Twoje dane są jednak partycjonowane, to cała problem można rozwiązać za pomocą groupBy. Przyjmijmy, że po prostu podzielenie przez kolumnę " v " typu T i Date jest liczbą całkowitą znacznika czasu:
def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)

W ten sposób możesz wypełnić wszystkie kolumny w tym samym czasie.

Czy można to zrobić z DataFrames zamiast konwertować tam iz powrotem do RDD?

To zależy, chociaż jest mało prawdopodobne, aby być skutecznym. Jeśli maksymalna przerwa jest stosunkowo mała, możesz zrobić coś takiego:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}


// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


// Finally select
val dfImputed = df.select($"*" :: lags: _*)

Można go łatwo dostosować do użyj innej maksymalnej szczeliny na kolumnę.

Prostszym sposobem osiągnięcia podobnego wyniku w najnowszej wersji Spark jest użycie last z ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

Chociaż możliwe jest porzucenie klauzuli partitionBy i zastosowanie tej metody globalnie, byłoby to zbyt kosztowne przy dużych zbiorach danych.

 15
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-08 15:24:53