How to improve performance for slow Spark jobs using DataFrame and JDBC connection?

I am trying to access a mid-size Teradata table (~100 million rows) via JDBC in standalone mode on a single node (local[*]).

I am using Spark 1.4.1, set up on a very powerful machine (2 CPUs, 24 cores, 126 GB RAM).

I have tried several memory and tuning options to make it work faster, but none of them made a big impact.

I am sure there is something I am missing. Below is my final attempt, which took about 11 minutes to get this simple count, versus only 40 seconds using a JDBC connection through R to get the same count.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

When I tried with a big table (5B records), no results were returned upon completion of the query.

Prescribe answered 24/8, 2015 at 17:36 Comment(6)
How do you count using R?Neuman
@Neuman - simply using the RJDBC and teradataR packages after setting up a connection using the Teradata JARs, and then tdQuery("SELECT COUNT(*) FROM your_table")Prescribe
As far as I know the Spark JDBC data source can push down predicates, but the actual execution is done in Spark. It means you have to transfer your data to the Spark cluster, so it is not the same as executing a SQL query over JDBC (the R case). The first thing you should do is cache your data after loading. It won't improve performance for the first query though.Neuman
@Neuman - thanks, I realized that after doing some more research on this. I do have a quick question though - what would be the fastest way to read data in Apache Spark? Is it through the Parquet file format?Prescribe
It is probably a good choice, but the first thing you can try before you go this way is to use the Teradata Hadoop connector. It looks like it supports multiple export options, including Hive tables. With a single machine, network and disk IO can still be a limiting factor though.Neuman
Suggest accepting Gianmario's answer.Moltke

All of the aggregation operations are performed after the whole dataset has been retrieved into memory as a DataFrame, so doing the count in Spark will never be as efficient as doing it directly in Teradata. Sometimes it's worth pushing some computation into the database by creating views and then mapping those views using the JDBC API.
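
For instance, a minimal Scala sketch of that idea (the URL and table name are placeholders; it assumes a SQLContext handle named sqlctx like the snippets below, and that Teradata accepts the derived-table alias syntax): instead of mapping the whole table, you map a subquery (or a database view) as the JDBC "table", so the aggregation runs inside the database and only one row crosses the wire.

import java.util.Properties

// Map a derived table (or a database view) instead of the full table:
// the COUNT(*) is executed by Teradata and Spark only receives one row.
val counted = sqlctx.read.jdbc(
  "jdbc:teradata://<HOST>/<OPTIONS>",          // placeholder JDBC URL
  "(SELECT COUNT(*) AS cnt FROM <TABLE>) t",   // subquery mapped as the JDBC "table"
  new Properties()
)
counted.show()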

Every time you use the JDBC driver to access a large table you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and overload the single JDBC connection.

Instead, you want to use the following API (available since Spark 1.4.0):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", // numeric column used to split the table
  lowerBound = minValue,  // min/max of that column; used only to size the partition strides,
  upperBound = maxValue,  // they do NOT filter out any rows
  numPartitions = 20,     // 20 concurrent JDBC queries, one per partition
  connectionProperties = new java.util.Properties()
)

There is also an option to push down some filtering.
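
For instance, a hedged sketch (assuming the partitioned read above is assigned to a DataFrame named df, and that field5 is a numeric column; both names are assumptions, not part of the original answer):

// A simple comparison filter is pushed down to the database as a WHERE clause,
// so only the matching rows are transferred over the JDBC connections.
val filtered = df.filter("field5 > 100")
filtered.count()

// Inspect the physical plan to confirm which filters were actually pushed down.
filtered.explain()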

If you don't have a uniformly distributed integral column, you can create custom partitions by specifying custom predicates (WHERE clauses). For example, let's suppose you have a timestamp column and want to partition by date ranges:

val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map {
    case (start, end) =>
      s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }

predicates.foreach(println)

// Below is the result of how the predicates were formed:
// cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
// cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

It will generate a DataFrame where each partition contains the records returned by the subquery associated with its predicate.
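
To verify the layout, you can check the partition count of the returned DataFrame (a small sketch; it assumes the call above is assigned to a variable named partitioned, which is not in the original answer):

// One partition per predicate, so 4 partitions for the 4 date ranges above.
println(partitioned.rdd.partitions.length)  // expected: 4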

Check the source code at DataFrameReader.scala

Heavierthanair answered 30/9, 2015 at 14:49 Comment(2)
@zero323, @Gianmario Spacagna if I actually need to read the entire MySQL table (and not just get count), then how can I improve the sluggish performance of Spark-SQL? I'm already parallelizing the read operation using spark.read.jdbc(..numPartitions..) method.Calderon
My MySQL (InnoDB) table has ~ 186M records weighing around 149 GB (as per stats shown by phpMyAdmin) and I'm using numPartitions = 32. [Spark 2.2.0] I'm on EMR 5.12.0 with 1 master, 1 task and 1 core (all r3.xlarge, 8 vCore, 30.5 GiB memory, 80 SSD GB storage). I've found that reading MySQL table into DataFrame fails if I DON'T limit the records to ~ 1.5-2M. It gives a long stack-trace that has javax.servlet.ServletException: java.util.NoSuchElementException: None.get & java.sql.SQLException: Incorrect key file for table..Calderon

Does the unserialized table fit into 40 GB? If it starts swapping to disk, performance will decrease dramatically.

Anyway, when you use standard JDBC with ANSI SQL syntax you leverage the DB engine, so if Teradata (I don't know Teradata) holds statistics about your table, a classic "select count(*) from table" will be very fast. Spark, instead, loads your 100 million rows into memory with something like "select * from table" and then performs the count on the RDD rows. It's a pretty different workload.

Heavily answered 24/8, 2015 at 18:24 Comment(7)
I think it would, and I also tried increasing memory to 100 GB, but didn't see any improvement. I am not trying to load 100 million rows into memory, but running some aggregate operation such as count() on the dataframe or count(*) on a temp table, yet Spark takes too long. I also tried registering a DF as a temp table and doing a simple count, but it takes about the same time. ra1.registerTempTable("ra_dt"); total = sqlContext.sql("select count(*) from ra_dt")Prescribe
Yes, but I think that Spark is not pushing the count operation down to the DB engine, so it will load all rows in memory and then perform the count on the DF.Heavily
How many columns do you have in that table? With 100 million rows it is pretty easy to reach 100 GB of unserialized objects. Could you post your table schema?Heavily
I think you're right. I was reading a few other posts online and found that Spark is trying to load the data before applying the count operation. In that case, what would be the ideal way to read this type of data faster in Spark? In other words, what would be the fastest way to read data in Apache Spark? Here is my table schema: root |-- field1: decimal(18,0) (nullable = true) |-- field2: string (nullable = true) |-- field3: date (nullable = true) |-- field4: date (nullable = true) |-- field5: integer (nullable = true) |-- field6: string (nullable = true)Prescribe
Spark is a distributed processing engine, so the best way to load data into Spark is from a distributed file system or DBMS. In your case, working on a single instance, I think you can only improve performance by specifying partitionColumn, lowerBound, upperBound, numPartitions to improve reading parallelism. If you need to perform other queries after the count you can cache the DF before counting it, so the first count will take its time but subsequent queries will run in memory and be faster (see the sketch after this comment thread).Heavily
Makes sense! Thanks for the answer!Prescribe
How many executors are you running, and how many --executor-cores?Conceivable
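
A minimal Scala sketch of the "cache before repeated queries" advice above (reusing the sqlctx handle from the accepted answer; URL, table and column names are placeholders):

import java.util.Properties

val df = sqlctx.read.jdbc("<URL>", "<TABLE>", new Properties())
df.cache()                         // mark the DataFrame for in-memory caching

df.count()                         // first action: reads over JDBC and materializes the cache
df.filter("field5 > 100").count()  // later queries are served from memory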

One solution that differs from the others is to save the data from the Oracle table into Avro files (partitioned into many files) stored on Hadoop. This way, reading those Avro files with Spark would be a piece of cake since you won't call the DB anymore.
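
For example, a hedged sketch of the reading side (it assumes Spark 2.4+, where the avro data source is built in; older versions need the external spark-avro package, and the HDFS path is a placeholder):

// Once the export has landed on HDFS as Avro files, Spark reads them
// in parallel without opening any database connection.
val exported = spark.read
  .format("avro")                        // "com.databricks.spark.avro" before Spark 2.4
  .load("hdfs:///data/exported_table/")  // placeholder path to the multi-file export
exported.count()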

Melinite answered 19/8, 2019 at 20:50 Comment(0)

Configuring an appropriate JDBC fetchSize may improve SQL read performance. It would look something like this:

// fetchSize is not a direct argument of read.jdbc; it is passed as a
// JDBC connection property (the option key is "fetchsize").
val props = new java.util.Properties()
props.setProperty("fetchsize", "100000")

sqlctx.read.jdbc("<URL>", "<TABLE>", props)

I loaded an Oracle table with 700,000 rows and 300 columns in 97 seconds with fetchsize=100000, versus 500 seconds without the option: that's a 5x improvement. The optimal value must be determined according to your use case.
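
The same setting can also be supplied through the DataFrameReader options API, using Spark 2.x's spark session (a sketch with placeholder URL and table; "fetchsize" is the documented JDBC option key):

// Larger fetch sizes reduce the number of round trips to the database,
// at the cost of more memory per fetch on the reader side.
val df = spark.read
  .format("jdbc")
  .option("url", "<URL>")
  .option("dbtable", "<TABLE>")
  .option("fetchsize", "100000")
  .load()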

Nalchik answered 23/5 at 9:29 Comment(0)
