Wykonaj wpisane połączenie w Scali za pomocą zestawów danych Spark
Lubię zbiory danych Spark, ponieważ dają mi błędy analizy i błędy składni podczas kompilacji, a także pozwalają mi pracować z getterami zamiast ciężko zakodowanych nazw/liczb. Większość obliczeń można wykonać za pomocą interfejsów API wysokiego poziomu zestawu danych. Na przykład o wiele prostsze jest wykonywanie operacji agg, select, sum, avg, map, filter lub groupBy poprzez dostęp do obiektu wpisanego w zestawie danych niż korzystanie z pól danych RDD rows.
Jednak brakuje w tym operacji join, czytałem, że można zrobić taki join
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
Ale nie tego chcę, ponieważ wolałbym to zrobić poprzez interfejs klasy case, więc coś bardziej podobnego
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
Najlepszą alternatywą na razie wydaje się stworzenie obiektu obok klasy case i podanie tej funkcji, która zapewni mi nazwę prawej kolumny jako ciąg znaków. Użyłbym więc pierwszej linii kodu, ale zamiast mocno zakodowanej nazwy kolumny wstawiłbym funkcję. Ale to nie wydaje się wystarczająco eleganckie..
Czy ktoś może mi doradzić w sprawie inne opcje tutaj? Celem jest uzyskanie abstrakcji z rzeczywistych nazw kolumn i praca najlepiej za pośrednictwem getterów klasy case.Używam Spark 1.6.1 i Scala 2.10
1 answers
Obserwacja
Spark SQL może zoptymalizować join tylko wtedy, gdy warunek join jest oparty na operatorze równości. Oznacza to, że możemy rozważyć equijoins i non-equijoins oddzielnie.
Equijoin
Equijoin może być zaimplementowany w bezpieczny sposób, mapując zarównoDatasets
do krotek (klucz, wartość), wykonując join na podstawie kluczy i przekształcając wynik:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
Non-equijoin
Można wyrazić za pomocą operatorów algebry relacyjnej jako R θ θ s = σθ (R × S) I konwertowane bezpośrednio na kod.
Spark 2.0
WŁĄCZ crossJoin
i użyj joinWith
z trywialnie równym predykatem:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
Spark 2.1
Użyj metody crossJoin
:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
Przykłady
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
Uwagi
-
Należy zauważyć, że metody te różnią się jakościowo od bezpośredniego zastosowania
Jest to podobne do zachowania opisanego w Spark 2.0 Dataset vs DataFrame.joinWith
i wymagają kosztownychDeserializeToObject
/SerializeFromObject
transformacje (w porównaniu do bezpośredniegojoinWith
mogą używać operacji logicznych na danych). -
Jeśli nie jesteś ograniczony do Spark SQL API
frameless
zapewnia interesujące rozszerzenia typu safe dlaDatasets
(na dzień dzisiejszy obsługuje tylko Spark 2.0):import frameless.TypedDataset val typedPoints1 = TypedDataset.create(points1) val typedPoints2 = TypedDataset.create(points2) typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
Dataset
API nie jest stabilne w 1.6, więc nie sądzę, że ma sens, aby używać go tam.Oczywiście ten projekt i nazwy opisowe nie są konieczne. Możesz łatwo Użyj klasy type, aby dodać te metody w sposób niejawny do
Dataset
i nie ma konfliktu z wbudowanymi podpisami, więc obie mogą być wywołanejoinWith
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:10:05