Jak znaleźć medianę i kwantyle za pomocą Spark

Jak mogę znaleźć medianę RDD liczb całkowitych przy użyciu rozproszonej metody IPython i Spark? RDD jest około 700 000 elementów i dlatego zbyt duża, aby zebrać i znaleźć medianę.

To pytanie jest podobne do tego pytania. Jednak odpowiedzią na pytanie jest użycie Scali, której nie znam.

Jak obliczyć dokładną medianę za pomocą Apache Spark?

Wykorzystując myślenie do odpowiedzi Scali, staram się napisz podobną odpowiedź w Pythonie.

Wiem, że najpierw chcę uporządkować RDD. Nie wiem jak. Widzę sortBy (sortuje ten RDD według podanego keyfunc) i sortByKey (sortuje ten RDD, który zakłada się, że składa się z par (klucz, wartość).) metody. Myślę, że oba używają wartości klucza, a mój RDD ma tylko elementy całkowite.

  1. Po pierwsze, myślałem o zrobieniu myrdd.sortBy(lambda x: x)?
  2. następnie znajdę długość rdd (rdd.count()).
  3. wreszcie chcę znaleźć element lub 2 elementy w centrum rdd. Też potrzebuję pomocy w tej metodzie.

EDIT:

Mam pomysł. Może mogę indeksować mój RDD, a następnie Klucz = indeks i wartość = element. A potem mogę spróbować sortować według wartości? Nie wiem, czy jest to możliwe, ponieważ istnieje tylko metoda sortByKey.
Author: Community, 2015-07-15

4 answers

Praca w toku

Iskra-30569 - Dodaj funkcje DSL wywołujące percentyle_approx

Spark 2.0+:

Możesz użyć approxQuantile metody implementującej algorytm Greenwalda-Khanna:

Python :

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)

Gdzie ostatni parametr jest względnym błędem. Im niższa liczba, tym dokładniejsze wyniki i droższe obliczenia.

Od Spark 2.2 ( SPARK-14352 ) obsługuje szacowanie na wielu kolumnach:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

I

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

Podstawowe metody mogą być również używane w agregacji SQL (zarówno globalnej, jak i globalnej) przy użyciu approx_percentile funkcja:

> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
 [10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
 10.0

Spark

Python

Jak już wspomniałem w komentarzach, najprawdopodobniej nie jest to warte całego zamieszania. Jeśli dane są stosunkowo małe, jak w Twoim przypadku, po prostu zbierz i Oblicz medianę lokalnie:

import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

Zajmuje to około 0,01 sekundy na moim kilkuletnim komputerze i około 5,5 MB pamięci.

Jeśli dane są znacznie większe, sortowanie będzie czynnikiem ograniczającym, więc zamiast uzyskiwać dokładną wartość, prawdopodobnie lepiej jest pobierać próbki, zbierać i obliczać lokalnie. Ale jeśli naprawdę chcesz, aby użyć Spark coś takiego powinno zrobić sztuczkę (jeśli nic nie zepsułem): {]}

from numpy import floor
import time

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd a numeric rdd
    :p quantile(between 0 and 1)
    :sample fraction of and rdd to use. If not provided we use a whole dataset
    :seed random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    rddSortedWithIndex = (rdd.
        sortBy(lambda x: x).
        zipWithIndex().
        map(lambda (x, i): (i, x)).
        cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(floor(h)) + np.array([0L, 1L]))

    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

I kilka testów:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)

Wreszcie zdefiniujmy mediana:

from functools import partial
median = partial(quantile, p=0.5)

Jak na razie dobrze, ale trwa 4,66 s W trybie lokalnym bez żadnej komunikacji sieciowej. Prawdopodobnie jest sposób, aby to poprawić, ale po co w ogóle się martwić?

Język niezależny (Ula UDAF):

Jeśli używasz HiveContext możesz również użyć Udaf. O wartościach całkowych:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

O wartościach ciągłych:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

W percentile_approx możesz przekazać dodatkowy argument, który określa liczbę rekordów do użycia.

 123
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-01-28 13:09:03

