Spark SQL zamiennik dla MySQL GROUP CONCAT agregate function

Mam tabelę z dwoma kolumnami typu string (username, friend) i dla każdej nazwy użytkownika chcę zebrać wszystkich znajomych w jednym wierszu, skonkatenowanych jako łańcuchy. Na przykład: ('username1', 'friends1, friends2, friends3')

Wiem, że MySQL robi to z GROUP_CONCAT. Czy jest jakiś sposób, aby to zrobić za pomocą Spark SQL?

Author: Nick Chammas, 2015-07-26

9 answers

Zanim zaczniesz: Ta operacja jest jeszcze inna groupByKey. Chociaż ma wiele legalnych aplikacji, jest stosunkowo drogi, więc należy go używać tylko wtedy, gdy jest to wymagane.


Niezbyt zwięzłe i wydajne rozwiązanie, ale można użyć UserDefinedAggregateFunction wprowadzonego w Spark 1.5.0:

object GroupConcat extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}

Przykładowe użycie:

val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend")).show

## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+

Możesz również utworzyć wrapper Pythona, jak pokazano w Spark: jak mapować Pythona za pomocą funkcji Scala lub Java zdefiniowanych przez użytkownika?

W praktyka może być szybciej wyodrębnić RDD, groupByKey, mkString i odbudować ramkę danych.

Możesz uzyskać podobny efekt łącząc collect_list function (Spark >= 1.6.0) z concat_ws:

import org.apache.spark.sql.functions.{collect_list, udf, lit}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
 47
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-02 06:19:53

Możesz wypróbować funkcję collect_list

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A

Albo możesz regieterować UDF coś w stylu

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))

I możesz użyć tej funkcji w zapytaniu

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
 19
Author: iec2011007,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-16 09:24:01

Oto funkcja, której możesz użyć w PySpark:

import pyspark.sql.functions as F

def group_concat(col, distinct=False, sep=','):
    if distinct:
        collect = F.collect_set(col.cast(StringType()))
    else:
        collect = F.collect_list(col.cast(StringType()))
    return F.concat_ws(sep, collect)


table.groupby('username').agg(F.group_concat('friends').alias('friends'))

W SQL:

select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
 10
Author: rikturr,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-06 17:55:34

W Spark 2.4+ stało się to prostsze za pomocą collect_list()i array_join().

Oto demonstracja w PySpark, chociaż kod powinien być bardzo podobny do Scali:

from pyspark.sql.functions import array_join, collect_list

friends = spark.createDataFrame(
    [
        ('jacques', 'nicolas'),
        ('jacques', 'georges'),
        ('jacques', 'francois'),
        ('bob', 'amelie'),
        ('bob', 'zoe'),
    ],
    schema=['username', 'friend'],
)

(
    friends
    .orderBy('friend', ascending=False)
    .groupBy('username')
    .agg(
        array_join(
            collect_list('friend'),
            delimiter=', ',
        ).alias('friends')
    )
    .show(truncate=False)
)

Wyjście:

+--------+--------------------------+
|username|friends                   |
+--------+--------------------------+
|jacques |nicolas, georges, francois|
|bob     |zoe, amelie               |
+--------+--------------------------+

To jest podobne do MySQL 's GROUP_CONCAT() i Redshift' s LISTAGG().

 10
Author: Nick Chammas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-12-24 19:23:23

Jeden sposób, aby to zrobić z pyspark

byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)

I jeśli chcesz zrobić z tego ramkę danych jeszcze raz:

sqlContext.createDataFrame(byUsername, ["username", "friends"])

Od wersji 1.6 możesz użyć collect_list , a następnie dołączyć do utworzonej listy:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
 3
Author: Kamil Sindi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-25 00:07:13

Język : Scala wersja Spark: 1.5.2

Miałem ten sam problem i próbowałem go rozwiązać za pomocą udfs, ale, niestety, doprowadziło to do kolejnych problemów w kodzie z powodu niespójności typu. Udało mi się to obejść, najpierw konwertując {[2] } na RDD, a następnie grupując przez i manipulując danymi w pożądany sposób, a następnie konwertując RDD z powrotem na DF w następujący sposób:

val df = sc
     .parallelize(Seq(
        ("username1", "friend1"),
        ("username1", "friend2"),
        ("username2", "friend1"),
        ("username2", "friend3")))
     .toDF("username", "friend")

+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+

val dfGRPD = df.map(Row => (Row(0), Row(1)))
     .groupByKey()
     .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
     .toDF("username", "groupOfFriends")

+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+
 2
Author: Christos Hadjinikolis,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-15 10:59:59

-- spark SQL resolution with collect_set

SELECT id, concat_ws(', ', sort_array( collect_set(colors))) as csv_colors
FROM ( 
  VALUES ('A', 'green'),('A','yellow'),('B', 'blue'),('B','green') 
) as T (id, colors)
GROUP BY id
 1
Author: Auguronomics,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-11-06 13:09:01

Poniżej kodu opartego na Pythonie, który osiąga funkcjonalność group_concat.

Dane Wejściowe:

Cust_No, Cust_Cars

1, Toyota

2, BMW

1, Audi

2, Hyundai

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

spark = SparkSession.builder.master('yarn').getOrCreate()

# Udf to join all list elements with "|"
def combine_cars(car_list,sep='|'):
  collect = sep.join(car_list)
  return collect

test_udf = udf(combine_cars,StringType())
car_list_per_customer.groupBy("Cust_No").agg(F.collect_list("Cust_Cars").alias("car_list")).select("Cust_No",test_udf("car_list").alias("Final_List")).show(20,False)

Dane Wyjściowe: Cust_No, Final_List

1, Toyota / Audi

2, BMW / Hyundai

 0
Author: Akshay Patel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-08-21 14:22:12

Można również użyć Spark SQL funkcja collect_list i po trzeba będzie cast do łańcucha i użyć funkcji regexp_replace zastąpić znaki specjalne.

regexp_replace(regexp_replace(regexp_replace(cast(collect_list((column)) as string), ' ', ''), ',', '|'), '[^A-Z0-9|]', '')
To łatwiejszy sposób.
 0
Author: Kevin Giediel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2021-01-28 02:27:43