Zastąp określone partycje w Spark dataframe write method
Chcę nadpisać określone partycje zamiast wszystkich w spark. Próbuję wykonać następujące polecenie:
df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Gdzie df to ramka danych z nadpisanymi przyrostowymi danymi.
Hdfs-base-path zawiera dane podstawowe.
Kiedy wypróbuję powyższe polecenie, usuwa wszystkie partycje i wstawia te obecne w df na ścieżce hdfs.
Moim wymaganiem jest nadpisanie tylko tych partycji obecnych w df na podanej ścieżce hdfs. Czy ktoś może proszę, pomóż mi w tym?
8 answers
To powszechny problem. Jedynym rozwiązaniem z Spark do 2.0 jest zapis bezpośrednio do katalogu partycji, np.
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
Jeśli używasz Spark przed 2.0, musisz powstrzymać Spark przed wysyłaniem plików metadanych (ponieważ będą one łamać automatyczne wykrywanie partycji) za pomocą:
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Jeśli używasz Sparka przed wersją 1.6.2, musisz również usunąć plik _SUCCESS
w /root/path/to/data/partition_col=value
lub jego obecność spowoduje złamanie automatycznego wykrywania partycji. (Zdecydowanie polecam korzystanie 1.6.2 lub później.)
Możesz uzyskać więcej informacji o tym, jak zarządzać dużymi partycjami tabel z mojej rozmowy Spark Summit na Bulletproof Jobs .
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-25 20:15:38
Nareszcie! Jest to teraz Funkcja W Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236
Aby go użyć, musisz ustawić spark.sql.źródła.partitionOverwriteMode ustawienie na dynamiczny, zbiór danych musi być podzielony na partycje, a tryb zapisu nadpisać . Przykład:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
Polecam zrobić repartycję na podstawie kolumny partycji przed napisaniem, więc nie skończysz z 400 plikami na folder.
Przed Spark 2.3.0, najlepszy rozwiązaniem byłoby uruchomienie instrukcji SQL, aby usunąć te partycje, a następnie napisać je z mode append.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-19 14:45:53
Za Pomocą Spark 1.6...
HiveContext może znacznie uprościć ten proces. Kluczem jest to, że musisz najpierw utworzyć tabelę w Hive używając instrukcji CREATE EXTERNAL TABLE
ze zdefiniowanym partycjonowaniem. Na przykład:
# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'
Stąd Załóżmy, że masz ramkę danych z nowymi rekordami dla określonej partycji (lub wielu partycji). Możesz użyć polecenia HiveContext SQL do wykonania INSERT OVERWRITE
za pomocą tej ramki danych, która nadpisze tabelę tylko dla partycji zawartych w Dataframe:
# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
SELECT name, age
FROM update_dataframe""")
Uwaga: update_dataframe
w tym przykładzie znajduje się schemat, który pasuje do docelowej tabeli test
.
Jednym z łatwych błędów w tym podejściu jest pominięcie kroku CREATE EXTERNAL TABLE
w Hive i po prostu utworzenie tabeli przy użyciu metod zapisu API Dataframe. W szczególności dla tabel opartych na parkiecie, tabela nie będzie zdefiniowana odpowiednio do obsługi funkcji INSERT OVERWRITE... PARTITION
Ula.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-01-24 20:24:35
Jeśli używasz DataFrame, być może chcesz użyć tabeli Hive zamiast danych. W tym przypadku potrzebujesz tylko metody call
df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)
Nadpisze partycje, które zawiera ramka danych.
Nie ma konieczności podawania formatu (orc), ponieważ Spark będzie używał formatu tabeli Hive.
Działa dobrze w wersji Spark 1.6
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-12-20 07:33:01
Próbowałem poniżej nadpisać partycję w tabeli HIVE.
### load Data and check records
raw_df = spark.table("test.original")
raw_df.count()
lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
### Check data in few partitions.
sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
print "Number of records: ", sample.count()
sample.show()
### Back-up the partitions before deletion
raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
### UDF : To delete particular partition.
def delete_part(table, part):
qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
spark.sql(qry)
### Delete partitions
part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
part_list = part_df.rdd.map(lambda x : x[0]).collect()
table = "test.original"
for p in part_list:
delete_part(table, p)
### Do the required Changes to the columns in partitions
df = spark.table("test.original_bkp")
newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
### Write the Partitions back to Original table
newdf.write.insertInto("test.original")
### Verify data in Original table
orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
Hope it helps.
Regards,
Neeraj
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-26 08:30:13
Możesz zrobić coś takiego, aby praca stała się aktywna (idempotent): (próbowałem tego na spark 2.2)
# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
print drop_query
spark.sql(drop_query)
# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)
# Load the partition
df.write\
.partitionBy("partition_col")\
.saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-27 02:32:37
Proponuję zrobić porządki, a następnie napisać nowe partycje w trybie Append
:
import scala.sys.process._
def deletePath(path: String): Unit = {
s"hdfs dfs -rm -r -skipTrash $path".!
}
df.select(partitionColumn).distinct.collect().foreach(p => {
val partition = p.getAs[String](partitionColumn)
deletePath(s"$path/$partitionColumn=$partition")
})
df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)
Spowoduje to usunięcie tylko nowych partycji. Po zapisaniu danych uruchom to polecenie, jeśli chcesz zaktualizować metastore:
sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")
Uwaga: deletePath
zakłada, że polecenie hfds
jest dostępne w Twoim systemie.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-04 08:27:26
Zamiast pisać bezpośrednio do tabeli docelowej, sugerowałbym utworzenie tymczasowej tabeli, takiej jak tabela docelowa i wstaw tam swoje dane.
CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';
Po utworzeniu tabeli zapisujesz swoje dane do tmpLocation
df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)
Następnie można odzyskać ścieżki partycji tabeli wykonując:
MSCK REPAIR TABLE tmpTbl;
Uzyskaj ścieżki partycji, pytając o metadane Hive, takie jak:
SHOW PARTITONS tmpTbl;
Usuń te partycje z trgtTbl
I przenieś katalogi z tmpTbl
do trgtTbl
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-07 06:51:06