Join two JSON files in Google Cloud Platform with Dataflow
I have two different JSON files (employees and departments). I want to join them, keep only the female employees, select only the fields I am interested in, and write the output to another JSON file.

I am trying to implement this on Google Cloud Platform using Dataflow. Can someone please provide sample Java code that produces this result?

Employee JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

Department JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

The expected output JSON file should be like

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}
Cyruscyst answered 24/7, 2017 at 18:24 Comment(0)

You can do this with CoGroupByKey (which involves a shuffle), or with side inputs if your departments collection is significantly smaller than your employees collection.

I will give you code in Python, but you can use the same pipeline in Java.


With side inputs, you will:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

  2. Then take the employees PCollection as the main input, and use each employee's department ID (emp_dept in the sample data) to look up the matching department JSON in that dictionary.

Like so:

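import apache_beam as beam

# LoadDepts and LoadEmps are placeholder transforms that read and parse the JSON files.
departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  # dict.update() returns None, so build and return a merged copy instead.
  # In the sample data the employee's department ID is stored under 'emp_dept'.
  joined = dict(employee)
  joined.update(dept_dict[employee['emp_dept']])
  return joined

joined_dicts = employees | beam.Map(join_emp_dept, dept_dict=deps_si)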
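
The side-input join boils down to one dictionary lookup per employee. Here is a minimal plain-Python sketch of that logic (no Beam needed), run on the sample records from the question. It also adds the female-only filter and the field selection the question asks for. The names parse_lines, join_female_employees and OUTPUT_FIELDS are made up for illustration, and skipping employees whose department is unknown is an assumption on my part.

```python
import json

# Sample records copied from the question (newline-delimited JSON).
EMPLOYEES = '''{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}'''

DEPARTMENTS = '''{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}'''

# Fields taken from the expected output in the question.
OUTPUT_FIELDS = ('emp_id', 'emp_name', 'dept_name', 'emp_salary')


def parse_lines(text):
    """Parse newline-delimited JSON into a list of dicts."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def join_female_employees(employees, departments):
    # Plays the role of the AsDict side input: dept_id -> department record.
    dept_by_id = {dept['dept_id']: dept for dept in departments}
    results = []
    for emp in employees:
        if emp['emp_gender'] != 'female':
            continue
        dept = dept_by_id.get(emp['emp_dept'])
        if dept is None:
            continue  # Assumption: drop employees with no matching department.
        merged = {**emp, **dept}
        results.append({field: merged[field] for field in OUTPUT_FIELDS})
    return results


joined = join_female_employees(parse_lines(EMPLOYEES), parse_lines(DEPARTMENTS))
output_lines = [json.dumps(record, separators=(',', ':')) for record in joined]
for line in output_lines:
    print(line)
```

In a real pipeline the filter, lookup and field selection would live in the functions you pass to Beam transforms; this only shows the per-record logic.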

With CoGroupByKey, you use the department ID as the key to group both collections. This results in a PCollection of key-value pairs where the key is the department ID, and the value holds two iterables: the departments with that ID, and the employees in that department.

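import itertools

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

# In the sample data the employee's department ID is stored under 'emp_dept'.
employees = (p | LoadEmps()
               | 'key_emp' >> beam.Map(lambda emp: (emp['emp_dept'], emp)))

def join_lists(kv):
  # kv is (dept_id, {'employees': [...], 'departments': [...]}).
  _, v = kv
  return itertools.product(v['employees'], v['departments'])

def merge_dicts(pair):
  # dict.update() returns None, so return a merged copy instead.
  emp_dict, dept_dict = pair
  merged = dict(emp_dict)
  merged.update(dept_dict)
  return merged

# filter_fields is not defined here; it should keep only the output fields you want.
joined_dicts = (
    {'employees': employees, 'departments': departments}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(merge_dicts)
    | 'filterfields' >> beam.Map(filter_fields)
)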
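
To make the shape of the grouped data concrete, here is a small plain-Python sketch that imitates what CoGroupByKey hands to the next step when given a dict of keyed collections, then applies the cross product and merge. It is not Beam code: co_group_by_key is a hypothetical stand-in written for this example, filter_fields is one possible version of the helper left undefined in the pipeline, and the records are trimmed versions of the question's sample data.

```python
import itertools
from collections import defaultdict


def co_group_by_key(**keyed_collections):
    """Hypothetical stand-in: returns (key, {'name': [values, ...]}) pairs."""
    grouped = defaultdict(lambda: {name: [] for name in keyed_collections})
    for name, pairs in keyed_collections.items():
        for key, value in pairs:
            grouped[key][name].append(value)
    return list(grouped.items())


def join_lists(kv):
    # Every employee paired with every department sharing the same key.
    _, v = kv
    return itertools.product(v['employees'], v['departments'])


def filter_fields(record, fields=('emp_id', 'emp_name', 'dept_name', 'emp_salary')):
    # One possible filter_fields: keep only the fields from the expected output.
    return {field: record[field] for field in fields}


# Both collections are keyed by department ID, as in the pipeline.
employees = [
    ('OrgDept#1', {'emp_id': 'OrgEmp#1', 'emp_name': 'Adam', 'emp_dept': 'OrgDept#1',
                   'emp_gender': 'female', 'emp_salary': '$100000'}),
    ('OrgDept#3', {'emp_id': 'OrgEmp#1', 'emp_name': 'Scott', 'emp_dept': 'OrgDept#3',
                   'emp_gender': 'male', 'emp_salary': '$105000'}),
]
departments = [
    ('OrgDept#1', {'dept_id': 'OrgDept#1', 'dept_name': 'Account'}),
    ('OrgDept#2', {'dept_id': 'OrgDept#2', 'dept_name': 'IT'}),
    ('OrgDept#3', {'dept_id': 'OrgDept#3', 'dept_name': 'HR'}),
]

grouped = co_group_by_key(employees=employees, departments=departments)

joined = [
    filter_fields({**emp, **dept})
    for kv in grouped
    for emp, dept in join_lists(kv)
    if emp['emp_gender'] == 'female'
]
print(joined)
```

Note that OrgDept#2 has no employees, so its cross product is empty and it drops out, which is why this behaves like an inner join.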
Dambro answered 24/7, 2017 at 21:28 Comment(9)
Note that using the side input is probably the better option since, as Pablo mentioned, the department collection is likely smaller than the employees collection. - Swob
Pablo - Thanks for your reply. Could you please provide a one-line explanation of the steps you mentioned? I am a newbie in this area, so a small explanation will help. - Cyruscyst
I've added an explanation. Also, I moved the side input solution up so you'll consider that one first. - Dambro
If the answer is useful, you can accept it, Koushik, so it appears at the top for everyone. - Dambro
@Dambro thank you so much, been stuck on joining but reading your example I figured out more about how I need to solve it in 5 minutes than I did in 5 hours yesterday. You deserve "solved", upvotes and love. - Woothen
I am having trouble adapting this to my problem. I think I may have got something wrong. Could any of you assist me at #57388249? - Alphonsoalphonsus
What are "join_dicts" and "employee" (I see "employees" but not "employee") in the side input solution? I don't see them defined anywhere... - Alphonsoalphonsus
@Dambro In your 1st solution, what is "employee" in join_emp_dept? - Alphonsoalphonsus
@Dambro And what is "join_dicts" in the "joined_dicts" assignment? - Alphonsoalphonsus

Someone asked for a Java-based solution to this question, so here is the Java code. It's more verbose, but it does essentially the same thing. First, with side inputs:

// First we want to load all departments, and put them into a PCollection
// of key-value pairs, where the Key is their identifier. We assume that it is String-type.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
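     // With a lambda, MapElements needs the output type declared via into(...).
     .apply("getKey", MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Department.class)))
         .via((Department dept) -> KV.of(dept.getId(), dept)));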

// We then convert this PCollection into a map-type PCollectionView.
// We can access this map directly within a ParDo.
PCollectionView<Map<String, Department>> departmentSideInput = 
    departments.apply("ToMapSideInput", View.<String, Department>asMap());

// We load the PCollection of employees
PCollection<Employee> employees = p.apply(new LoadEmployees());

// Let us suppose that we will *extend* an employee information with their
// Department information. I have assumed the existence of an ExtendedEmployee
// class to represent an employee extended with department information.
class JoinDeptEmployeeDoFn extends DoFn<Employee, ExtendedEmployee> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    // We obtain the Map-type side input with department information.
    Map<String, Department> departmentMap = c.sideInput(departmentSideInput);
    Employee empl = c.element();
    Department dept = departmentMap.get(empl.getDepartmentId());
    // Skip employees whose department is not in the side input.
    if (dept == null) return;

    ExtendedEmployee result = empl.extendWith(dept);
    c.output(result);
  }

}

// We apply the ParDo to extend the employee with department information
// and specify that it takes in a departmentSideInput.
PCollection<ExtendedEmployee> extendedEmployees = 
    employees.apply(
        ParDo.of(new JoinDeptEmployeeDoFn()).withSideInputs(departmentSideInput));

With CoGroupByKey, you can use dept_id as a key to group both collections. In the Beam Java SDK, the grouped values for each key arrive as a CoGbkResult.

// We load the departments, and make them a key-value collection, to Join them
// later with employees.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
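     // With a lambda, MapElements needs the output type declared via into(...).
     .apply("getKey", MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Department.class)))
         .via((Department dept) -> KV.of(dept.getId(), dept)));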

// Because we will perform a join, employees also need to be put into
// key-value pairs, where their key is their *department id*.
PCollection<KV<String, Employee>> employees = 
    p.apply(new LoadEmployees())
     .apply("getKey", MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Employee.class)))
         .via((Employee empl) -> KV.of(empl.getDepartmentId(), empl)));

// We define a DoFn that is able to join a single department with multiple
// employees.
class JoinEmployeesWithDepartments extends DoFn<KV<String, CoGbkResult>, ExtendedEmployee> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, CoGbkResult> elm = c.element();
    // We assume one department with the same ID, and assume that
    // employees always have a department available.
    Department dept = elm.getValue().getOnly(departmentsTag);
    Iterable<Employee> employees = elm.getValue().getAll(employeesTag);

    for (Employee empl : employees) {
      ExtendedEmployee result = empl.extendWith(dept);
      c.output(result);
    }
  }
}

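// The syntax for a CoGroupByKey operation is a bit verbose.
// In this step we define TupleTags, which serve as identifiers for the
// PCollections being joined. Each tag is typed with the value type of its
// PCollection, and both must be declared before the DoFn above that uses them.
final TupleTag<Employee> employeesTag = new TupleTag<>();
final TupleTag<Department> departmentsTag = new TupleTag<>();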

// We use the PCollection tuple-tags to join the two PCollections.
PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple.of(departmentsTag, departments)
        .and(employeesTag, employees)
        .apply(CoGroupByKey.create());

// Finally, we convert the joined PCollections into a kind that
// we can use: ExtendedEmployee.
PCollection<ExtendedEmployee> extendedEmployees =
    results.apply("ExtendInformation", ParDo.of(new JoinEmployeesWithDepartments()));
Dambro answered 31/7, 2018 at 23:1 Comment(1)
Feel free to ask questions if you'd like me to clarify further on this. - Dambro
