Save Spark dataframe as dynamic partitioned table in Hive

Mam przykładową aplikację pracującą do odczytu z plików csv do ramki danych. Ramkę danych można zapisać do tabeli ula w formacie parquet przy użyciu metody df.saveAsTable(tablename,mode).

Powyższy kod działa dobrze, ale mam tyle danych na każdy dzień, że chcę dynamicznie partycjonować tabelę hive w oparciu o creationdate (kolumnę w tabeli).

Czy Jest jakiś sposób na dynamiczną partycję ramki danych i przechowywanie jej w magazynie ula. Chcesz powstrzymać się od twardego kodowania wkładki oświadczenie za pomocą hivesqlcontext.sql(insert into table partittioin by(date)....).

Pytanie może być traktowane jako rozszerzenie do: Jak zapisać DataFrame bezpośrednio do Hive?

Każda pomoc jest mile widziana.
Author: Community, 2015-07-10

5 answers

Wierzę, że działa coś takiego:

df jest ramką danych z kolumnami Rok, Miesiąc i inne

df.write.partitionBy('year', 'month').saveAsTable(...)

Lub

df.write.partitionBy('year', 'month').insertInto(...)
 17
Author: mdurant,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-12 19:15:00

Udało mi się napisać do partycjonowanej tabeli ula używając df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

Musiałem włączyć następujące właściwości, aby to działało.

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
 26
Author: Jins George,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-25 08:59:56

Ja również zmierzyłem się z tym samym, ale za pomocą następujących sztuczek rozwiązałem.

  1. Kiedy wykonujemy dowolną tabelę jako partycjonowaną, wtedy partycjonowana kolumna staje się wrażliwa na wielkość liter.

  2. Dzielona kolumna powinna być obecna w ramce danych o tej samej nazwie (rozróżnia wielkość liter). Kod:

    var dbName="your database name"
    var finaltable="your table name"
    
    // First check if table is available or not..
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
         //If table is not available then it will create for you..
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use Database_Name")
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
         //Table is created now insert the DataFrame in append Mode
         df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
    }
    
 6
Author: Nilesh Shinde,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-08-16 06:51:10

To mi pasuje. Ustawiam te ustawienia, a następnie umieszczam dane w partycjonowanych tabelach.

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", 
"nonstrict")
 0
Author: Shaunak Bangale,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-29 22:24:09

To działało dla mnie przy użyciu Pythona i spark 2.1.0.

Nie wiem, czy to najlepszy sposób, ale to działa...
# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")

spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
 0
Author: isichei,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-17 14:43:07