Wywołanie funkcji Java / Scala z zadania
Tło
Moje pierwotne pytanie brzmiało dlaczego użycie DecisionTreeModel.predict
wewnątrz funkcji mapy powoduje wyjątek? i jest związane z Jak wygenerować krotki (oryginalna etykieta, przewidywana etykieta) na Spark za pomocą MLlib?
Kiedy używamy Scala API zalecanym sposobem uzyskiwania prognoz dla RDD[LabeledPoint]
za pomocą DecisionTreeModel
jest po prostu mapowanie na RDD
:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Niestety podobne podejście w PySpark nie działa tak dobrze:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Wyjątek: wygląda na to, że próbujesz odwołać się do SparkContext ze zmiennej broadcast, akcji lub transforamtion. SparkContext może być używany tylko na sterowniku, a nie w kodzie uruchamianym na workerach. Aby uzyskać więcej informacji, zobacz SPARK-5063.
Zamiast tego oficjalna dokumentacja zaleca coś takiego:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Co tu się dzieje? Nie ma tu zmiennej broadcast i Scala API definiuje predict
jako :
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing a single data point
* @return Double prediction from the trained model
*/
def predict(features: Vector): Double = {
topNode.predict(features)
}
/**
* Predict values for the given data set using the model trained.
*
* @param features RDD representing data points to be predicted
* @return RDD of predictions for each of the given data points
*/
def predict(features: RDD[Vector]): RDD[Double] = {
features.map(x => predict(x))
}
Więc przynajmniej na pierwszy rzut oka wywołanie z akcji lub transformacji nie jest problemem, ponieważ przewidywanie wydaje się być operacją lokalną.
Wyjaśnienie
Po pewnym kopaniu zorientowałem się, że źródłem problemu jest JavaModelWrapper.call
metoda przywołana z DecisionTreeModel.predict . It access SparkContext
który jest wymagany do wywołania funkcji Java:
callJavaFunc(self._sc, getattr(self._java_model, name), *a)
Pytanie
W przypadku DecisionTreeModel.predict
istnieje zalecane obejście i cały wymagany kod jest już częścią Scala API, ale czy jest jakiś elegancki sposób, aby poradzić sobie z takim problemem w ogóle?
Jedyne rozwiązania, o których teraz myślę, są raczej ciężkie:
- przepychanie wszystkiego do JVM poprzez rozszerzanie klas Spark poprzez ukryte konwersje lub dodawanie pewnego rodzaju opakowań
- korzystanie bezpośrednio z bramy Py4j
1 answers
Komunikacja przy użyciu domyślnej bramy Py4J jest po prostu niemożliwa. Aby zrozumieć, dlaczego musimy spojrzeć na poniższy schemat z dokumentu PySpark Internals[1]:
Ponieważ brama Py4J działa na driverze, nie jest dostępna dla interpreterów Pythona, którzy komunikują się z robotnikami JVM poprzez sockety (patrz na przykład PythonRDD
/ rdd.py
).
Teoretycznie możliwe jest utworzenie osobnej bramy Py4J dla każdego pracownik, ale w praktyce jest mało prawdopodobne, aby był przydatny. Ignorowanie problemów, takich jak niezawodność Py4J po prostu nie jest zaprojektowany do wykonywania zadań wymagających dużej ilości danych.
Czy są jakieś obejścia?
-
W tym celu należy użyć Spark SQL Data Sources API do zawijania kodu JVM.
Plusy : obsługiwane, wysoki poziom, nie wymaga dostępu do wewnętrznego API PySpark
Cons : stosunkowo obszerny i niezbyt dobrze udokumentowany, ograniczony głównie do danych wejściowych DANE
-
Operowanie na ramkach danych przy użyciu Scala UDFs.
Plusy : łatwe do zaimplementowania (zobacz Spark: jak mapować Pythona za pomocą funkcji Scala lub Java zdefiniowanych przez użytkownika?[[9]}), brak konwersji danych pomiędzy Pythonem i scalą, jeśli dane są już przechowywane w ramce danych, minimalny dostęp do Py4J
Cons: wymaga dostępu do bramy Py4J i wewnętrznych metod, ograniczone do Spark SQL, trudne do debugowania, nie obsługiwane
-
Tworzenie wysokiej klasy Scali interfejs w podobny sposób jak to się robi w MLlib.
Plusy : Elastyczny, możliwość wykonania dowolnego złożonego kodu. Można to zrobić bezpośrednio na RDD (zobacz na przykład MLlib model wrappers) lub za pomocą
DataFrames
(Zobacz Jak używać klasy Scala wewnątrz Pyspark). To drugie rozwiązanie wydaje się o wiele bardziej przyjazne, ponieważ wszystkie szczegóły ser-de są już obsługiwane przez istniejące API.Cons: niski poziom, wymagana konwersja danych, tak samo jak wymaga UDFs dostęp do Py4J i wewnętrznego API, nieobsługiwany
Kilka podstawowych przykładów można znaleźć w przekształcanie PySpark RDD za pomocą Scali
-
Korzystanie z zewnętrznego narzędzia do zarządzania przepływem pracy do przełączania między zadaniami Python i Scala / Java i przekazywania danych do DFS.
Plusy: łatwe do wdrożenia, minimalne zmiany w samym kodzie
Wady : koszt odczytu / zapisu danych (Alluxio ?)
-
Using shared
SQLContext
(Zobacz dla przykład Apache Zeppelin lub Livy ) do przekazywania danych między językami gości za pomocą zarejestrowanych tabel tymczasowych.Zalety : dobrze nadaje się do interaktywnej analizy
Cons : nie tyle dla zadań wsadowych (Zeppelin) lub może wymagać dodatkowej orkiestracji (Livy)
-
Joshua Rosen. (2014, Sierpień 04) PySpark Internals . Pobrano z https://cwiki.apache.org/confluence/display/SPARK/PySpark + wewnętrzne
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-09-01 10:35:38