Wywołanie funkcji Java / Scala z zadania

Tło

Moje pierwotne pytanie brzmiało dlaczego użycie DecisionTreeModel.predict wewnątrz funkcji mapy powoduje wyjątek? i jest związane z Jak wygenerować krotki (oryginalna etykieta, przewidywana etykieta) na Spark za pomocą MLlib?

Kiedy używamy Scala API zalecanym sposobem uzyskiwania prognoz dla RDD[LabeledPoint] za pomocą DecisionTreeModel jest po prostu mapowanie na RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Niestety podobne podejście w PySpark nie działa tak dobrze:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

Wyjątek: wygląda na to, że próbujesz odwołać się do SparkContext ze zmiennej broadcast, akcji lub transforamtion. SparkContext może być używany tylko na sterowniku, a nie w kodzie uruchamianym na workerach. Aby uzyskać więcej informacji, zobacz SPARK-5063.

Zamiast tego oficjalna dokumentacja zaleca coś takiego:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Co tu się dzieje? Nie ma tu zmiennej broadcast i Scala API definiuje predict jako :
/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

Więc przynajmniej na pierwszy rzut oka wywołanie z akcji lub transformacji nie jest problemem, ponieważ przewidywanie wydaje się być operacją lokalną.

Wyjaśnienie

Po pewnym kopaniu zorientowałem się, że źródłem problemu jest JavaModelWrapper.call metoda przywołana z DecisionTreeModel.predict . It access SparkContext który jest wymagany do wywołania funkcji Java:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

Pytanie

W przypadku DecisionTreeModel.predict istnieje zalecane obejście i cały wymagany kod jest już częścią Scala API, ale czy jest jakiś elegancki sposób, aby poradzić sobie z takim problemem w ogóle?

Jedyne rozwiązania, o których teraz myślę, są raczej ciężkie:

  • przepychanie wszystkiego do JVM poprzez rozszerzanie klas Spark poprzez ukryte konwersje lub dodawanie pewnego rodzaju opakowań
  • korzystanie bezpośrednio z bramy Py4j
Author: Community, 2015-07-28

1 answers

Komunikacja przy użyciu domyślnej bramy Py4J jest po prostu niemożliwa. Aby zrozumieć, dlaczego musimy spojrzeć na poniższy schemat z dokumentu PySpark Internals[1]:

Tutaj wpisz opis obrazka

Ponieważ brama Py4J działa na driverze, nie jest dostępna dla interpreterów Pythona, którzy komunikują się z robotnikami JVM poprzez sockety (patrz na przykład PythonRDD / rdd.py).

Teoretycznie możliwe jest utworzenie osobnej bramy Py4J dla każdego pracownik, ale w praktyce jest mało prawdopodobne, aby był przydatny. Ignorowanie problemów, takich jak niezawodność Py4J po prostu nie jest zaprojektowany do wykonywania zadań wymagających dużej ilości danych.

Czy są jakieś obejścia?

  1. W tym celu należy użyć Spark SQL Data Sources API do zawijania kodu JVM.

    Plusy : obsługiwane, wysoki poziom, nie wymaga dostępu do wewnętrznego API PySpark

    Cons : stosunkowo obszerny i niezbyt dobrze udokumentowany, ograniczony głównie do danych wejściowych DANE

  2. Operowanie na ramkach danych przy użyciu Scala UDFs.

    Plusy : łatwe do zaimplementowania (zobacz Spark: jak mapować Pythona za pomocą funkcji Scala lub Java zdefiniowanych przez użytkownika?[[9]}), brak konwersji danych pomiędzy Pythonem i scalą, jeśli dane są już przechowywane w ramce danych, minimalny dostęp do Py4J

    Cons: wymaga dostępu do bramy Py4J i wewnętrznych metod, ograniczone do Spark SQL, trudne do debugowania, nie obsługiwane

  3. Tworzenie wysokiej klasy Scali interfejs w podobny sposób jak to się robi w MLlib.

    Plusy : Elastyczny, możliwość wykonania dowolnego złożonego kodu. Można to zrobić bezpośrednio na RDD (zobacz na przykład MLlib model wrappers) lub za pomocą DataFrames (Zobacz Jak używać klasy Scala wewnątrz Pyspark). To drugie rozwiązanie wydaje się o wiele bardziej przyjazne, ponieważ wszystkie szczegóły ser-de są już obsługiwane przez istniejące API.

    Cons: niski poziom, wymagana konwersja danych, tak samo jak wymaga UDFs dostęp do Py4J i wewnętrznego API, nieobsługiwany

    Kilka podstawowych przykładów można znaleźć w przekształcanie PySpark RDD za pomocą Scali

  4. Korzystanie z zewnętrznego narzędzia do zarządzania przepływem pracy do przełączania między zadaniami Python i Scala / Java i przekazywania danych do DFS.

    Plusy: łatwe do wdrożenia, minimalne zmiany w samym kodzie

    Wady : koszt odczytu / zapisu danych (Alluxio ?)

  5. Using shared SQLContext (Zobacz dla przykład Apache Zeppelin lub Livy ) do przekazywania danych między językami gości za pomocą zarejestrowanych tabel tymczasowych.

    Zalety : dobrze nadaje się do interaktywnej analizy

    Cons : nie tyle dla zadań wsadowych (Zeppelin) lub może wymagać dodatkowej orkiestracji (Livy)


    Joshua Rosen. (2014, Sierpień 04) PySpark Internals . Pobrano z https://cwiki.apache.org/confluence/display/SPARK/PySpark + wewnętrzne
 37
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-09-01 10:35:38