Funkcje Spark a wydajność UDF?
Spark oferuje teraz predefiniowane funkcje, które mogą być używane w ramkach danych i wydaje się, że są one wysoce zoptymalizowane. Moje pierwotne pytanie miało być na który jest szybszy, ale zrobiłem kilka testów siebie i okazało się, że funkcje spark są około 10 razy szybsze przynajmniej w jednym przypadku. Czy ktoś wie dlaczego tak jest i kiedy udf byłby szybszy (tylko w przypadku, gdy istnieje identyczna funkcja spark)?
Oto Mój kod testowy (uruchomiony na społeczności Databricks ed): {]}
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
Funkcja UDF:
concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
Funkcja Iskry:
spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
Udf Zwykle trwało około 1,1 - 1,4 s, a funkcja Spark concat
zawsze trwała poniżej 0,15 s.]} 3 answers
Kiedy udf będzie szybszy
Jeśli zapytasz o Python UDF odpowiedź brzmi prawdopodobnie nigdy*. Ponieważ funkcje SQL są stosunkowo proste i nie są przeznaczone do złożonych zadań, jest to prawie niemożliwe zrekompensować koszty wielokrotnej serializacji, deserializacji i przepływu danych między interpreterem Pythona i JVM.
Czy ktoś wie dlaczego tak jest
Główne powody są już wymienione powyżej i można je sprowadzić do prostego faktu ta Spark DataFrame
jest natywnie strukturą JVM, a standardowe metody dostępu są implementowane przez proste wywołania do Java API. UDF z drugiej strony są zaimplementowane w Pythonie i wymagają przenoszenia danych tam iz powrotem.
Podczas gdy PySpark ogólnie wymaga przenoszenia danych pomiędzy JVM a Pythonem, w przypadku niskiego poziomu API RDD zazwyczaj nie wymaga kosztownej aktywności serde. Spark SQL dodaje dodatkowy koszt serializacji i serializacji, a także koszt przenoszenia danych z I do reprezentacja na JVM. Późniejszy jest specyficzny dla wszystkich UDFs (Python, Scala i Java), ale pierwszy jest specyficzny dla języków obcych.
W Przeciwieństwie Do UDFs, funkcje Spark SQL działają bezpośrednio na JVM i zazwyczaj są dobrze zintegrowane zarówno z Catalyst, jak i Tungsten. Oznacza to, że mogą one być zoptymalizowane w planie realizacji i większość czasu może korzystać z codgen i innych optymalizacji wolframu. Ponadto mogą one operować na danych w ich" natywnej " reprezentacji.
Więc w problem polega na tym, że Python UDF musi wprowadzać dane do kodu, podczas gdy wyrażenia SQL idą w drugą stronę.
* zgodnie z przybliżonymi szacunkami PySpark window UDF może pokonać funkcję okna Scala.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-02 14:51:15
Po latach, kiedy mam więcej iskry wiedzy i po raz drugi spojrzeć na pytanie, po prostu uświadomiłem sobie, co @ alfredox naprawdę chce zadać. Więc ponownie poprawiłem i podzieliłem odpowiedź na dwie części:
Aby odpowiedzieć dlaczego natywna funkcja DF (natywna funkcja Spark-SQL) jest szybsza:
Zasadniczo, dlaczego natywna funkcja Spark jest zawsze szybsza niż Spark UDF, niezależnie od tego, czy twój UDF jest zaimplementowany w Pythonie czy Scali.
Po pierwsze, musimy zrozumieć co , które jest po raz pierwszy wprowadzony w Spark 1.4.
Jest to backend i na czym się skupia:
- zarządzanie pamięcią Off-Heap za pomocą binarnej reprezentacji danych w pamięci, aka Tungsten row format i zarządzanie pamięcią jawnie,
- Cache location, który jest o Cache-aware obliczeń z cache-aware układu dla wysokiej częstotliwości trafień cache,
- Generowanie kodu całego etapu (aka CodeGen).
jedna z największych iskier zabójcą wydajności jest GC. GC zatrzymuje wszystkie wątki w JVM, aż GC się skończy. Właśnie dlatego wprowadzono zarządzanie pamięcią Off-Heap.
Podczas wykonywania natywnych funkcji Spark-SQL, dane pozostaną w backendzie tungsten. Jednak w scenariuszu Spark UDF dane zostaną przeniesione z tungsten do JVM (Scala scenario) lub JVM i procesu Pythona (Python), aby wykonać rzeczywisty proces, a następnie wrócić do tungsten. W wyniku że:
- nieuchronnie, będzie narzut / kara na :
- Deserializuj wejście z wolframu.
- Serializuj wyjście z powrotem do wolframu.
[17]}nawet używając Scali, pierwszej klasy obywatela w Spark, zwiększy to ślad pamięci w JVM, co może prawdopodobnie obejmować więcej GC w JVM.
ten problem dokładnie to, co funkcja tungsten "Off-Heap Memory Management" próbuje rozwiązać.
To odpowiedź jeśli Python byłby wolniejszy od Scali:
Od 30 października, 2017, Spark właśnie wprowadził wektorowe udfs dla pyspark.
Https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Powodem, dla którego Python UDF jest powolny, jest prawdopodobnie to, że PySpark UDF nie jest zaimplementowany w najbardziej zoptymalizowany sposób:
Zgodnie z akapitem z linku.
Spark dodał API Pythona w wersji 0.7, z obsługą dla funkcje zdefiniowane przez użytkownika. Te zdefiniowane przez użytkownika funkcje działają jeden wiersz na raz, a zatem cierpią z powodu wysokiej serializacji i narzutu wywołania.
Jednak nowo wektoryzowane udfs wydają się znacznie poprawiać wydajność:
Od 3x do ponad 100x.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-10-20 22:24:35
Użyj standardowych funkcji opartych na kolumnach wyższego poziomu z operatorami zbiorów danych, o ile to możliwe, zanim powrócisz do używania własnych funkcji UDF, ponieważ UDFs jest BlackBox Dla Spark, a więc nawet nie próbuj ich optymalizować.
To, co dzieje się za ekranami, to fakt, że katalizator nie może w ogóle przetwarzać i optymalizować UDFs, a to zagraża im jako BlackBox, co powoduje utratę wielu optymalizacji, takich jak Predicate pushdown, Constant folding i wiele innych.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-01-30 14:15:03