Split dataset based on column values in spark

I am trying to split a Dataset into separate Datasets based on the contents of the Manufacturer column, but it is very slow.
Please suggest a way to improve the code so that it executes faster and uses less Java code.

List<Row> lsts = unq_manf.collectAsList();

for (Row lst : lsts) {
     String man = lst.toString();
     man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
     Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
     DF.show();
}

The full code, the input Dataset, and the output Datasets are shown below.

package org.sparkexample;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import java.util.List;

public class GroupBy {

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C:\\winutils");
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
        sc.setLogLevel("ERROR");
                        
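        // Read the CSV, treating the first line as the header.
        Dataset<Row> src = sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("sample.csv");

        // Collect the distinct manufacturers to the driver.
        Dataset<Row> unq_manf = src.select("Manufacturer").distinct();
        List<Row> lsts = unq_manf.collectAsList();

        // Filter the source once per manufacturer and print each subset.
        for (Row lst : lsts) {
             // Row.toString() yields "[value]"; strip the enclosing brackets.
             String man = lst.toString();
             man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
             Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
             DF.show();
        }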
    }

}

Input Table

+------+------------+--------------------+---+
|ItemID|Manufacturer|       Category name|UPC|
+------+------------+--------------------+---+
|   804|         ael|Brush & Broom Han...|123|
|   805|         ael|Wheel Brush Parts...|124|
|   813|         ael|      Drivers Gloves|125|
|   632|        west|       Pipe Wrenches|126|
|   804|         bil|     Masonry Brushes|127|
|   497|        west|   Power Tools Other|128|
|   496|        west|   Power Tools Other|129|
|   495|         bil|           Hole Saws|130|
|   499|         bil|    Battery Chargers|131|
|   497|        west|   Power Tools Other|132|
+------+------------+--------------------+---+

Output

+------------+
|Manufacturer|
+------------+
|         ael|
|        west|
|         bil|
+------------+

+------+------------+--------------------+---+
|ItemID|Manufacturer|       Category name|UPC|
+------+------------+--------------------+---+
|   804|         ael|Brush & Broom Han...|123|
|   805|         ael|Wheel Brush Parts...|124|
|   813|         ael|      Drivers Gloves|125|
+------+------------+--------------------+---+

+------+------------+-----------------+---+
|ItemID|Manufacturer|    Category name|UPC|
+------+------------+-----------------+---+
|   632|        west|    Pipe Wrenches|126|
|   497|        west|Power Tools Other|128|
|   496|        west|Power Tools Other|129|
|   497|        west|Power Tools Other|132|
+------+------------+-----------------+---+

+------+------------+----------------+---+
|ItemID|Manufacturer|   Category name|UPC|
+------+------------+----------------+---+
|   804|         bil| Masonry Brushes|127|
|   495|         bil|       Hole Saws|130|
|   499|         bil|Battery Chargers|131|
+------+------------+----------------+---+
Maloney answered 7/3, 2017 at 10:30 Comment(0)

You have two choices in this case:

  1. Collect the unique manufacturer values, then map over the resulting array to build one DataFrame per value:

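    import spark.implicits._  // needed for toDF and $"..." outside spark-shell

    val df = Seq(("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)).toDF("k", "v")
    // distinct keys, collected to the driver
    val brands = df.select("k").distinct.collect.flatMap(_.toSeq)
    // one filtered DataFrame per key
    val BrandArray = brands.map(brand => df.where($"k" <=> brand))
    BrandArray.foreach { x =>
      x.show()
      println("---------------------------------------")
    }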
    
  2. Alternatively, save the DataFrame partitioned by the manufacturer column (k in the example above):

    df.write.partitionBy("k").saveAsTable("parquet")
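
For readers unfamiliar with partitioned writes: Spark lays out a partitioned table as one directory per distinct value, named column=value. The sketch below is plain Python, not Spark, and only imitates that layout for the (k, v) sample rows; the helper write_partitioned and the file name data.csv are made up for illustration.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path

# Sample rows mirroring the (k, v) DataFrame in this answer.
rows = [("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)]


def write_partitioned(rows, base_dir):
    """Group rows by key in one pass and write one 'k=<value>' directory per key.

    Hypothetical helper: it imitates the directory layout of a partitioned
    write, it does not call Spark.
    """
    groups = defaultdict(list)
    for k, v in rows:
        groups[k].append(v)
    for k, values in groups.items():
        part_dir = Path(base_dir) / f"k={k}"
        part_dir.mkdir(parents=True, exist_ok=True)
        # The partition column lives in the directory name, not in the file.
        with open(part_dir / "data.csv", "w", newline="") as fh:
            csv.writer(fh).writerows([v] for v in values)
    # Return the partition directory names in sorted order.
    return sorted(p.name for p in Path(base_dir).iterdir())


base_dir = tempfile.mkdtemp()
partitions = write_partitioned(rows, base_dir)
print(partitions)
```

A later query that filters on the partition column only needs to read the matching directory, which is why this option can replace the per-manufacturer loop.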

Osher answered 18/4, 2017 at 9:46 Comment(1)
@Osher How do you reach "df" from inside the map function on the third line? I don't think that is possible in Java. - Clinometer

If you need to query by manufacturer frequently, it may be better to write the DataFrame using manufacturer as the partition key instead of splitting the Dataset/DataFrame by manufacturer.

In case you still want separate DataFrames based on the values of one column, one approach using PySpark and Spark 2.0+ is:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("sample.csv", header=True)

# collect the distinct manufacturers to the driver
manufacturers = df.select('Manufacturer').distinct().collect()

# loop through the manufacturers, filter df by each one, and write it separately
for m in manufacturers:
    df1 = df.where(F.col('Manufacturer') == m[0])
    # "<write_path>" is a placeholder for the output location;
    # optionally call .repartition(repartition_col) before .write and pass a write mode
    df1.write.parquet("<write_path>")
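
A note on cost, as I understand it: each filter-and-write in a loop like this is its own job, so unless df is cached the source is read once per manufacturer, plus once more for the distinct. The plain-Python sketch below (no Spark; both helper names are made up) only counts passes over the question's sample rows to contrast that with a single-pass grouping, which is roughly what a partitioned write does.

```python
from collections import defaultdict

# (ItemID, Manufacturer) pairs taken from the question's sample input.
rows = [
    (804, "ael"), (805, "ael"), (813, "ael"), (632, "west"), (804, "bil"),
    (497, "west"), (496, "west"), (495, "bil"), (499, "bil"), (497, "west"),
]


def split_by_filter(rows):
    """One scan to find the distinct keys, then one full scan per key."""
    keys = {m for _, m in rows}
    scans = 1
    result = {}
    for key in keys:
        result[key] = [r for r in rows if r[1] == key]
        scans += 1
    return result, scans


def split_single_pass(rows):
    """Route every row to its group in a single scan."""
    groups = defaultdict(list)
    for r in rows:
        groups[r[1]].append(r)
    return dict(groups), 1


by_filter, filter_scans = split_by_filter(rows)
by_group, group_scans = split_single_pass(rows)
print(filter_scans, group_scans)
```

Both functions return the same groups; the difference is only in how many times the data is traversed, which grows with the number of distinct manufacturers in the first case.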

Cultivation answered 23/1, 2020 at 6:50 Comment(0)

Set spark.scheduler.mode to FAIR using the --conf option:

spark-shell --conf spark.scheduler.mode=FAIR

OR

spark-submit --conf spark.scheduler.mode=FAIR

Given Input

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = Seq(
    (804,"ael","Brush & Broom Han","123"),
    (805,"ael","Wheel Brush Parts","124"),
    (813,"ael","Drivers Gloves","125"),
    (632,"west","Pipe Wrenches","126"),
    (804,"bil","Masonry Brushes","127"),
    (497,"west","Power Tools Other","128"),
    (496,"west","Power Tools Other","129"),
    (495,"bil","Hole Saws","130"),
    (499,"bil","Battery Chargers","131"),
    (497,"west","Power Tools Other","132")
)
.toDF("ItemID","Manufacturer"," Category name","UPC")

// Exiting paste mode, now interpreting.

df: org.apache.spark.sql.DataFrame = [ItemID: int, Manufacturer: string ... 2 more fields]

Use a parallel collection to run multiple jobs in Spark

scala> import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.ForkJoinTaskSupport
scala> import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinPool

Collecting unique values of manufacturers

scala> val manufacturers = df.select("Manufacturer").as[String].collect.distinct.toSeq
manufacturers: Seq[String] = WrappedArray(ael, west, bil)

Number of threads to process

scala> val numThreads = manufacturers.size
numThreads: Int = 3

Parallel Collection

scala> val manufacturersPar = manufacturers.par
manufacturersPar: scala.collection.parallel.ParSeq[String] = ParArray(ael, west, bil)

Creating task support & passing number of threads to run.

scala> val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
taskSupport: scala.collection.parallel.ForkJoinTaskSupport = scala.collection.parallel.ForkJoinTaskSupport@10706411

Assigning task support to parallel collection.

scala> manufacturersPar.tasksupport = taskSupport
manufacturersPar.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@10706411

Finally, run the code. It calls count before show because, in this example, show on its own does not trigger any jobs.

scala> :paste
// Entering paste mode (ctrl-D to finish)

manufacturersPar
.foreach{ f =>
    spark.sparkContext.setLocalProperty("callSite.short", s"Manufacturer=${f}")

    val fdf = df.filter($"Manufacturer" === f)

    println(fdf.count())
    fdf.show(false)
}

// Exiting paste mode, now interpreting.

3
4
3
+------+------------+-----------------+---+
|ItemID|Manufacturer| Category name   |UPC|
+------+------------+-----------------+---+
|804   |ael         |Brush & Broom Han|123|
|805   |ael         |Wheel Brush Parts|124|
|813   |ael         |Drivers Gloves   |125|
+------+------------+-----------------+---+

+------+------------+----------------+---+
|ItemID|Manufacturer| Category name  |UPC|
+------+------------+----------------+---+
|804   |bil         |Masonry Brushes |127|
|495   |bil         |Hole Saws       |130|
|499   |bil         |Battery Chargers|131|
+------+------------+----------------+---+

+------+------------+-----------------+---+
|ItemID|Manufacturer| Category name   |UPC|
+------+------------+-----------------+---+
|632   |west        |Pipe Wrenches    |126|
|497   |west        |Power Tools Other|128|
|496   |west        |Power Tools Other|129|
|497   |west        |Power Tools Other|132|
+------+------------+-----------------+---+


scala>

You can check in the Spark UI that multiple jobs are started at the same time.
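
The same pattern, submitting one piece of work per manufacturer from a pool of threads, can be sketched in plain Python with the standard library. This is not Spark code: count_for is a made-up stand-in for a per-manufacturer action such as a filter followed by count, and the rows are the (ItemID, Manufacturer) pairs from the input above.

```python
from concurrent.futures import ThreadPoolExecutor

# (ItemID, Manufacturer) pairs from the sample input.
rows = [
    (804, "ael"), (805, "ael"), (813, "ael"), (632, "west"), (804, "bil"),
    (497, "west"), (496, "west"), (495, "bil"), (499, "bil"), (497, "west"),
]


def count_for(manufacturer):
    # Stand-in for a per-manufacturer action (hypothetical helper).
    return manufacturer, sum(1 for _, m in rows if m == manufacturer)


# Distinct manufacturers, keeping first-seen order.
manufacturers = list(dict.fromkeys(m for _, m in rows))

# One worker thread per manufacturer, as with numThreads above.
with ThreadPoolExecutor(max_workers=len(manufacturers)) as pool:
    counts = dict(pool.map(count_for, manufacturers))

print(counts)
```

With many distinct values you would probably cap the pool size rather than use one thread per value.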


Lithesome answered 2/3 at 15:23 Comment(0)
