Running tasks in parallel on separate Hive partitions using Scala and Spark, to speed up loading from Hive and writing results to Hive or Parquet

This question is a spin-off from an earlier one: "saving a list of rows to a Hive table in pyspark".

EDIT: please see my update at the bottom of this post.

I have used both Scala and now PySpark to do the same task, but I am having problems with VERY slow saves of a dataframe to Parquet or CSV, or with converting a dataframe to a list- or array-type data structure. Below is the relevant Python/PySpark code and info:

# Table is a list of Rows from a small Hive table that I loaded using:
# query = "SELECT * FROM Table"
# Table = sqlContext.sql(query).collect()

from pyspark.sql.functions import lit

for i in range(len(Table)):
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

    # hivetemp is a table that I copied from Hive to my HDFS using:
    # CREATE EXTERNAL TABLE IF NOT EXISTS hivetemp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
    # INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1 <> \"" + val1 + "\" AND col2 == \"" + val2 + "\" ORDER BY RAND() LIMIT " + str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))  # some_string is defined elsewhere

    # writing to Parquet is heck slow, AND I can't work with pandas since the library is not installed on the server
    rows.saveAsParquetFile("rows" + str(i) + ".parquet")

    # tried this before and it was heck slow also
    # rows_list = rows.collect()
    # shuffle(rows_list)

I have tried to do the above in Scala and ran into similar problems. I can easily load the Hive table, or a query of the Hive table, but doing a random shuffle or storing a large dataframe runs into memory issues. There were also some challenges with adding the 2 extra columns.
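
For reference, here is a minimal Scala sketch of one loop iteration done entirely with the DataFrame API, so nothing has to be collected or shuffled on the driver. It assumes a HiveContext named sqlContext and the same table/column names as in the Python code; the loop values (val1, val2, x) and someString are placeholders:

import org.apache.spark.sql.functions.lit

// placeholder values standing in for one iteration of the driver-side loop above
val (val1, val2, x) = ("someVal1", "someVal2", 50)
val someString = "example"   // stands in for some_string in the Python code

val sampled = sqlContext.sql(
  s"SELECT * FROM hivetemp WHERE col1 <> '$val1' AND col2 = '$val2' ORDER BY RAND() LIMIT $x")

val withExtras = sampled
  .withColumn("col4", lit(10))          // first extra column
  .withColumn("col5", lit(someString))  // second extra column

// withExtras stays a distributed DataFrame; no collect() or driver-side shuffle is needed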

The Hive table (hiveTemp) that I want to add rows to has 5,570,000 (~5.5 million) rows and 120 columns.

The Hive table that I am iterating through in the for loop has 5000 rows and 3 columns. There are 25 unique values of val1 (a column in hiveTemp), and about 3000 combinations of val1 and val2. Val2 could be one of 5 columns and its specific cell value. This means that if I tweaked the code, I could reduce the lookups of rows to add down to about 26 from 5000, but the number of rows I would have to retrieve, store and randomly shuffle would be pretty large and hence a memory issue (unless anyone has suggestions on this).
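
Here is a hedged sketch of that reduction, assuming the small driver table is registered in Hive as drivertable with columns (val1, val2, count) and that count is stored as an int; all of these names are placeholders. The cached slices live on the executors, not the driver, which sidesteps the driver-memory issue:

// Group the 5000 driver rows by val1 on the driver (the table is small), so that
// hivetemp is filtered and cached only once per distinct val1 (~26 times) rather
// than queried once per driver row.
val driverRows = sqlContext.table("drivertable").collect()
val byVal1 = driverRows.groupBy(_.getString(0))

byVal1.foreach { case (v1, group) =>
  // this cached slice is distributed across the executors, not held on the driver
  val slice = sqlContext.sql(s"SELECT * FROM hivetemp WHERE col1 <> '$v1'").cache()
  group.foreach { r =>
    val v2 = r.getString(1)
    val x  = 100 - r.getInt(2)   // assuming count is an int column
    // sample x rows from `slice` where col2 = v2, add col4/col5, and write them out
  }
  slice.unpersist()
}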

The total number of rows I need to add to the table is about 100,000.

The ultimate goal is to have the original table of 5.5 million rows, appended with the 100k+ rows, written as a Hive or Parquet table. If it is easier, I am fine with writing the 100k rows to their own table that can be merged into the 5.5 million row table later.
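
For the write side, a hedged sketch of those options (this assumes the Spark 1.4+ DataFrameWriter API on a HiveContext; generated_rows and the output path are placeholder names, and newRows stands for the DataFrame of generated rows with the same schema as the 5.5 million row table):

import org.apache.spark.sql.DataFrame

def writeResults(newRows: DataFrame): Unit = {
  // Option 1: write the generated rows to their own Hive table, to be merged later
  newRows.write.mode("overwrite").saveAsTable("generated_rows")
  // Option 2: append them directly to the existing 5.5M-row table
  // newRows.write.insertInto("hiveTemp")
  // Option 3: write plain Parquet files instead of a Hive table
  // newRows.write.parquet("/user/name/generated_rows")
}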

Scala or Python is fine, though Scala is preferred.

Any advice on this, and on which options would be best, would be great.

Thanks a lot!

EDIT: Some additional thoughts I had on this problem: I used the hash partitioner to partition the Hive table into 26 partitions, based on a column that has 26 distinct values. The operations I want to perform in the for loop could be generalized so that they only need to happen on each of these partitions. That being said, how could I write the Scala code to do this (or what guide can I look at online), so that a separate executor runs each of these loops on its own partition? I am thinking this would make things much faster.
I know how to do something like this using multithreading, but I am not sure how to in the Scala/Spark paradigm.
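
Here is a hedged sketch of one common pattern for this, using Spark's (thread-safe) job scheduler rather than an explicit hash partitioner: collect the 26 distinct values of the partitioning column once, then submit one independent job per value from concurrent driver threads, ideally with spark.scheduler.mode=FAIR so the jobs share the executors. All names here (partCol, processSlice, the output path) are placeholders, not a known API:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.sql.DataFrame

val partCol = "col1"   // the column with 26 distinct values

val distinctVals: Array[String] =
  sqlContext.table("hivetemp").select(partCol).distinct().collect().map(_.getString(0))

// the body of the original for-loop, restricted to one slice of the table
def processSlice(v: String): Unit = {
  val slice: DataFrame = sqlContext.table("hivetemp").filter(s"$partCol = '$v'")
  // ... sample, add col4/col5, etc. ...
  slice.write.mode("overwrite").parquet(s"/user/name/slices/val1=$v")   // placeholder output path
}

// submit the 26 jobs concurrently; note the default ExecutionContext caps concurrency
// at the number of driver cores, so a dedicated thread pool may allow more overlap
val jobs = distinctVals.toSeq.map(v => Future(processSlice(v)))
Await.result(Future.sequence(jobs), Duration.Inf)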

Comment: Were you able to solve this? I am trying to solve a similar thing, so any recommendations would be helpful. – Strath
