I am reading data from file and converting it to BeamRecord But While i am Doing Query on that it Show Error-:
Exception in thread "main" java.lang.ClassCastException: org.apache.beam.sdk.coders.SerializableCoder cannot be cast to org.apache.beam.sdk.coders.BeamRecordCoder
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.registerTables(BeamSql.java:173)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:153)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:116)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:160)
at TestingClass.main(TestingClass.java:75)
But When I am Providing it a Coder Then It Runs Perfectly.
I am little confused that if I am reading data from a file the file data schema changes on every run because I am using templates so is there any way I can use Default Coder or Without Coder, i can Run the Query.
For Reference Code is Below Please Check.
PCollection<String> ReadFile1 = PBegin.in(p).apply(TextIO.read().from("gs://Bucket_Name/FileName.csv"));
PCollection<BeamRecord> File1_BeamRecord = ReadFile1.apply(new StringToBeamRecord()).setCoder(new Temp().test().getRecordCoder());
PCollection<String> ReadFile2= p.apply(TextIO.read().from("gs://Bucket_Name/FileName.csv"));
PCollection<BeamRecord> File2_Beam_Record = ReadFile2.apply(new StringToBeamRecord()).setCoder(new Temp().test1().getRecordCoder());
new Temp().test1().getRecordCoder() --> Returning HardCoded BeamRecordCoder Values Which I need to fetch at runtime
Conversion From PColletion<String> to PCollection<TableRow> is Below-:
Public class StringToBeamRecord extends PTransform<PCollection<String>,PCollection<BeamRecord>> {
private static final Logger LOG = LoggerFactory.getLogger(StringToBeamRecord.class);
@Override
public PCollection<BeamRecord> expand(PCollection<String> arg0) {
return arg0.apply("Conversion",ParDo.of(new ConversionOfData()));
}
static class ConversionOfData extends DoFn<String,BeamRecord> implements Serializable{
@ProcessElement
public void processElement(ProcessContext c){
String Data = c.element().replaceAll(",,",",blank,");
String[] array = Data.split(",");
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
List<Object> Data_Conversion = new ArrayList<>();
int Count = 0;
for(int i = 0 ; i < array.length;i++){
fieldNames.add(new String("R"+Count).toString());
Count++;
fieldTypes.add(Types.VARCHAR); //Using Schema I can Set it
Data_Conversion.add(array[i].toString());
}
LOG.info("The Size is : "+Data_Conversion.size());
BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
c.output(new BeamRecord(type,Data_Conversion));
}
}
}
Query is -:
PCollectionTuple test = PCollectionTuple.of(
new TupleTag<BeamRecord>("File1_BeamRecord"),File1_BeamRecord)
.and(new TupleTag<BeamRecord>("File2_BeamRecord"), File2_BeamRecord);
PCollection<BeamRecord> output = test.apply(BeamSql.queryMulti(
"Select * From File1_BeamRecord JOIN File2_BeamRecord "));
Is thier anyway i can make Coder Dynamic or I can Run Query with Default Coder.