Jak używać JDBC source do zapisu i odczytu danych w (Py)Spark?
Celem tego pytania jest udokumentowanie:
-
Kroki wymagane do odczytu i zapisu danych przy użyciu połączeń JDBC w PySpark
-
Możliwe problemy ze źródłami JDBC i rozwiązaniami know
Przy niewielkich zmianach metody te powinny działać z innymi obsługiwanymi językami, w tym Scala i R.
3 answers
Zapis danych
-
Dołącz odpowiedni sterownik JDBC podczas wysyłania aplikacji lub uruchamiania powłoki. Możesz użyć na przykład
--packages
:bin/pyspark --packages group:name:version
Lub połączenie
driver-class-path
ijars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
Te właściwości można również ustawić za pomocą zmiennej środowiskowej
PYSPARK_SUBMIT_ARGS
przed uruchomieniem instancji JVM lub używającconf/spark-defaults.conf
do Ustawieniaspark.jars.packages
lubspark.jars
/spark.driver.extraClassPath
. -
Wybierz żądany tryb. Spark JDBC writer obsługuje następujące tryby:]}
-
append
: dołącza zawartość this: class:DataFrame
do istniejących danych. -
overwrite
: Zastąp istniejące dane. -
ignore
: po cichu ignoruj tę operację, jeśli dane już istnieją. -
error
(domyślny przypadek): wyrzuca wyjątek, jeśli dane już istnieją.
Upserts lub inne drobnoziarniste modyfikacje nie są obsługiwane
mode = ...
-
-
Przygotuj JDBC URI, na przykład:
# You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"
-
(opcjonalne) Utwórz słownik argumentów JDBC.
properties = { "user": "foo", "password": "bar" }
-
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
Aby zapisać dane (Patrz
pyspark.sql.DataFrameWriter
po szczegóły).
Znane problemy :
-
Odpowiedniego sterownika nie można znaleźć, gdy sterownik został włączony za pomocą
--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
)Zakładając, że nie ma niedopasowania wersji sterowników, aby rozwiązać ten problem, możesz dodać klasę
driver
doproperties
. Na przykład:properties = { ... "driver": "org.postgresql.Driver" }
-
Użycie
df.write.format("jdbc").options(...).save()
może spowodować in:Java.lang.RuntimeException: org.Apacz.Iskra.sql.egzekucja.źródła danych.jdbc.DefaultSource nie zezwala na tworzenie tabeli jako select.
Rozwiązanie nieznane.
-
W Pyspark 1.3 możesz spróbować wywołać metodę Javy bezpośrednio:
df._jdf.insertIntoJDBC(url, "baz", True)
Odczyt danych
- wykonaj kroki 1-4 z zapisywanie danych
-
sqlContext.read.jdbc
:sqlContext.read.jdbc(url=url, table="baz", properties=properties)
Lub
sqlContext.read.format("jdbc")
:(sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load())
Znane problemy i Gotcha :
- nie można znaleźć odpowiedniego sterownika-patrz: zapis danych
- Spark SQL obsługuje predykat pushdown ze źródłami JDBC, chociaż nie wszystkie predykaty mogą zostać zepchnięte w dół. Nie deleguje również limitów ani agregacji. Możliwe obejście to wymiana
dbtable
/table
argument z poprawnym zapytaniem podrzędnym. Zobacz na przykład: -
Domyślnie źródła danych JDBC ładują dane sekwencyjnie za pomocą jednego wątku wykonywalnego. Aby zapewnić rozproszone ładowanie danych można:
- zapewnić partycjonowanie
column
(musi byćIntegeType
),lowerBound
,upperBound
,numPartitions
. - podaj listę wzajemnie wykluczających się predykatów
predicates
, po jednym dla każdej żądanej partycji.
- zapewnić partycjonowanie
W trybie rozproszonym (z kolumną partycjonowania lub predykatami) każdy executor działa we własnej transakcji. Jeśli źródłowa baza danych jest modyfikowana w tym samym czasie, nie ma gwarancji, że końcowy widok będzie spójny.
Gdzie znaleźć odpowiednie sterowniki:
-
Maven Repository (aby uzyskać wymagane Współrzędne dla
--packages
wybierz żądaną wersję i skopiuj dane z karty Gradle w postacicompile-group:name:version
zastępującej odpowiednie pola) lub Maven Central Repozytorium :
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:30:57
Pobierz mysql-connector-java driver i pozostań w folderze Spark jar, obserwuj poniższy kod Pythona zapisując dane do "acotr1", musimy utworzyć strukturę tabeli acotr1 w bazie danych mysql
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"
df.write.jdbc(mysql_url,table="actor1",mode="append")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-05 05:06:45
Skorzystaj z tego linku, aby pobrać jdbc dla postgres i wykonaj kroki, aby pobrać plik jar
Https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html plik jar zostanie pobrany w ścieżce w ten sposób. "/home / anand/ivy2 / jars / org.postgresql_postgresql-42.1.1.jar "
Jeśli Twoja wersja spark to 2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
Zapisz plik jako python i uruchom "python respectivefilename.py"
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-22 07:11:59