Zadanie nie serializowalne: java. io. NotSerializableException podczas wywoływania funkcji poza zamknięciem tylko na klasach nie obiektach

Dziwne zachowanie podczas wywoływania funkcji poza Zamknięciem:

  • Gdy funkcja jest w obiekcie wszystko działa
  • Gdy funkcja jest w klasie get:

Zadanie nie serializowalne: java. io. NotSerializableException: testing

Problem w tym, że potrzebuję kodu w klasie, A Nie obiektu. Wiesz, dlaczego tak się dzieje? Czy obiekt Scala jest serializowany (domyślnie?)?

To jest przykład kodu roboczego:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

To jest niedziałającym przykładem:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Author: Derlin, 2014-03-23

6 answers

Nie sądzę, aby druga odpowiedź była całkowicie poprawna. RDD są rzeczywiście serializowalne , więc to nie jest to, co powoduje niepowodzenie twojego zadania.

Spark jest rozproszonym silnikiem obliczeniowym, a jego główną abstrakcją jest odporny rozproszony zbiór danych (RDD), który może być postrzegany jako rozproszony zbiór. Zasadniczo elementy RDD są podzielone na węzły klastra, ale Spark abstrakuje to z dala od użytkownika, umożliwiając użytkownikowi interakcję z RDD (kolekcja) jakby była lokalna.

Nie wdawać się w zbyt wiele szczegółów, ale kiedy uruchamiasz różne transformacje na RDD (map, flatMap, filter i inni), Twój kod transformacji (zamknięcia) to:

  1. serialized on the driver node,
  2. wysłane do odpowiednich węzłów w klastrze,
  3. deserialized,
  4. i wreszcie wykonywane na węzłach

Można oczywiście uruchomić to lokalnie( jak w przykładzie), ale wszystkie te fazy (oprócz wysyłki przez sieć) nadal występują. [Pozwala to wykryć wszelkie błędy nawet przed wdrożeniem do produkcji]

W drugim przypadku wywołujesz metodę zdefiniowaną w klasie testing z wewnątrz funkcji map. Spark widzi to, a ponieważ metody nie mogą być serializowane samodzielnie, Spark próbuje serializować całość testing klasy, tak aby Kod nadal działał po wykonaniu w innym JVM. Masz dwie możliwości:

Albo zrobisz testowanie klasy serializowalne, więc cała klasa może być serializowana przez Spark: {]}

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

Lub tworzysz someFunc funkcję zamiast metody( funkcje są obiektami w Scali), dzięki czemu Spark będzie mógł ją serializować:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Podobny, ale nie ten sam problem z serializacją klas może Cię zainteresować i możesz o tym przeczytać w tej prezentacji Spark Summit 2013 .

Na marginesie, możesz przepisać rddList.map(someFunc(_)) na rddList.map(someFunc), są dokładnie takie same. Zwykle, drugi jest preferowany, ponieważ jest mniej gadatliwy i czystszy do czytania.

EDIT (2015-03-15): SPARK-5307 wprowadził SerializationDebugger i Spark 1.3.0 jest pierwszą wersją, która go używa. Dodaje ścieżkę serializacji do NotSerializableException . Po napotkaniu wyjątku NotSerializableException debugger odwiedza Wykres obiektu, aby znaleźć ścieżkę do obiektu, który nie może być serializowany, i konstruuje informacje, aby pomóc użytkownikowi znaleźć obiekt.

W przypadku OP, to jest to, co jest drukowane na stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
 256
Author: Grega Kešpret,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-03-15 17:19:51

Odpowiedź Grega jest świetna w wyjaśnianiu, dlaczego oryginalny kod nie działa i na dwa sposoby rozwiązania problemu. Jednak to rozwiązanie nie jest zbyt elastyczne; rozważ przypadek, w którym Twoje zamknięcie zawiera wywołanie metody na klasie nie - Serializable, nad którą nie masz kontroli. Nie można dodać znacznika Serializable do tej klasy ani zmienić implementacji bazowej, aby zmienić metodę w funkcję.

Nilesh przedstawia świetne obejście tego problemu, ale rozwiązaniem może być bardziej zwięzłe i ogólne:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Ta funkcja-serializer może być następnie używana do automatycznego zawijania zamknięć i wywołań metod:

