如何在UNNEST数据流嵌套PCollection(How to Unnest the nested

2019-10-28 23:17发布

连接两个嵌套结构PCollection,我们需要做的加入之前UNNEST的PCollection,为获得挑战(参见我的其他计算器的情况下链接 )。 所以,想知道如何UNNEST的PCollection。 这将是很好的,如果有一个人给理念无论是加入两个嵌套表或如何UNNEST PCollections。

我刚才提到的,我们有PTransform“UNNEST”( 链接 )从嵌套一个unnesting集合。 但我不能找到任何净样品。 不过,我只是试图通过下面的步骤来实现它转换嵌套的集合,但仍无法获得UNNEST集合中最后一个。

1)PCollection empCollection = ReadCollection(); 2)使用帕尔函数转换从PCollection(com.google.api.services.bigquery.model.TableRow)到PCollection(org.apache.beam.sdk.values.Row)的值3)定义的架构象下面架构项目= 。Schema.builder()addInt32Field( “ID”)addStringField( “名称”)建()。 。。架构雇员= Schema.builder()addStringField( “EMPNO”)addStringField( “empName”)addArrayField( “项目”,FieldType.row(项目))建立(); 4)使用UNNEST变换到UNNEST嵌套集合

PCollection<Row> pcColl = targetRowCollection.apply(Unnest.<Row>create().withFieldNameFunction(new SerializableFunction<java.util.List<java.lang.String>, java.lang.String>() {
@Override
public java.lang.String apply(java.util.List<java.lang.String> input) {
    return String.join("+", input);
    }
}));

5)使用帕尔函数转换从PCollection(org.apache.beam.sdk.values.Row)至PCollection的值(com.google.api.services.bigquery.model.TableRow)

能有人帮我,用这种UNNEST转换到UNNEST收集从嵌套集合转换。

Answer 1:

用于与嵌套结构在python与梁连接两个Pcollection代码:

with beam.Pipeline(options=option) as p:

    source_record1 =  p | "get data1" >> beam.io.avroio.ReadFromAvro(input_file1)
    source_record2 =  p | "get data2" >> beam.io.avroio.ReadFromAvro(input_file2)

    #convert into <k,v> form
    keyed_record1 = source_record1 | beam.ParDo(addkeysnested(),join_fileld_names1)
    keyed_record2 = source_record2 | beam.ParDo(addkeysnested(),join_fileld_names2)

    #Apply join operation
    rjoin = ({'File1Info': keyed_record1, 'File2Info': keyed_record2}                     
               | beam.CoGroupByKey())


    class addkeysnested(beam.DoFn):
        def process(self,element,fieldName):
            tmp_record = element    
            fieldName = fieldName.split(".")
            for i in range(len(fieldName)):

                if i != len(fieldName) - 1 :
                    tmp_record = tmp_record[fieldName[i].strip()][0]

                else:
                    tmp_record = tmp_record[fieldName[i].strip()]   

        return [(tmp_record,element)]

注:在上面的代码中我们可以嵌套字段的任何级别得到的键值即personalInfo.Address.City,适用CoGroupByKey()连接两个pcollection后



文章来源: How to Unnest the nested PCollection in Dataflow