Jak znaleźć medianę i kwantyle za pomocą Spark
Jak mogę znaleźć medianę RDD
liczb całkowitych przy użyciu rozproszonej metody IPython i Spark? RDD
jest około 700 000 elementów i dlatego zbyt duża, aby zebrać i znaleźć medianę.
To pytanie jest podobne do tego pytania. Jednak odpowiedzią na pytanie jest użycie Scali, której nie znam.
Jak obliczyć dokładną medianę za pomocą Apache Spark?
Wykorzystując myślenie do odpowiedzi Scali, staram się napisz podobną odpowiedź w Pythonie.
Wiem, że najpierw chcę uporządkować RDD
. Nie wiem jak. Widzę sortBy
(sortuje ten RDD według podanego keyfunc
) i sortByKey
(sortuje ten RDD
, który zakłada się, że składa się z par (klucz, wartość).) metody. Myślę, że oba używają wartości klucza, a mój RDD
ma tylko elementy całkowite.
- Po pierwsze, myślałem o zrobieniu
myrdd.sortBy(lambda x: x)
? - następnie znajdę długość rdd (
rdd.count()
). - wreszcie chcę znaleźć element lub 2 elementy w centrum rdd. Też potrzebuję pomocy w tej metodzie.
EDIT:
Mam pomysł. Może mogę indeksować mójRDD
, a następnie Klucz = indeks i wartość = element. A potem mogę spróbować sortować według wartości? Nie wiem, czy jest to możliwe, ponieważ istnieje tylko metoda sortByKey
. 4 answers
Praca w toku
Iskra-30569 - Dodaj funkcje DSL wywołujące percentyle_approx
Spark 2.0+:
Możesz użyć approxQuantile
metody implementującej algorytm Greenwalda-Khanna:
Python :
df.approxQuantile("x", [0.5], 0.25)
Scala:
df.stat.approxQuantile("x", Array(0.5), 0.25)
Gdzie ostatni parametr jest względnym błędem. Im niższa liczba, tym dokładniejsze wyniki i droższe obliczenia.
Od Spark 2.2 ( SPARK-14352 ) obsługuje szacowanie na wielu kolumnach:
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
I
df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
Podstawowe metody mogą być również używane w agregacji SQL (zarówno globalnej, jak i globalnej) przy użyciu approx_percentile
funkcja:
> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
[10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
10.0
Spark
Python
Jak już wspomniałem w komentarzach, najprawdopodobniej nie jest to warte całego zamieszania. Jeśli dane są stosunkowo małe, jak w Twoim przypadku, po prostu zbierz i Oblicz medianę lokalnie:
import numpy as np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes
Zajmuje to około 0,01 sekundy na moim kilkuletnim komputerze i około 5,5 MB pamięci.
Jeśli dane są znacznie większe, sortowanie będzie czynnikiem ograniczającym, więc zamiast uzyskiwać dokładną wartość, prawdopodobnie lepiej jest pobierać próbki, zbierać i obliczać lokalnie. Ale jeśli naprawdę chcesz, aby użyć Spark coś takiego powinno zrobić sztuczkę (jeśli nic nie zepsułem): {]}
from numpy import floor
import time
def quantile(rdd, p, sample=None, seed=None):
"""Compute a quantile of order p ∈ [0, 1]
:rdd a numeric rdd
:p quantile(between 0 and 1)
:sample fraction of and rdd to use. If not provided we use a whole dataset
:seed random number generator seed to be used with sample
"""
assert 0 <= p <= 1
assert sample is None or 0 < sample <= 1
seed = seed if seed is not None else time.time()
rdd = rdd if sample is None else rdd.sample(False, sample, seed)
rddSortedWithIndex = (rdd.
sortBy(lambda x: x).
zipWithIndex().
map(lambda (x, i): (i, x)).
cache())
n = rddSortedWithIndex.count()
h = (n - 1) * p
rddX, rddXPlusOne = (
rddSortedWithIndex.lookup(x)[0]
for x in int(floor(h)) + np.array([0L, 1L]))
return rddX + (h - floor(h)) * (rddXPlusOne - rddX)
I kilka testów:
np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)
Wreszcie zdefiniujmy mediana:
from functools import partial
median = partial(quantile, p=0.5)
Jak na razie dobrze, ale trwa 4,66 s W trybie lokalnym bez żadnej komunikacji sieciowej. Prawdopodobnie jest sposób, aby to poprawić, ale po co w ogóle się martwić?
Język niezależny (Ula UDAF):
Jeśli używasz HiveContext
możesz również użyć Udaf. O wartościach całkowych:
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
O wartościach ciągłych:
sqlContext.sql("SELECT percentile(x, 0.5) FROM df")
W percentile_approx
możesz przekazać dodatkowy argument, który określa liczbę rekordów do użycia.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-01-28 13:09:03
Dodawanie rozwiązania, jeśli chcesz tylko metodę RDD i nie chcesz przejść do DF. Ten fragment może dać ci percentyl za podwójne RDD.
Jeśli wprowadzasz percentyl jako 50, powinieneś uzyskać wymaganą medianę. Daj mi znać, jeśli są jakieś sprawy narożne, które nie zostały uwzględnione.
/**
* Gets the nth percentile entry for an RDD of doubles
*
* @param inputScore : Input scores consisting of a RDD of doubles
* @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
* It prefers the higher value when the desired quantile lies between two data points
* @return : The number best representing the percentile in the Rdd of double
*/
def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
val numEntries = inputScore.count().toDouble
val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt
inputScore
.sortBy { case (score) => score }
.zipWithIndex()
.filter { case (score, index) => index == retrievedEntry }
.map { case (score, index) => score }
.collect()(0)
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-15 01:57:27
Oto metoda, której użyłem przy użyciu funkcji okienkowych (z pyspark 2.2.0).
from pyspark.sql import DataFrame
class median():
""" Create median class with over method to pass partition """
def __init__(self, df, col, name):
assert col
self.column=col
self.df = df
self.name = name
def over(self, window):
from pyspark.sql.functions import percent_rank, pow, first
first_window = window.orderBy(self.column) # first, order by column we want to compute the median for
df = self.df.withColumn("percent_rank", percent_rank().over(first_window)) # add percent_rank column, percent_rank = 0.5 coressponds to median
second_window = window.orderBy(pow(df.percent_rank-0.5, 2)) # order by (percent_rank - 0.5)^2 ascending
return df.withColumn(self.name, first(self.column).over(second_window)) # the first row of the window corresponds to median
def addMedian(self, col, median_name):
""" Method to be added to spark native DataFrame class """
return median(self, col, median_name)
# Add method to DataFrame class
DataFrame.addMedian = addMedian
Następnie zadzwoń do metody addMedian, aby obliczyć medianę col2:
from pyspark.sql import Window
median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
Wreszcie można grupować według W razie potrzeby.
df.groupby("col1", "median")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-26 10:38:00
Napisałem funkcję, która przyjmuje ramkę danych jako wejście i zwraca ramkę danych, która ma medianę jako wyjście na partycji i order_col jest kolumną, dla której chcemy obliczyć medianę dla part_col jest poziomem, na którym chcemy obliczyć medianę dla:
from pyspark.sql import Window
import pyspark.sql.functions as F
def calculate_median(dataframe, part_col, order_col):
win = Window.partitionBy(*part_col).orderBy(order_col)
# count_row = dataframe.groupby(*part_col).distinct().count()
dataframe.persist()
dataframe.count()
temp = dataframe.withColumn("rank", F.row_number().over(win))
temp = temp.withColumn(
"count_row_part",
F.count(order_col).over(Window.partitionBy(part_col))
)
temp = temp.withColumn(
"even_flag",
F.when(
F.col("count_row_part") %2 == 0,
F.lit(1)
).otherwise(
F.lit(0)
)
).withColumn(
"mid_value",
F.floor(F.col("count_row_part")/2)
)
temp = temp.withColumn(
"avg_flag",
F.when(
(F.col("even_flag")==1) &
(F.col("rank") == F.col("mid_value"))|
((F.col("rank")-1) == F.col("mid_value")),
F.lit(1)
).otherwise(
F.when(
F.col("rank") == F.col("mid_value")+1,
F.lit(1)
)
)
)
temp.show(10)
return temp.filter(
F.col("avg_flag") == 1
).groupby(
part_col + ["avg_flag"]
).agg(
F.avg(F.col(order_col)).alias("median")
).drop("avg_flag")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-04 20:20:39