rdd map genMapper(someFunc)
Ta technika ma również tę zaletę, że nie wymaga dodatkowych zależności od rekina, aby uzyskać dostęp do[4]}, ponieważ chłód Twittera jest już wciągnięty przez core Spark [8]}
 29
Author: Ben Sidhom,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:26:36

Complete talk w pełni wyjaśniające problem, który proponuje świetny sposób zmiany paradygmatu, aby uniknąć problemów z serializacją: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

Najczęściej głosowana odpowiedź w zasadzie sugeruje wyrzucenie całej funkcji języka - czyli nie używania już metod, a tylko funkcji. Rzeczywiście w programowaniu funkcyjnym metody w klasach należy unikać, ale przekształcenie ich w funkcje nie rozwiązuje tutaj problemu projektowego(patrz powyższy link).

Jako szybkie rozwiązanie w tej konkretnej sytuacji możesz po prostu użyć adnotacji @transient, aby powiedzieć jej, aby nie próbowała serializować obrażającej wartości (tutaj Spark.ctx jest niestandardową klasą, A nie klasą Spark po nazwie OP):

@transient
val rddList = Spark.ctx.parallelize(list)

Możesz również zrestrukturyzować kod tak, aby rddlist mieszkał gdzie indziej, ale to też jest paskudne.

Przyszłość to prawdopodobnie zarodniki

W przyszłości Scala będzie zawierać te rzeczy zwane "zarodnikami", które powinny pozwolić nam drobnoziarniste kontrolowanie tego, co robi, a nie dokładnie zostaje wciągnięte przez zamknięcie. Co więcej, powinno to zmienić wszystkie błędy przypadkowego ściągania typów nieserializowalnych (lub niechcianych wartości) w błędy kompilacji, a nie teraz, co jest strasznym wyjątkami / wyciekami pamięci.

Http://docs.scala-lang.org/sips/pending/spores.html

Wskazówka na temat serializacji Kryo

Używając kyro, zrób to tak, aby Rejestracja jest konieczna, oznacza to, że zamiast wycieków pamięci pojawią się Błędy:

W końcu wiem, że kryo ma kryo.setRegistrationOptional (true), ale mam bardzo trudny czas, próbując dowiedzieć się, jak z niego korzystać. Kiedy ta opcja jest włączona, kryo nadal wydaje się wyrzucać wyjątki, jeśli nie zarejestrowałem klas."

Strategia rejestracji zajęć z kryo

Oczywiście daje to tylko kontrolę na poziomie typu, a nie na poziomie wartości Kontrola.

... więcej pomysłów na przyszłość.

 22
Author: samthebest,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:48

Rozwiązałem ten problem używając innego podejścia. Po prostu musisz serializować obiekty przed przejściem przez zamknięcie, a następnie usunąć serializację. To podejście po prostu działa, nawet jeśli twoje zajęcia nie są Serializowalne, ponieważ wykorzystuje Kryo za kulisami. Wszystko czego potrzebujesz to curry. ;)

Oto przykład jak to zrobiłem:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Możesz zrobić Bla tak skomplikowane, jak chcesz, Klasa, obiekt towarzyszący, zagnieżdżone klasy, odniesienia do wielu stron trzecich libs.

KryoSerializationWrapper odnosi się do: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

 6
Author: Nilesh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-04-14 06:54:59

Nie jestem do końca pewien, czy dotyczy to Scali, ale w Javie rozwiązałem NotSerializableException poprzez refaktoryzację kodu tak, aby zamknięcie nie uzyskało dostępu do niezwiązanego z serializowalnym polem final.

 6
Author: Trebor Rude,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-10-13 18:14:02

Miałem do czynienia z podobnym problemem, a to, co rozumiem z odpowiedzi Grega to

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

Twoja metodadoIT próbuje serializowaćmetodę someFunc (_) , ale ponieważ metody nie są serializowalne, próbuje serializować klasętesting , która ponownie nie jest serializowalna.

Aby Twój kod działał, powinieneś zdefiniować someFuncwewnątrz metody doIT. Na przykład:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

A Jeśli pojawi się wiele funkcji, to wszystkie funkcje te powinny być dostępne dla kontekstu nadrzędnego.

 4
Author: Tarang Bhalodia,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:44