Funkcje Spark a wydajność UDF?

Spark oferuje teraz predefiniowane funkcje, które mogą być używane w ramkach danych i wydaje się, że są one wysoce zoptymalizowane. Moje pierwotne pytanie miało być na który jest szybszy, ale zrobiłem kilka testów siebie i okazało się, że funkcje spark są około 10 razy szybsze przynajmniej w jednym przypadku. Czy ktoś wie dlaczego tak jest i kiedy udf byłby szybszy (tylko w przypadku, gdy istnieje identyczna funkcja spark)?

Oto Mój kod testowy (uruchomiony na społeczności Databricks ed): {]}

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]

dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()

Funkcja UDF:

concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()

Funkcja Iskry:

spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
Udf Zwykle trwało około 1,1 - 1,4 s, a funkcja Spark concat zawsze trwała poniżej 0,15 s.]}
Author: zero323, 2016-07-10

3 answers

Kiedy udf będzie szybszy

Jeśli zapytasz o Python UDF odpowiedź brzmi prawdopodobnie nigdy*. Ponieważ funkcje SQL są stosunkowo proste i nie są przeznaczone do złożonych zadań, jest to prawie niemożliwe zrekompensować koszty wielokrotnej serializacji, deserializacji i przepływu danych między interpreterem Pythona i JVM.

Czy ktoś wie dlaczego tak jest

Główne powody są już wymienione powyżej i można je sprowadzić do prostego faktu ta Spark DataFrame jest natywnie strukturą JVM, a standardowe metody dostępu są implementowane przez proste wywołania do Java API. UDF z drugiej strony są zaimplementowane w Pythonie i wymagają przenoszenia danych tam iz powrotem.

Podczas gdy PySpark ogólnie wymaga przenoszenia danych pomiędzy JVM a Pythonem, w przypadku niskiego poziomu API RDD zazwyczaj nie wymaga kosztownej aktywności serde. Spark SQL dodaje dodatkowy koszt serializacji i serializacji, a także koszt przenoszenia danych z I do reprezentacja na JVM. Późniejszy jest specyficzny dla wszystkich UDFs (Python, Scala i Java), ale pierwszy jest specyficzny dla języków obcych.

W Przeciwieństwie Do UDFs, funkcje Spark SQL działają bezpośrednio na JVM i zazwyczaj są dobrze zintegrowane zarówno z Catalyst, jak i Tungsten. Oznacza to, że mogą one być zoptymalizowane w planie realizacji i większość czasu może korzystać z codgen i innych optymalizacji wolframu. Ponadto mogą one operować na danych w ich" natywnej " reprezentacji.

Więc w problem polega na tym, że Python UDF musi wprowadzać dane do kodu, podczas gdy wyrażenia SQL idą w drugą stronę.


* zgodnie z przybliżonymi szacunkami PySpark window UDF może pokonać funkcję okna Scala.

 69
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-02 14:51:15

Po latach, kiedy mam więcej iskry wiedzy i po raz drugi spojrzeć na pytanie, po prostu uświadomiłem sobie, co @ alfredox naprawdę chce zadać. Więc ponownie poprawiłem i podzieliłem odpowiedź na dwie części:


Aby odpowiedzieć dlaczego natywna funkcja DF (natywna funkcja Spark-SQL) jest szybsza:

Zasadniczo, dlaczego natywna funkcja Spark jest zawsze szybsza niż Spark UDF, niezależnie od tego, czy twój UDF jest zaimplementowany w Pythonie czy Scali.

Po pierwsze, musimy zrozumieć co , które jest po raz pierwszy wprowadzony w Spark 1.4.

Jest to backend i na czym się skupia:

  1. zarządzanie pamięcią Off-Heap za pomocą binarnej reprezentacji danych w pamięci, aka Tungsten row format i zarządzanie pamięcią jawnie,
  2. Cache location, który jest o Cache-aware obliczeń z cache-aware układu dla wysokiej częstotliwości trafień cache,
  3. Generowanie kodu całego etapu (aka CodeGen).

jedna z największych iskier zabójcą wydajności jest GC. GC zatrzymuje wszystkie wątki w JVM, aż GC się skończy. Właśnie dlatego wprowadzono zarządzanie pamięcią Off-Heap.

Podczas wykonywania natywnych funkcji Spark-SQL, dane pozostaną w backendzie tungsten. Jednak w scenariuszu Spark UDF dane zostaną przeniesione z tungsten do JVM (Scala scenario) lub JVM i procesu Pythona (Python), aby wykonać rzeczywisty proces, a następnie wrócić do tungsten. W wyniku że:

  1. nieuchronnie, będzie narzut / kara na :
    1. Deserializuj wejście z wolframu.
    2. Serializuj wyjście z powrotem do wolframu.
  2. [17]}nawet używając Scali, pierwszej klasy obywatela w Spark, zwiększy to ślad pamięci w JVM, co może prawdopodobnie obejmować więcej GC w JVM. ten problem dokładnie to, co funkcja tungsten "Off-Heap Memory Management" próbuje rozwiązać.

To odpowiedź jeśli Python byłby wolniejszy od Scali:

Od 30 października, 2017, Spark właśnie wprowadził wektorowe udfs dla pyspark.

Https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Powodem, dla którego Python UDF jest powolny, jest prawdopodobnie to, że PySpark UDF nie jest zaimplementowany w najbardziej zoptymalizowany sposób:

Zgodnie z akapitem z linku.

Spark dodał API Pythona w wersji 0.7, z obsługą dla funkcje zdefiniowane przez użytkownika. Te zdefiniowane przez użytkownika funkcje działają jeden wiersz na raz, a zatem cierpią z powodu wysokiej serializacji i narzutu wywołania.

Jednak nowo wektoryzowane udfs wydają się znacznie poprawiać wydajność:

Od 3x do ponad 100x.

Tutaj wpisz opis obrazka

 21
Author: Tom Tang,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-10-20 22:24:35

Użyj standardowych funkcji opartych na kolumnach wyższego poziomu z operatorami zbiorów danych, o ile to możliwe, zanim powrócisz do używania własnych funkcji UDF, ponieważ UDFs jest BlackBox Dla Spark, a więc nawet nie próbuj ich optymalizować.

To, co dzieje się za ekranami, to fakt, że katalizator nie może w ogóle przetwarzać i optymalizować UDFs, a to zagraża im jako BlackBox, co powoduje utratę wielu optymalizacji, takich jak Predicate pushdown, Constant folding i wiele innych.

 0
Author: Sharhabeel Hamdan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-01-30 14:15:03