Apache Spark Joins example with Java
Asked Answered
R

3

15

I am very new to Apache Spark. I would actually like to focus on basic Spark API specification and want to understand and write some programs using Spark API. I have written a java program using Apache Spark to implement Joins concept.

When I use Left Outer Join -- leftOuterJoin() or Right Outer Join -- rightOuterJoin(), both two methods are returning a JavaPairRDD which contains a special type Google Options. But I do not know how to extract the original values from Optional type.

Anyways I would like to know can I use same join methods which return the data in my own format. I did not find any way to do that. Meaning is when I am using Apache Spark, I am not able to customize the code in my own style since they already have given all pre-defined things.

Please find the code below

my 2 sample input datasets

customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter

and

trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit

Here is my Java code

**SparkJoins.java:**

public class SparkJoins {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws FileNotFoundException {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
        JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
        JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] customerSplit = s.split(",");
                return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
            }
        }).distinct();

        JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
        JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] transactionSplit = s.split(",");
                return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
            }
        });

        //Default Join operation (Inner join)
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
        System.out.println("Joins function Output: "+joinsOutput.collect());

        //Left Outer join operation
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());

        //Right Outer join operation
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());

        sc.close();
    }
}

And here the output which I am getting

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]

I am running this program on Windows platform

Please observe the above output and help me in extracting the values from Optional type

Thanks in advance

Roveover answered 5/2, 2015 at 7:47 Comment(3)
Why not use Scala instead?Doralynne
Hi @maasg, i am basically a java developer.. I really don't know Scala.. But I think Apache Spark is most suitable for Scala programming then Java.Roveover
@ShekarPatel can you please update your code with how did you removed that Optional .. that will be helpful for others.Ortiz
Z
12

When you do left outer join and right outer join, you might have null values. right!

So spark returns Optional object. after getting that result, you can map that result to your own format.

your can use isPresent() method of Optional to map your data.

Here is the example :

 JavaPairRDD<String,String> firstRDD = ....
 JavaPairRDD<String,String> secondRDD =....
 // join both rdd using left outerjoin
 JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD);


// mapping of join result
JavaPairRDD<String, String> mappedRDD = rddWithJoin
            .mapToPair(tuple -> {
                if (tuple._2()._2().isPresent()) {
                    //do your operation and return
                    return new Tuple2<String, String>(tuple._1(), tuple._2()._1());
                } else {
                    return new Tuple2<String, String>(tuple._1(), "not present");
                }
            });
Zak answered 5/2, 2015 at 16:29 Comment(2)
@Zak how to map that result to our own format? i am also facing the same problem.Ortiz
@Ortiz : I added the example in the above answer. mappedRDD is your own format.Zak
B
0

In Java, we can also implement JOINs using DataFrames as follows:

1) create spark session as:

SparkSession spark = SparkSession.builder().appName("JoinsInSpark").master("local").getOrCreate();

2) I've taken the Employee input as:

101,Alan,Franklyn Street,Melbourne,QLD

104,Stuart,Lonsdale Street,Sydney,NSW

create DataFrame as:

Dataset<Employee> e_data = spark
                        .read()
                        .textFile("C:/XX/XX/test.txt")
                        .map(line -> {
                            Employee e = new Employee();
                            String[] parts = line.split(",");
                            e.setE_id(Integer.valueOf(parts[0].trim()));
                            e.setE_name(parts[1].trim());
                            e.setAddress(parts[2].trim());
                            e.setCity(parts[3].trim());
                            e.setState(parts[4].trim());
                            return e;
                        }, Encoders.bean(Employee.class));

where Employee is the POJO class containing setter, getter along with constructor.

3) similarly create another DF for second table (say salary)

4) Apply INNER join on distinct elements of both views:

Dataset<Row> d1 = e_data.distinct().join(s_data.distinct(), "e_id").orderBy("salary");

d1.show();

5) similary, left outer join as:

spark.sql("select * from global_temp.employee e LEFT OUTER JOIN global_temp.salary s on e.e_id = s.e_id").show();
Budweis answered 6/9, 2018 at 16:24 Comment(0)
O
0
 Lis<Person> personList = List.of(Person.builder()
                .name("Tohid")
                .id(1)
                .build());
                
  Lis<Employee> employeeList = List.of(Employee.builder()
                .surname("Tohid")
                .id(1)
                .build());                  
  Dataset<Person> firstDataset = spark.createDataset(personList, Encoders.bean(Person.class));
  Dataset<Employee> secondDataset = spark.createDataset(employeeList, Encoders.bean(Employee.class));

  Dataset<Row> backlog = firstDataset.join(secondDataset, "id");
Outroar answered 5/9, 2023 at 14:29 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.