Apache Spark-SQL vs Sqoop benchmarking while transferring data from RDBMS to hdfs
Asked Answered
M

7

14

I am working on a use case where I have to transfer data from RDBMS to HDFS. We have done the benchmarking of this case using sqoop and found out that we are able to transfer around 20GB data in 6-7 Mins.

Where as when I try the same with Spark SQL, the performance is very low(1 Gb of records is taking 4 min to transfer from netezza to hdfs). I am trying to do some tuning and increase its performance but its unlikely to tune it to the level of sqoop(around 3 Gb of data in 1 Min).

I agree to the fact that spark is primarily a processing engine but my main question is that both spark and sqoop are using JDBC driver internally so why there is so much difference in the performance(or may be I am missing something). I am posting my code here.

object helloWorld {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local")
    val sc= new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
    val df2 =sqlContext.sql("select * from POC")
    val partitioner= new org.apache.spark.HashPartitioner(14)
    val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values
    rdd.saveAsTextFile("hdfs://Hostname/test")
  }
}

I have checked many other post but could not get a clear answer for the internal working and tuning of sqoop nor I got sqoop vs spark sql benchmarking .Kindly help in understanding this issue.

Marcellusmarcelo answered 10/5, 2016 at 8:41 Comment(0)
S
10

You are using the wrong tools for the job.

Sqoop will launch a slew of processes (on the datanodes) that will each make a connections to your database (see num-mapper) and they will each extract a part of the dataset. I don't think you can achieve kind of read parallelism with Spark.

Get the dataset with Sqoop and then process it with Spark.

Siclari answered 12/1, 2017 at 22:19 Comment(3)
Is this still true? Is sqoop still better than spark for ingestion of RDBMS data.Thessa
I think this is wrong. According to the link below, Spark can perform parallel reading operation. linkThoron
@MohammadRahmati, this is a good blog to read the differences between sqoop and spark jdbc.Cheshire
L
6

you can try the following:-

  1. Read data from netezza without any partitions and with increased fetch_size to a million.

    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC")
    
  2. repartition the data before writing it to final file.

    val df3 = df2.repartition(10) //to reduce the shuffle 
    
  3. ORC formats are more optimized than TEXT. Write the final output to parquet/ORC.

    df3.write.format("ORC").save("hdfs://Hostname/test")
    
Lupine answered 31/10, 2016 at 19:27 Comment(0)
S
3

@amitabh Although marked as an answer, I disagree with it.

Once you give the predicate to partition the data while reading from the jdbc, spark will run separate tasks for each partition. In your case no of tasks should be 14 (u can confirm this using spark UI).

I notice that you are using local as master, which would provide only 1 core for executors. Hence there will be no parallelism. Which is what is happening in your case.

Now to get the same throughput as sqoop you need to make sure that these tasks are running in parallel. Theoretically this can be done either by: 1. Using 14 executors with 1 core each 2. Using 1 executor with 14 cores (other end of the spectrum)

Typically, I would go with 4-5 cores per executor. So I test the performance with 15/5= 3 executors (i added 1 to 14 to consider 1 core for the driver running in clustor mode). Use: executor.cores, executor.instances in sparkConf.set to play with the configs.

If this does not significantly increase performance, the next thing would be to look at the executor memory.

Finally, I would tweak the application logic to look at mapRDD sizes, partition sizes and shuffle sizes.

Shareeshareholder answered 4/9, 2017 at 5:53 Comment(2)
:- Thanks for your comments.. I have given master as "local" in my code becoz I was not able to post my company yarn URL here.. In reality I was running this on yarn cluster. Also the parallelism of 14 is being achieved while writing the data on hdfs and not while reading.. While reading there is only one thread reading from SQL db which is making the overall process very slow. In this case I think Marco Polo answer is correct. This is my view. Please feel free to correct me in case I am missing something. Thanks.Marcellusmarcelo
how many executors are allotted to the job? can you verify using spark UI?Shareeshareholder
H
1

I had the same problem because the piece of code you are using it's not working for partition.

sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")

You can check number of partitions created in you spark job by

df.rdd.partitions.length

you can use following code to connect db:

sqlContext.read.jdbc(url=db_url,
    table=tableName,
    columnName="ID",
    lowerBound=1L,
    upperBound=100000L,
    numPartitions=numPartitions,
    connectionProperties=connectionProperties) 

To optimize your spark job following are the parameters: 1. # of partitions 2. --num-executors 3.--executor-cores 4. --executor-memory 5. --driver-memory 6. fetch-size

2,3,4 and 5 options are depends on you cluster configurations you can monitor your spark job on spark ui.

Haste answered 7/4, 2018 at 20:14 Comment(0)
S
1

Sqoop and Spark SQL both use JDBC connectivity to fetch the data from RDBMS engines but Sqoop has an edge here since it is specifically made to migrate the data between RDBMS and HDFS.

Every single option available in Sqoop has been fine-tuned to get the best performance while doing the data ingestions.

You can start with discussing the option -m which control the number of mappers.

This is what you need to do to fetch data in parallel from RDBMS. Can I do it in Spark SQL? Of course yes but the developer would need to take care of "multithreading" that Sqoop has been taking care automatically.

Spirochete answered 5/6, 2020 at 7:19 Comment(0)
F
0

The below solution helped me

var df=spark.read.format("jdbc").option("url","
"url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load()
df.registerTempTable("tempTable")
var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly

dfRepart.write.format("parquet").save("hdfs_location")
Forkey answered 14/12, 2017 at 13:41 Comment(0)
T
0

Apache Sqoop is retired now - https://attic.apache.org/projects/sqoop.html

Using Apache Spark is a good option. This link shows how Spark can be used instead of Sqoop - https://medium.com/zaloni-engineering/apache-spark-vs-sqoop-engineering-a-better-data-pipeline-ef2bcb32b745

Else one can choose any cloud services like Azure Data Factory or Amazon Redshift etc.

Televise answered 11/11, 2022 at 19:39 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.