Running BeamSql Without a Coder, or Making the Coder Dynamic

Posted 2019-07-11 21:40

Question:

I am reading data from a file and converting it to `BeamRecord`, but when I run a query on it, it throws this error:

Exception in thread "main" java.lang.ClassCastException: org.apache.beam.sdk.coders.SerializableCoder cannot be cast to org.apache.beam.sdk.coders.BeamRecordCoder
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.registerTables(BeamSql.java:173)
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:153)
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:116)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:160)
    at TestingClass.main(TestingClass.java:75)

But when I provide a coder, it runs perfectly.

I am a little confused: because I am using templates, the schema of the file data changes on every run. So is there any way I can run the query with a default coder, or without a coder at all?

For reference, the code is below:

PCollection<String> ReadFile1 = PBegin.in(p).apply(TextIO.read().from("gs://Bucket_Name/FileName.csv"));
PCollection<BeamRecord> File1_BeamRecord = ReadFile1.apply(new StringToBeamRecord()).setCoder(new Temp().test().getRecordCoder());

PCollection<String> ReadFile2 = p.apply(TextIO.read().from("gs://Bucket_Name/FileName.csv"));
PCollection<BeamRecord> File2_BeamRecord = ReadFile2.apply(new StringToBeamRecord()).setCoder(new Temp().test1().getRecordCoder());

`new Temp().test1().getRecordCoder()` returns a hard-coded `BeamRecordCoder`, but I need to build that coder at runtime instead.
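One direction (a sketch, not a verified fix): a coder has to be known at pipeline-construction time, so the schema could be derived once from the CSV header line before the pipeline is built, rather than per element inside the `DoFn`. The helper below is hypothetical and only derives the column names with plain JDK calls; the Beam-specific steps (the same `BeamRecordSqlType.create(...)` / `getRecordCoder()` calls already used for the hard-coded coder) are shown only as comments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derive column names from the CSV header at
// pipeline-construction time. The resulting list would then feed the
// same calls the hard-coded version already uses, e.g.:
//   BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
//   pcollection.setCoder(type.getRecordCoder());
public class SchemaFromHeader {
    public static List<String> fieldNames(String headerLine) {
        List<String> names = new ArrayList<>();
        // limit -1 keeps trailing empty columns instead of dropping them
        for (String column : headerLine.split(",", -1)) {
            names.add(column.trim());
        }
        return names;
    }
}
```

Reading the header eagerly (outside `TextIO`) before constructing the pipeline is the assumption here; with templates that still requires the header to be available when the template is launched.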

The conversion from `PCollection<String>` to `PCollection<BeamRecord>` is below:

public class StringToBeamRecord extends PTransform<PCollection<String>, PCollection<BeamRecord>> {

    private static final Logger LOG = LoggerFactory.getLogger(StringToBeamRecord.class);

    @Override
    public PCollection<BeamRecord> expand(PCollection<String> arg0) {
        return arg0.apply("Conversion", ParDo.of(new ConversionOfData()));
    }

    static class ConversionOfData extends DoFn<String, BeamRecord> implements Serializable {

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Pad empty fields so split() does not collapse them, then split on commas.
            String data = c.element().replaceAll(",,", ",blank,");
            String[] array = data.split(",");
            List<String> fieldNames = new ArrayList<>();
            List<Integer> fieldTypes = new ArrayList<>();
            List<Object> values = new ArrayList<>();
            for (int i = 0; i < array.length; i++) {
                fieldNames.add("R" + i);
                fieldTypes.add(Types.VARCHAR); // With a schema I could set the real type here.
                values.add(array[i]);
            }
            LOG.info("The size is: " + values.size());
            // The record type is rebuilt per element from whatever columns this line has.
            BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
            c.output(new BeamRecord(type, values));
        }
    }
}
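A side note on the conversion above (a minimal sketch using plain JDK strings, no Beam types): the `,,` replacement and `split(",")` both have edge cases that make the column count, and therefore any schema derived from it, drift between lines.

```java
public class SplitEdgeCases {
    public static void main(String[] args) {
        // replaceAll does not re-scan overlapping matches: of three
        // consecutive commas, only the first pair becomes ",blank,".
        String padded = "a,,b,,,c".replaceAll(",,", ",blank,");
        System.out.println(padded);               // a,blank,b,blank,,c
        System.out.println(padded.split(",").length); // 6, with an empty field at index 4

        // split(",") silently drops trailing empty fields, so a line
        // ending in commas yields fewer columns than it has.
        String[] parts = "a,b,,".split(",");
        System.out.println(parts.length);         // 2
    }
}
```

So two lines of the same file can produce different field counts, which would defeat any single coder built from one sample line.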

The query is:

PCollectionTuple test = PCollectionTuple.of(
                    new TupleTag<BeamRecord>("File1_BeamRecord"),File1_BeamRecord)
                    .and(new TupleTag<BeamRecord>("File2_BeamRecord"), File2_BeamRecord);

PCollection<BeamRecord> output = test.apply(BeamSql.queryMulti(
                    "Select * From File1_BeamRecord JOIN File2_BeamRecord "));

Is there any way I can make the coder dynamic, or run the query with a default coder?