Zadanie nie serializowalne: java. io. NotSerializableException podczas wywoływania funkcji poza zamknięciem tylko na klasach nie obiektach
Dziwne zachowanie podczas wywoływania funkcji poza Zamknięciem:
- Gdy funkcja jest w obiekcie wszystko działa
- Gdy funkcja jest w klasie get:
Zadanie nie serializowalne: java. io. NotSerializableException: testing
Problem w tym, że potrzebuję kodu w klasie, A Nie obiektu. Wiesz, dlaczego tak się dzieje? Czy obiekt Scala jest serializowany (domyślnie?)?
To jest przykład kodu roboczego:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
To jest niedziałającym przykładem:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
6 answers
Nie sądzę, aby druga odpowiedź była całkowicie poprawna. RDD są rzeczywiście serializowalne , więc to nie jest to, co powoduje niepowodzenie twojego zadania.
Spark jest rozproszonym silnikiem obliczeniowym, a jego główną abstrakcją jest odporny rozproszony zbiór danych (RDD), który może być postrzegany jako rozproszony zbiór. Zasadniczo elementy RDD są podzielone na węzły klastra, ale Spark abstrakuje to z dala od użytkownika, umożliwiając użytkownikowi interakcję z RDD (kolekcja) jakby była lokalna.Nie wdawać się w zbyt wiele szczegółów, ale kiedy uruchamiasz różne transformacje na RDD (map
, flatMap
, filter
i inni), Twój kod transformacji (zamknięcia) to:
- serialized on the driver node,
- wysłane do odpowiednich węzłów w klastrze,
- deserialized,
- i wreszcie wykonywane na węzłach
Można oczywiście uruchomić to lokalnie( jak w przykładzie), ale wszystkie te fazy (oprócz wysyłki przez sieć) nadal występują. [Pozwala to wykryć wszelkie błędy nawet przed wdrożeniem do produkcji]
W drugim przypadku wywołujesz metodę zdefiniowaną w klasie testing
z wewnątrz funkcji map. Spark widzi to, a ponieważ metody nie mogą być serializowane samodzielnie, Spark próbuje serializować całość testing
klasy, tak aby Kod nadal działał po wykonaniu w innym JVM. Masz dwie możliwości:
Albo zrobisz testowanie klasy serializowalne, więc cała klasa może być serializowana przez Spark: {]}
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
Lub tworzysz someFunc
funkcję zamiast metody( funkcje są obiektami w Scali), dzięki czemu Spark będzie mógł ją serializować:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
Podobny, ale nie ten sam problem z serializacją klas może Cię zainteresować i możesz o tym przeczytać w tej prezentacji Spark Summit 2013 .
Na marginesie, możesz przepisać rddList.map(someFunc(_))
na rddList.map(someFunc)
, są dokładnie takie same. Zwykle, drugi jest preferowany, ponieważ jest mniej gadatliwy i czystszy do czytania.
EDIT (2015-03-15): SPARK-5307 wprowadził SerializationDebugger i Spark 1.3.0 jest pierwszą wersją, która go używa. Dodaje ścieżkę serializacji do NotSerializableException . Po napotkaniu wyjątku NotSerializableException debugger odwiedza Wykres obiektu, aby znaleźć ścieżkę do obiektu, który nie może być serializowany, i konstruuje informacje, aby pomóc użytkownikowi znaleźć obiekt.
W przypadku OP, to jest to, co jest drukowane na stdout:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-03-15 17:19:51
Odpowiedź Grega jest świetna w wyjaśnianiu, dlaczego oryginalny kod nie działa i na dwa sposoby rozwiązania problemu. Jednak to rozwiązanie nie jest zbyt elastyczne; rozważ przypadek, w którym Twoje zamknięcie zawiera wywołanie metody na klasie nie - Serializable
, nad którą nie masz kontroli. Nie można dodać znacznika Serializable
do tej klasy ani zmienić implementacji bazowej, aby zmienić metodę w funkcję.
Nilesh przedstawia świetne obejście tego problemu, ale rozwiązaniem może być bardziej zwięzłe i ogólne:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
Ta funkcja-serializer może być następnie używana do automatycznego zawijania zamknięć i wywołań metod:
rdd map genMapper(someFunc)
Ta technika ma również tę zaletę, że nie wymaga dodatkowych zależności od rekina, aby uzyskać dostęp do[4]}, ponieważ chłód Twittera jest już wciągnięty przez core Spark [8]}Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:26:36
Complete talk w pełni wyjaśniające problem, który proponuje świetny sposób zmiany paradygmatu, aby uniknąć problemów z serializacją: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
Najczęściej głosowana odpowiedź w zasadzie sugeruje wyrzucenie całej funkcji języka - czyli nie używania już metod, a tylko funkcji. Rzeczywiście w programowaniu funkcyjnym metody w klasach należy unikać, ale przekształcenie ich w funkcje nie rozwiązuje tutaj problemu projektowego(patrz powyższy link).
Jako szybkie rozwiązanie w tej konkretnej sytuacji możesz po prostu użyć adnotacji @transient
, aby powiedzieć jej, aby nie próbowała serializować obrażającej wartości (tutaj Spark.ctx
jest niestandardową klasą, A nie klasą Spark po nazwie OP):
@transient
val rddList = Spark.ctx.parallelize(list)
Możesz również zrestrukturyzować kod tak, aby rddlist mieszkał gdzie indziej, ale to też jest paskudne.
Przyszłość to prawdopodobnie zarodniki
W przyszłości Scala będzie zawierać te rzeczy zwane "zarodnikami", które powinny pozwolić nam drobnoziarniste kontrolowanie tego, co robi, a nie dokładnie zostaje wciągnięte przez zamknięcie. Co więcej, powinno to zmienić wszystkie błędy przypadkowego ściągania typów nieserializowalnych (lub niechcianych wartości) w błędy kompilacji, a nie teraz, co jest strasznym wyjątkami / wyciekami pamięci.
Http://docs.scala-lang.org/sips/pending/spores.html
Wskazówka na temat serializacji Kryo
Używając kyro, zrób to tak, aby Rejestracja jest konieczna, oznacza to, że zamiast wycieków pamięci pojawią się Błędy:
W końcu wiem, że kryo ma kryo.setRegistrationOptional (true), ale mam bardzo trudny czas, próbując dowiedzieć się, jak z niego korzystać. Kiedy ta opcja jest włączona, kryo nadal wydaje się wyrzucać wyjątki, jeśli nie zarejestrowałem klas."Strategia rejestracji zajęć z kryo
Oczywiście daje to tylko kontrolę na poziomie typu, a nie na poziomie wartości Kontrola.
... więcej pomysłów na przyszłość.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:02:48
Rozwiązałem ten problem używając innego podejścia. Po prostu musisz serializować obiekty przed przejściem przez zamknięcie, a następnie usunąć serializację. To podejście po prostu działa, nawet jeśli twoje zajęcia nie są Serializowalne, ponieważ wykorzystuje Kryo za kulisami. Wszystko czego potrzebujesz to curry. ;)
Oto przykład jak to zrobiłem:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Możesz zrobić Bla tak skomplikowane, jak chcesz, Klasa, obiekt towarzyszący, zagnieżdżone klasy, odniesienia do wielu stron trzecich libs.
KryoSerializationWrapper odnosi się do: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-04-14 06:54:59
Nie jestem do końca pewien, czy dotyczy to Scali, ale w Javie rozwiązałem NotSerializableException
poprzez refaktoryzację kodu tak, aby zamknięcie nie uzyskało dostępu do niezwiązanego z serializowalnym polem final
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-10-13 18:14:02
Miałem do czynienia z podobnym problemem, a to, co rozumiem z odpowiedzi Grega to
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
Twoja metodadoIT próbuje serializowaćmetodę someFunc (_) , ale ponieważ metody nie są serializowalne, próbuje serializować klasętesting , która ponownie nie jest serializowalna.
Aby Twój kod działał, powinieneś zdefiniować someFuncwewnątrz metody doIT. Na przykład:
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
A Jeśli pojawi się wiele funkcji, to wszystkie funkcje te powinny być dostępne dla kontekstu nadrzędnego.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:44