Dodawanie rozwiązania, jeśli chcesz tylko metodę RDD i nie chcesz przejść do DF. Ten fragment może dać ci percentyl za podwójne RDD.

Jeśli wprowadzasz percentyl jako 50, powinieneś uzyskać wymaganą medianę. Daj mi znać, jeśli są jakieś sprawy narożne, które nie zostały uwzględnione.

/**
  * Gets the nth percentile entry for an RDD of doubles
  *
  * @param inputScore : Input scores consisting of a RDD of doubles
  * @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
  *                     It prefers the higher value when the desired quantile lies between two data points
  * @return : The number best representing the percentile in the Rdd of double
  */    
  def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
    val numEntries = inputScore.count().toDouble
    val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt


    inputScore
      .sortBy { case (score) => score }
      .zipWithIndex()
      .filter { case (score, index) => index == retrievedEntry }
      .map { case (score, index) => score }
      .collect()(0)
  }
 8
Author: Vedant,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-15 01:57:27

Oto metoda, której użyłem przy użyciu funkcji okienkowych (z pyspark 2.2.0).

from pyspark.sql import DataFrame

class median():
    """ Create median class with over method to pass partition """
    def __init__(self, df, col, name):
        assert col
        self.column=col
        self.df = df
        self.name = name

    def over(self, window):
        from pyspark.sql.functions import percent_rank, pow, first

        first_window = window.orderBy(self.column)                                  # first, order by column we want to compute the median for
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))  # add percent_rank column, percent_rank = 0.5 coressponds to median
        second_window = window.orderBy(pow(df.percent_rank-0.5, 2))                 # order by (percent_rank - 0.5)^2 ascending
        return df.withColumn(self.name, first(self.column).over(second_window))     # the first row of the window corresponds to median

def addMedian(self, col, median_name):
    """ Method to be added to spark native DataFrame class """
    return median(self, col, median_name)

# Add method to DataFrame class
DataFrame.addMedian = addMedian

Następnie zadzwoń do metody addMedian, aby obliczyć medianę col2:

from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)

Wreszcie można grupować według W razie potrzeby.

df.groupby("col1", "median")
 7
Author: Benoît Carne,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-26 10:38:00

Napisałem funkcję, która przyjmuje ramkę danych jako wejście i zwraca ramkę danych, która ma medianę jako wyjście na partycji i order_col jest kolumną, dla której chcemy obliczyć medianę dla part_col jest poziomem, na którym chcemy obliczyć medianę dla:

from pyspark.sql import Window
import pyspark.sql.functions as F

def calculate_median(dataframe, part_col, order_col):
    win = Window.partitionBy(*part_col).orderBy(order_col)
#     count_row = dataframe.groupby(*part_col).distinct().count()
    dataframe.persist()
    dataframe.count()
    temp = dataframe.withColumn("rank", F.row_number().over(win))
    temp = temp.withColumn(
        "count_row_part",
        F.count(order_col).over(Window.partitionBy(part_col))
    )
    temp = temp.withColumn(
        "even_flag",
        F.when(
            F.col("count_row_part") %2 == 0,
            F.lit(1)
        ).otherwise(
            F.lit(0)
        )
    ).withColumn(
        "mid_value",
        F.floor(F.col("count_row_part")/2)
    )

    temp = temp.withColumn(
        "avg_flag",
        F.when(
            (F.col("even_flag")==1) &
            (F.col("rank") == F.col("mid_value"))|
            ((F.col("rank")-1) == F.col("mid_value")),
            F.lit(1)
        ).otherwise(
        F.when(
            F.col("rank") == F.col("mid_value")+1,
            F.lit(1)
            )
        )
    )
    temp.show(10)
    return temp.filter(
        F.col("avg_flag") == 1
    ).groupby(
        part_col + ["avg_flag"]
    ).agg(
        F.avg(F.col(order_col)).alias("median")
    ).drop("avg_flag")
 2
Author: Ankit Kumar Namdeo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-04 20:20:39