Jak używać JDBC source do zapisu i odczytu danych w (Py)Spark?

Celem tego pytania jest udokumentowanie:

  • Kroki wymagane do odczytu i zapisu danych przy użyciu połączeń JDBC w PySpark

  • Możliwe problemy ze źródłami JDBC i rozwiązaniami know

Przy niewielkich zmianach metody te powinny działać z innymi obsługiwanymi językami, w tym Scala i R.

Author: zero323, 2015-06-22

3 answers

Zapis danych

  1. Dołącz odpowiedni sterownik JDBC podczas wysyłania aplikacji lub uruchamiania powłoki. Możesz użyć na przykład --packages:

    bin/pyspark --packages group:name:version  
    

    Lub połączenie driver-class-path i jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    Te właściwości można również ustawić za pomocą zmiennej środowiskowej PYSPARK_SUBMIT_ARGS przed uruchomieniem instancji JVM lub używając conf/spark-defaults.conf do Ustawienia spark.jars.packages lub spark.jars / spark.driver.extraClassPath.

  2. Wybierz żądany tryb. Spark JDBC writer obsługuje następujące tryby:]}

    • append: dołącza zawartość this: class: DataFrame do istniejących danych.
    • overwrite: Zastąp istniejące dane.
    • ignore: po cichu ignoruj tę operację, jeśli dane już istnieją.
    • error (domyślny przypadek): wyrzuca wyjątek, jeśli dane już istnieją.

    Upserts lub inne drobnoziarniste modyfikacje nie są obsługiwane

    mode = ...
    
  3. Przygotuj JDBC URI, na przykład:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
  4. (opcjonalne) Utwórz słownik argumentów JDBC.

    properties = {
        "user": "foo",
        "password": "bar"
    }
    
  5. DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

    Aby zapisać dane (Patrz pyspark.sql.DataFrameWriter po szczegóły).

Znane problemy :

  • Odpowiedniego sterownika nie można znaleźć, gdy sterownik został włączony za pomocą --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Zakładając, że nie ma niedopasowania wersji sterowników, aby rozwiązać ten problem, możesz dodać klasę driver do properties. Na przykład:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
  • Użycie df.write.format("jdbc").options(...).save() może spowodować in:

    Java.lang.RuntimeException: org.Apacz.Iskra.sql.egzekucja.źródła danych.jdbc.DefaultSource nie zezwala na tworzenie tabeli jako select.

    Rozwiązanie nieznane.

  • W Pyspark 1.3 możesz spróbować wywołać metodę Javy bezpośrednio:

    df._jdf.insertIntoJDBC(url, "baz", True)
    

Odczyt danych

  1. wykonaj kroki 1-4 z zapisywanie danych
  2. sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    Lub sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
    

Znane problemy i Gotcha :

  • nie można znaleźć odpowiedniego sterownika-patrz: zapis danych
  • Spark SQL obsługuje predykat pushdown ze źródłami JDBC, chociaż nie wszystkie predykaty mogą zostać zepchnięte w dół. Nie deleguje również limitów ani agregacji. Możliwe obejście to wymiana dbtable / table argument z poprawnym zapytaniem podrzędnym. Zobacz na przykład:
  • Domyślnie źródła danych JDBC ładują dane sekwencyjnie za pomocą jednego wątku wykonywalnego. Aby zapewnić rozproszone ładowanie danych można:

    • zapewnić partycjonowanie column (musi być IntegeType), lowerBound, upperBound, numPartitions.
    • podaj listę wzajemnie wykluczających się predykatów predicates, po jednym dla każdej żądanej partycji.
  • W trybie rozproszonym (z kolumną partycjonowania lub predykatami) każdy executor działa we własnej transakcji. Jeśli źródłowa baza danych jest modyfikowana w tym samym czasie, nie ma gwarancji, że końcowy widok będzie spójny.

Gdzie znaleźć odpowiednie sterowniki:

 68
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:30:57

Pobierz mysql-connector-java driver i pozostań w folderze Spark jar, obserwuj poniższy kod Pythona zapisując dane do "acotr1", musimy utworzyć strukturę tabeli acotr1 w bazie danych mysql

    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"

    df.write.jdbc(mysql_url,table="actor1",mode="append")
 -1
Author: y durga prasad,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-09-05 05:06:45

Skorzystaj z tego linku, aby pobrać jdbc dla postgres i wykonaj kroki, aby pobrać plik jar

Https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html plik jar zostanie pobrany w ścieżce w ten sposób. "/home / anand/ivy2 / jars / org.postgresql_postgresql-42.1.1.jar "

Jeśli Twoja wersja spark to 2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

Zapisz plik jako python i uruchom "python respectivefilename.py"

 -2
Author: anand ml,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-22 07:11:59