How to use JDBC source to write and read data in (Py)Spark?

The goal of this question is to document:

  • steps required to read and write data using JDBC connections in PySpark

  • possible issues with JDBC sources and known solutions

With small changes, these methods should work with other supported languages, including Scala and R.

Responsory answered 22/6, 2015 at 15:30 Comment(0)

Writing data

  1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

     bin/pyspark --packages group:name:version  
    

or combine driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
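
For example, a minimal sketch of setting PYSPARK_SUBMIT_ARGS from Python before the JVM is launched (the PostgreSQL coordinates are a placeholder; the trailing pyspark-shell token is required when Spark is started from a plain python process):

    import os

    # Must be set before the first SparkSession / SparkContext is created,
    # i.e. before this Python process launches the JVM.
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--packages org.postgresql:postgresql:42.1.1 pyspark-shell"
    )

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("jdbc-example").getOrCreate()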

  2. Choose the desired mode. The Spark JDBC writer supports the following modes:

    • append: Append contents of this DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    Upserts or other fine-grained modifications are not supported.

     mode = ...
    
  3. Prepare the JDBC URI, for example:

     # You can encode credentials in URI or pass
     # separately using properties argument
     # of jdbc method or options
    
     url = "jdbc:postgresql://localhost/foobar"
    
  4. (Optional) Create a dictionary of JDBC arguments.

     properties = {
         "user": "foo",
         "password": "bar"
     }
    

    properties / options can also be used to set supported JDBC connection properties.

  5. Use DataFrame.write.jdbc

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

to save the data (see pyspark.sql.DataFrameWriter for details).
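
Below is a minimal end-to-end sketch combining steps 1-5, assuming Spark 2.x with SparkSession and a PostgreSQL target; the driver coordinates, table name and credentials are placeholders:

    from pyspark.sql import SparkSession

    # Step 1: pull the driver via spark.jars.packages (this works when the script is
    # started with plain `python`; with spark-submit pass --packages on the command line).
    spark = (SparkSession.builder
             .appName("jdbc-write-example")
             .config("spark.jars.packages", "org.postgresql:postgresql:42.1.1")
             .getOrCreate())

    df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "value"])

    mode = "append"                             # step 2: write mode
    url = "jdbc:postgresql://localhost/foobar"  # step 3: JDBC URI
    properties = {                              # step 4: connection properties
        "user": "foo",
        "password": "bar",
        "driver": "org.postgresql.Driver",
    }

    # Step 5: write the DataFrame to the target table
    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)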

Known issues:

  • A suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }
    
  • using df.write.format("jdbc").options(...).save() may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    Solution unknown.

  • in PySpark 1.3 you can try calling the Java method directly:

      df._jdf.insertIntoJDBC(url, "baz", True)
    

Reading data

  1. Follow steps 1-4 from Writing data

  2. Use sqlContext.read.jdbc:

     sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

or sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
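
The reader also supports partitioned (parallel) reads. A minimal sketch, assuming the url and properties defined in the writing section and that baz has a numeric id column (both are placeholders):

    # Read baz in 10 partitions, splitting the id range 1-100000 between them.
    df = sqlContext.read.jdbc(
        url=url,
        table="baz",
        column="id",
        lowerBound=1,
        upperBound=100000,
        numPartitions=10,
        properties=properties,
    )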

Known issues and gotchas:

Where to find suitable drivers:

Other options

Depending on the database, a specialized source might exist and be preferred in some cases:

Responsory answered 22/6, 2015 at 15:30 Comment(1)
mode="overwrite" use this command: spark_submit --driver-class-path /xx/yy/postgresql-xx.jar my-script.pyGottschalk

Download the mysql-connector-java driver and keep it in the Spark jars folder. Observe the Python code below: it writes data into "actor1", so we have to create the actor1 table structure in the MySQL database first.

    from pyspark.sql import SparkSession, SQLContext

    # Create a Spark session with the MySQL connector jar on the driver classpath
    spark = (SparkSession.builder
             .appName("prasadad")
             .master("local")
             .config("spark.driver.extraClassPath",
                     r"D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar")
             .getOrCreate())

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    # Read the existing "actor" table
    df = (sqlContext.read.format("jdbc")
          .options(url="jdbc:mysql://localhost:3306/sakila",
                   driver="com.mysql.jdbc.Driver",
                   dbtable="actor",
                   user="root",
                   password="****")
          .load())

    # Write into the pre-created "actor1" table
    mysql_url = "jdbc:mysql://localhost:3306/sakila?user=root&password=****"
    df.write.jdbc(mysql_url, table="actor1", mode="append")
Ssr answered 22/3, 2017 at 11:12 Comment(0)

Refer to this link to download the JDBC driver for Postgres and follow the steps to download the jar file:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html

The jar file will be downloaded to a path like "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar".

If your Spark version is 2.x:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("sparkanalysis")
             .config("spark.driver.extraClassPath",
                     "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar")
             .getOrCreate())

    # for a database on localhost
    pgDF = (spark.read
            .format("jdbc")
            .option("url", "jdbc:postgresql:postgres")
            .option("dbtable", "public.user_emp_tab")
            .option("user", "postgres")
            .option("password", "Jonsnow@100")
            .load())

    print(pgDF)

    pgDF.filter(pgDF["user_id"] > 5).show()

Save the file as a Python script and run it with "python respectivefilename.py".

Theophilus answered 22/2, 2018 at 7:11 Comment(0)
