Spark SQL replacement for MySQL GROUP CONCAT aggregate function
Mam tabelę dwóch kolumn typu string (username, friend) I dla każdej nazwy użytkownika chcę zebrać wszystkich znajomych w jednym wierszu, skonkatenowanych jako łańcuchy ('username1', 'friends1, friends2, friends3'). Wiem, że MySql robi to przez GROUP_CONCAT, czy jest jakiś sposób, aby to zrobić z SPARK SQL?
Thanks
5 answers
Zanim zaczniesz: Ta operacja jest jeszcze inna groupByKey
. Chociaż ma wiele legalnych aplikacji, jest stosunkowo drogi, więc należy go używać tylko wtedy, gdy jest to wymagane.
Niezbyt zwięzłe i wydajne rozwiązanie, ale można użyć UserDefinedAggregateFunction
wprowadzonego w Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Przykładowe użycie:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
Możesz również utworzyć wrapper Pythona, jak pokazano w Spark: jak mapować Pythona za pomocą funkcji Scala lub Java zdefiniowanych przez użytkownika?
W praktyka może być szybciej wyodrębnić RDD, groupByKey
, mkString
i odbudować ramkę danych.
Możesz uzyskać podobny efekt łącząc collect_list
function (Spark >= 1.6.0) z concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-02 06:19:53
Możesz wypróbować funkcję collect_list
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
Albo możesz regieterować UDF coś w stylu
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
I możesz użyć tej funkcji w zapytaniu
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-16 09:24:01
Jeden sposób, aby to zrobić z pyspark
byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
I jeśli chcesz zrobić z tego ramkę danych jeszcze raz:
sqlContext.createDataFrame(byUsername, ["username", "friends"])
Od wersji 1.6 możesz użyć collect_list , a następnie dołączyć do utworzonej listy:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-25 00:07:13
Oto funkcja, której możesz użyć w PySpark:
import pyspark.sql.functions as F
def group_concat(col, distinct=False, sep=','):
if distinct:
collect = F.collect_set(col.cast(StringType()))
else:
collect = F.collect_list(col.cast(StringType()))
return F.concat_ws(sep, collect)
table.groupby('username').agg(F.group_concat('friends').alias('friends'))
W SQL:
select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-06 17:55:34
Język : Scala wersja Spark: 1.5.2
Miałem ten sam problem i próbowałem go rozwiązać za pomocą udfs
, ale, niestety, doprowadziło to do kolejnych problemów w kodzie z powodu niespójności typu. Udało mi się to obejść, najpierw konwertując {[2] } na RDD
, a następnie grupując przez i manipulując danymi w pożądany sposób, a następnie konwertując RDD
z powrotem na DF
w następujący sposób:
val df = sc
.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")))
.toDF("username", "friend")
+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+
val dfGRPD = df.map(Row => (Row(0), Row(1)))
.groupByKey()
.map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
.toDF("username", "groupOfFriends")
+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-15 10:59:59