join two json in Google Cloud Platform with datafl

Posted 2019-02-05 12:26

Question:

I have two JSON files, one with employees and one with departments. I want to keep only the female employees, join them with their departments, select only the fields we are interested in, and write the result to another JSON file.

I am trying to implement this on Google Cloud Platform using Dataflow. Can someone provide sample Java code that produces this result?

Employee JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

Department JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

The expected output JSON file should be like

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}

Answer 1:

You can do this using CoGroupByKey (which requires a shuffle), or using side inputs if your departments collection is significantly smaller than your employees collection.

I will give you code in Python, but you can use the same pipeline in Java.


With side inputs, you will:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

  2. Then take the employees PCollection as the main input, and use each employee's emp_dept value to look up the matching department JSON in that dictionary.

Like so:

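import apache_beam as beam

departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  # dict.update() returns None, so build and return a merged copy instead.
  # The employee record stores its department ID in the 'emp_dept' field.
  joined = dict(employee)
  joined.update(dept_dict[employee['emp_dept']])
  return joined

joined_dicts = employees | beam.Map(join_emp_dept, dept_dict=deps_si)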
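
To tie this back to the question, here is a minimal plain-Python sketch of the per-element logic, using the sample records from the question. It runs without Beam. The helper names `is_female`, `select_fields` and `OUTPUT_FIELDS` are illustrative assumptions, as is the idea that `LoadEmps()` and `LoadDepts()` yield one dict per JSON line. In a pipeline, these functions would be passed to `beam.Filter` and `beam.Map`, with `depts_by_id` supplied as the `AsDict` side input.

```python
import json

# Sample input lines copied from the question.
EMP_LINES = [
    '{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}',
    '{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}',
]
DEPT_LINES = [
    '{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}',
    '{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}',
    '{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}',
]

# Fields wanted in the output, in output order (assumed from the expected JSON).
OUTPUT_FIELDS = ("emp_id", "emp_name", "dept_name", "emp_salary")

def is_female(employee):
    # Predicate that could be passed to beam.Filter.
    return employee.get("emp_gender") == "female"

def join_emp_dept(employee, dept_dict):
    # Merge the employee with its department, looked up by emp_dept.
    joined = dict(employee)
    joined.update(dept_dict[employee["emp_dept"]])
    return joined

def select_fields(record):
    # Keep only the fields of interest.
    return {field: record[field] for field in OUTPUT_FIELDS}

# Stand-in for the AsDict side input: dept_id -> department dict.
depts_by_id = {d["dept_id"]: d for d in map(json.loads, DEPT_LINES)}

output_lines = [
    json.dumps(select_fields(join_emp_dept(emp, depts_by_id)), separators=(",", ":"))
    for emp in map(json.loads, EMP_LINES)
    if is_female(emp)
]
for line in output_lines:
    print(line)
```

Filtering before the join keeps the lookup work to the records that will actually be written out.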

With CoGroupByKey, you can use dept_id as a key to group both collections. This results in a PCollection of key-value pairs where the key is the dept_id and the value holds two iterables: the departments with that ID and the employees in that department.

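import itertools

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

# Employees are keyed by the department ID stored in their 'emp_dept' field.
employees = (p | LoadEmps()
               | 'key_emp' >> beam.Map(lambda emp: (emp['emp_dept'], emp)))

def join_lists(kv):
  # kv is (dept_id, {'employees': [...], 'departments': [...]}).
  # Python 3 no longer allows tuple parameters, so unpack explicitly.
  _, v = kv
  return itertools.product(v['employees'], v['departments'])

def merge_dicts(pair):
  # dict.update() returns None, so return a merged copy instead.
  emp_dict, dept_dict = pair
  return {**emp_dict, **dept_dict}

joined_dicts = (
    {'employees': employees, 'departments': departments}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(merge_dicts)
    # filter_fields is your own function that keeps only the wanted fields.
    | 'filterfields' >> beam.Map(filter_fields)
)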
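
For intuition, the grouped shape that `join_lists` receives can be emulated in plain Python. This is only a sketch of the data layout, not how Beam implements it: `cogroup` is a hypothetical stand-in for `beam.CoGroupByKey()`, and the records are trimmed versions of the question's sample data.

```python
import itertools
from collections import defaultdict

def cogroup(tagged):
    """Emulate the output shape of CoGroupByKey on a dict of keyed lists.

    tagged: {tag: [(key, value), ...]}
    returns: [(key, {tag: [values with that key]}), ...] sorted by key
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return sorted(grouped.items())

def join_lists(kv):
    # Pair every employee with every department sharing the same key.
    _, v = kv
    return itertools.product(v["employees"], v["departments"])

employees = [
    ("OrgDept#1", {"emp_name": "Adam", "emp_dept": "OrgDept#1"}),
    ("OrgDept#3", {"emp_name": "Scott", "emp_dept": "OrgDept#3"}),
]
departments = [
    ("OrgDept#1", {"dept_id": "OrgDept#1", "dept_name": "Account"}),
    ("OrgDept#2", {"dept_id": "OrgDept#2", "dept_name": "IT"}),
    ("OrgDept#3", {"dept_id": "OrgDept#3", "dept_name": "HR"}),
]

grouped = cogroup({"employees": employees, "departments": departments})

# Flatten the groups into merged employee+department records.
joined = [
    {**emp, **dept}
    for kv in grouped
    for emp, dept in join_lists(kv)
]
```

Note that OrgDept#2 has a department but no employees, so its product is empty and it contributes no joined records. In other words, this pattern behaves like an inner join.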


Answer 2:

Someone has asked for a Java-based solution for this question. Here is the Java code for this. It's more verbose, but it does essentially the same thing.

// First we want to load all departments, and put them into a PCollection
// of key-value pairs, where the Key is their identifier. We assume that it's String-type.
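// Note: MapElements.via() alone only accepts a SimpleFunction/InferableFunction;
// with a lambda, the output type must be declared through into().
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
     .apply("getKey", MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Department.class)))
         .via((Department dept) -> KV.of(dept.getId(), dept)));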

// We then convert this PCollection into a map-type PCollectionView.
// We can access this map directly within a ParDo.
PCollectionView<Map<String, Department>> departmentSideInput = 
    departments.apply("ToMapSideInput", View.<String, Department>asMap());

// We load the PCollection of employees
PCollection<Employee> employees = p.apply(new LoadEmployees());

// Let's suppose that we will *extend* an employee's information with their
// Department information. I have assumed the existence of an ExtendedEmployee
// class to represent an employee extended with department information.
class JoinDeptEmployeeDoFn extends DoFn<Employee, ExtendedEmployee> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    // We obtain the Map-type side input with department information.
    Map<String, Department> departmentMap = c.sideInput(departmentSideInput);
    Employee empl = c.element();
    // Map.get takes a single argument and returns null when the key is absent.
    Department dept = departmentMap.get(empl.getDepartmentId());
    if (dept == null) return;

    ExtendedEmployee result = empl.extendWith(dept);
    c.output(result);
  }

}

// We apply the ParDo to extend the employee with department information
// and specify that it takes in a departmentSideInput.
PCollection<ExtendedEmployee> extendedEmployees = 
    employees.apply(
        ParDo.of(new JoinDeptEmployeeDoFn()).withSideInputs(departmentSideInput));

With CoGroupByKey, you can use dept_id as a key to group both collections. In the Beam Java SDK, the grouped values for each key are exposed as a CoGbkResult.

// We load the departments, and make them a key-value collection, to Join them
// later with employees.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
     .apply("getKey", MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Department.class)))
         .via((Department dept) -> KV.of(dept.getId(), dept)));

// Because we will perform a join, employees also need to be put into
// key-value pairs, where their key is their *department id*.
PCollection<KV<String, Employee>> employees = 
    p.apply(new LoadEmployees())
     .apply("getKey", MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Employee.class)))
         .via((Employee empl) -> KV.of(empl.getDepartmentId(), empl)));

// We define a DoFn that is able to join a single department with multiple
// employees.
class JoinEmployeesWithDepartments extends DoFn<KV<String, CoGbkResult>, ExtendedEmployee> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, CoGbkResult> elm = c.element();
    // We assume one department with the same ID, and assume that
    // employees always have a department available.
    Department dept = elm.getValue().getOnly(departmentsTag);
    Iterable<Employee> employees = elm.getValue().getAll(employeesTag);

    for (Employee empl : employees) {
      ExtendedEmployee result = empl.extendWith(dept);
      c.output(result);
    }
  }
}

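// The syntax for a CoGroupByKey operation is a bit verbose.
// In this step we define a TupleTag for each input, which serves as the
// identifier for that PCollection. Each tag is typed with the value type of
// its collection, and both must be declared before the DoFn above uses them.
final TupleTag<Employee> employeesTag = new TupleTag<>();
final TupleTag<Department> departmentsTag = new TupleTag<>();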

// We use the PCollection tuple-tags to join the two PCollections.
PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple.of(departmentsTag, departments)
        .and(employeesTag, employees)
        .apply(CoGroupByKey.create());

// Finally, we convert the joined PCollections into a kind that
// we can use: ExtendedEmployee.
PCollection<ExtendedEmployee> extendedEmployees =
    results.apply("ExtendInformation", ParDo.of(new JoinEmployeesWithDepartments()));