Wykonaj wpisane połączenie w Scali za pomocą zestawów danych Spark

Lubię zbiory danych Spark, ponieważ dają mi błędy analizy i błędy składni podczas kompilacji, a także pozwalają mi pracować z getterami zamiast ciężko zakodowanych nazw/liczb. Większość obliczeń można wykonać za pomocą interfejsów API wysokiego poziomu zestawu danych. Na przykład o wiele prostsze jest wykonywanie operacji agg, select, sum, avg, map, filter lub groupBy poprzez dostęp do obiektu wpisanego w zestawie danych niż korzystanie z pól danych RDD rows.

Jednak brakuje w tym operacji join, czytałem, że można zrobić taki join

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

Ale nie tego chcę, ponieważ wolałbym to zrobić poprzez interfejs klasy case, więc coś bardziej podobnego

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

Najlepszą alternatywą na razie wydaje się stworzenie obiektu obok klasy case i podanie tej funkcji, która zapewni mi nazwę prawej kolumny jako ciąg znaków. Użyłbym więc pierwszej linii kodu, ale zamiast mocno zakodowanej nazwy kolumny wstawiłbym funkcję. Ale to nie wydaje się wystarczająco eleganckie..

Czy ktoś może mi doradzić w sprawie inne opcje tutaj? Celem jest uzyskanie abstrakcji z rzeczywistych nazw kolumn i praca najlepiej za pośrednictwem getterów klasy case.

Używam Spark 1.6.1 i Scala 2.10

Author: Community, 2016-11-15

1 answers

Obserwacja

Spark SQL może zoptymalizować join tylko wtedy, gdy warunek join jest oparty na operatorze równości. Oznacza to, że możemy rozważyć equijoins i non-equijoins oddzielnie.

Equijoin

Equijoin może być zaimplementowany w bezpieczny sposób, mapując zarówno Datasets do krotek (klucz, wartość), wykonując join na podstawie kluczy i przekształcając wynik:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

Non-equijoin

Można wyrazić za pomocą operatorów algebry relacyjnej jako R θ θ s = σθ (R × S) I konwertowane bezpośrednio na kod.

Spark 2.0

WŁĄCZ crossJoin i użyj joinWith z trywialnie równym predykatem:

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Spark 2.1

Użyj metody crossJoin:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

Przykłady

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

Uwagi

  • Należy zauważyć, że metody te różnią się jakościowo od bezpośredniego zastosowania joinWith i wymagają kosztownych DeserializeToObject / SerializeFromObject transformacje (w porównaniu do bezpośredniego joinWith mogą używać operacji logicznych na danych).

    Jest to podobne do zachowania opisanego w Spark 2.0 Dataset vs DataFrame.
  • Jeśli nie jesteś ograniczony do Spark SQL API frameless zapewnia interesujące rozszerzenia typu safe dla Datasets (na dzień dzisiejszy obsługuje tylko Spark 2.0):

    import frameless.TypedDataset
    
    val typedPoints1 = TypedDataset.create(points1)
    val typedPoints2 = TypedDataset.create(points2)
    
    typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
    
  • Dataset API nie jest stabilne w 1.6, więc nie sądzę, że ma sens, aby używać go tam.

  • Oczywiście ten projekt i nazwy opisowe nie są konieczne. Możesz łatwo Użyj klasy type, aby dodać te metody w sposób niejawny do Dataset i nie ma konfliktu z wbudowanymi podpisami, więc obie mogą być wywołane joinWith.

 23
Author: user6910411,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:05