Converting a MySQL table to a Spark Dataset is very slow compared to the same from a CSV file

I have a CSV file in Amazon S3 which is 62 MB in size (114,000 rows). I am converting it into a Spark Dataset and taking the first 500 rows from it. The code is as follows:

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = df.load("s3n://" + this.accessId.replace("\"", "") + ":" + this.accessToken.replace("\"", "")
        + "@" + this.bucketName.replace("\"", "") + "/" + this.filePath.replace("\"", ""));

set.take(500);

The whole operation takes 20 to 30 sec.

Now I am trying the same thing, but instead of a CSV file I am using a MySQL table with 119,000 rows. The MySQL server is on Amazon EC2. The code is as follows:

String url = "jdbc:mysql://" + this.hostName + ":3306/" + this.dataBaseName
        + "?user=" + this.userName + "&password=" + this.password;

SparkSession spark = StartSpark.getSparkSession();
SQLContext sc = spark.sqlContext();

Dataset<Row> set = sc
        .read()
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver", "com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
set.take(500);

This takes 5 to 10 minutes. I am running Spark inside the JVM and using the same configuration in both cases.

I could use partitionColumn, numPartitions, etc., but I don't have any numeric column, and another issue is that the schema of the table is unknown to me.

My issue is not how to decrease the required time, as I know that in the ideal case Spark will run on a cluster. What I cannot understand is why there is such a big time difference between the above two cases.

Baronetage answered 9/3, 2017 at 13:18 Comment(0)

This problem has been covered multiple times on StackOverflow and in external sources, so just to reiterate: by default, DataFrameReader.jdbc doesn't distribute data or reads. It uses a single thread on a single executor.
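
A quick way to see this in practice (a minimal sketch, assuming the spark session, url and tableName variables from the question) is to check how many partitions the resulting Dataset has:

    // Plain JDBC read, no partitioning options.
    Dataset<Row> jdbcSet = spark.read()
        .format("jdbc")
        .option("url", url)
        .option("dbtable", tableName)
        .option("driver", "com.mysql.jdbc.Driver")
        .load();

    // Prints 1: the whole table is pulled through a single connection
    // by a single task.
    System.out.println(jdbcSet.rdd().getNumPartitions());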

To distribute reads:

  • use ranges with lowerBound / upperBound:

    Dataset<Row> set = sc
        .read()
        .option("partitionColumn", "foo")
        .option("numPartitions", "3")
        .option("lowerBound", 0)
        .option("upperBound", 30)
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver", "com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
    
  • use predicates (see the sketch below):

    Properties properties = new Properties();
    // add user / password here if they are not already part of the URL

    Dataset<Row> set = sc
        .read()
        .jdbc(
            url, this.tableName,
            new String[]{"foo < 10", "foo BETWEEN 10 AND 20", "foo > 20"},
            properties
        );
    
Dibasic answered 12/3, 2017 at 13:59 Comment(2)
I'm trying to read MySQL table having 500M rows with numPartitions=32. Still the reading using Spark is much slower than sqoop (also with 32 tasks). I even tried setting fetchsize to higher value (1k or 10k) but no gain. I'm using standard Connector/J v5.1.41 and I'm on MySQL v5.6.Brotherton
is there a way to improve parallelism in spark 2.4 for jdbc reader?Sememe

It's understandable that reading a big file is faster than reading multiple rows from a database.

You may improve the database read performance by adding the following option:

.option("fetchSize", 10000)

The optimal fetch size must be determined based on your use case (for me, 10000 gave a 5x improvement).
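
For context, a minimal sketch of where the option goes, assuming the JDBC read from the question (10000 is only a starting point to tune):

Dataset<Row> set = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("dbtable", tableName)
    .option("driver", "com.mysql.jdbc.Driver")
    // number of rows the driver is asked to fetch per round trip
    .option("fetchSize", 10000)
    .load();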

An alternative is to use partitioning (e.g. see https://mcmap.net/q/260514/-how-to-improve-performance-for-slow-spark-jobs-using-dataframe-and-jdbc-connection).

Amritsar answered 23/5 at 9:26 Comment(0)

Please follow the steps below.

1. Download a copy of the JDBC connector for MySQL. I believe you already have one.

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2. Create a db-properties.flat file in the format below:

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>

3. Create an empty table first where you want to load the data.

Invoke spark-shell with the MySQL driver JAR on the class path:

spark-shell --driver-class-path  <your path to mysql jar>

Then import all the required packages:

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

Initialize a HiveContext or an SQLContext:

val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql

Set some of the Hive properties:

sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

Load the MySQL DB properties from the file:

val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db-properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")

Create a query to read the data from your table and pass it to the read method of the SQLContext. This is where you can manage your WHERE clause:

val df1 = "(SELECT  * FROM your_table_name) as s1" 

Pass the JDBC URL, the select query, and the DB properties to the read method:

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

Write it to your target table:

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")
Hornbill answered 15/3, 2017 at 16:49 Comment(2)
What have you done differently that you think would solve the asked question, i.e. slow reading of MySQL table into DataFrame? Can you point out a mistake / inefficiency in the code snippets provided in the question that is improved in some way by the snippets provided by you?Brotherton
The partitioning may improve the database read performance, see https://mcmap.net/q/260514/-how-to-improve-performance-for-slow-spark-jobs-using-dataframe-and-jdbc-connection (I didn't test)Amritsar
