Zastąp określone partycje w Spark dataframe write method

Chcę nadpisać określone partycje zamiast wszystkich w spark. Próbuję wykonać następujące polecenie:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

Gdzie df to ramka danych z nadpisanymi przyrostowymi danymi.

Hdfs-base-path zawiera dane podstawowe.

Kiedy wypróbuję powyższe polecenie, usuwa wszystkie partycje i wstawia te obecne w df na ścieżce hdfs.

Moim wymaganiem jest nadpisanie tylko tych partycji obecnych w df na podanej ścieżce hdfs. Czy ktoś może proszę, pomóż mi w tym?

Author: Prasad Khode, 2016-07-20

8 answers

To powszechny problem. Jedynym rozwiązaniem z Spark do 2.0 jest zapis bezpośrednio do katalogu partycji, np.

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

Jeśli używasz Spark przed 2.0, musisz powstrzymać Spark przed wysyłaniem plików metadanych (ponieważ będą one łamać automatyczne wykrywanie partycji) za pomocą:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Jeśli używasz Sparka przed wersją 1.6.2, musisz również usunąć plik _SUCCESS w /root/path/to/data/partition_col=value lub jego obecność spowoduje złamanie automatycznego wykrywania partycji. (Zdecydowanie polecam korzystanie 1.6.2 lub później.)

Możesz uzyskać więcej informacji o tym, jak zarządzać dużymi partycjami tabel z mojej rozmowy Spark Summit na Bulletproof Jobs .

 23
Author: Sim,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-25 20:15:38

Nareszcie! Jest to teraz Funkcja W Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236

Aby go użyć, musisz ustawić spark.sql.źródła.partitionOverwriteMode ustawienie na dynamiczny, zbiór danych musi być podzielony na partycje, a tryb zapisu nadpisać . Przykład:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

Polecam zrobić repartycję na podstawie kolumny partycji przed napisaniem, więc nie skończysz z 400 plikami na folder.

Przed Spark 2.3.0, najlepszy rozwiązaniem byłoby uruchomienie instrukcji SQL, aby usunąć te partycje, a następnie napisać je z mode append.

 16
Author: Madhava Carrillo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-19 14:45:53

Za Pomocą Spark 1.6...

HiveContext może znacznie uprościć ten proces. Kluczem jest to, że musisz najpierw utworzyć tabelę w Hive używając instrukcji CREATE EXTERNAL TABLE ze zdefiniowanym partycjonowaniem. Na przykład:

# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'

Stąd Załóżmy, że masz ramkę danych z nowymi rekordami dla określonej partycji (lub wielu partycji). Możesz użyć polecenia HiveContext SQL do wykonania INSERT OVERWRITE za pomocą tej ramki danych, która nadpisze tabelę tylko dla partycji zawartych w Dataframe:

# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                   SELECT name, age
                   FROM update_dataframe""")

Uwaga: update_dataframe w tym przykładzie znajduje się schemat, który pasuje do docelowej tabeli test.

Jednym z łatwych błędów w tym podejściu jest pominięcie kroku CREATE EXTERNAL TABLE w Hive i po prostu utworzenie tabeli przy użyciu metod zapisu API Dataframe. W szczególności dla tabel opartych na parkiecie, tabela nie będzie zdefiniowana odpowiednio do obsługi funkcji INSERT OVERWRITE... PARTITION Ula.

Mam nadzieję, że to pomoże.
 6
Author: vertigokidd,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-01-24 20:24:35

Jeśli używasz DataFrame, być może chcesz użyć tabeli Hive zamiast danych. W tym przypadku potrzebujesz tylko metody call

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)

Nadpisze partycje, które zawiera ramka danych.

Nie ma konieczności podawania formatu (orc), ponieważ Spark będzie używał formatu tabeli Hive.

Działa dobrze w wersji Spark 1.6

 1
Author: L. Viktor,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-12-20 07:33:01

Próbowałem poniżej nadpisać partycję w tabeli HIVE.

### load Data and check records
    raw_df = spark.table("test.original")
    raw_df.count()

lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925


### Check data in few partitions.
    sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
    print "Number of records: ", sample.count()
    sample.show()


### Back-up the partitions before deletion
    raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")


### UDF : To delete particular partition.
    def delete_part(table, part):
        qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
        spark.sql(qry)


### Delete partitions
    part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
    part_list = part_df.rdd.map(lambda x : x[0]).collect()

    table = "test.original"
    for p in part_list:
        delete_part(table, p)


### Do the required Changes to the columns in partitions
    df = spark.table("test.original_bkp")
    newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
    newdf.select("c_customer_sk", "c_preferred_cust_flag").show()


### Write the Partitions back to Original table
    newdf.write.insertInto("test.original")


### Verify data in Original table
    orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()



Hope it helps.

Regards,

Neeraj
 1
Author: neeraj bhadani,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-26 08:30:13

Możesz zrobić coś takiego, aby praca stała się aktywna (idempotent): (próbowałem tego na spark 2.2)

# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
print drop_query
spark.sql(drop_query)

# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)

# Load the partition
df.write\
  .partitionBy("partition_col")\
  .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
 0
Author: jatin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-27 02:32:37

Proponuję zrobić porządki, a następnie napisać nowe partycje w trybie Append:

import scala.sys.process._
def deletePath(path: String): Unit = {
    s"hdfs dfs -rm -r -skipTrash $path".!
}

df.select(partitionColumn).distinct.collect().foreach(p => {
    val partition = p.getAs[String](partitionColumn)
    deletePath(s"$path/$partitionColumn=$partition")
})

df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)

Spowoduje to usunięcie tylko nowych partycji. Po zapisaniu danych uruchom to polecenie, jeśli chcesz zaktualizować metastore:

sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")

Uwaga: deletePath zakłada, że polecenie hfds jest dostępne w Twoim systemie.

 0
Author: gorros,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-04 08:27:26

Zamiast pisać bezpośrednio do tabeli docelowej, sugerowałbym utworzenie tymczasowej tabeli, takiej jak tabela docelowa i wstaw tam swoje dane.

CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';

Po utworzeniu tabeli zapisujesz swoje dane do tmpLocation

df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)

Następnie można odzyskać ścieżki partycji tabeli wykonując:

MSCK REPAIR TABLE tmpTbl;

Uzyskaj ścieżki partycji, pytając o metadane Hive, takie jak:

SHOW PARTITONS tmpTbl;

Usuń te partycje z trgtTbl I przenieś katalogi z tmpTbl do trgtTbl

 0
Author: Joha,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-07 06:51:06