I have a pipeline that successfully outputs an Avro file as follows:
@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
T foo;
S bar;
Boolean baz;
public MyOutput_T_S() {}
}
@DefaultCoder(AvroCoder.class)
class T {
String id;
public T() {}
}
@DefaultCoder(AvroCoder.class)
class S {
String id;
public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));
How can I reproduce this exact behavior, except with a parameterized output MyOutput<T, S> (where T and S are both Avro-encodable using reflection)?
The main issue is that Avro reflection doesn't work for parameterized types. So based on these responses:
- Setting Custom Coders & Handling Parameterized types
- Using Avrocoder for Custom Types with Generics
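To make the reflection problem concrete, here is a minimal standalone sketch in plain Java, with no Avro or Dataflow dependencies. It assumes the failure comes from type erasure: reflection on the generic class sees only the type variables T and S, not the concrete classes. The `ErasureDemo` class and its `describe` helper are hypothetical names used for illustration, and the nested `MyOutput` is a stand-in for the real class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {
    // Stand-in for the parameterized output class (assumption: same field layout).
    static class MyOutput<T, S> {
        T foo;
        S bar;
        Boolean baz;
    }

    // Hypothetical helper: reports how reflection sees a field's declared type.
    static String describe(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            Type generic = field.getGenericType();
            String kind = (generic instanceof TypeVariable) ? "type variable" : "concrete class";
            return fieldName + ": " + kind + " " + generic.getTypeName()
                + " (erased to " + field.getType().getSimpleName() + ")";
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // foo and bar are declared with type variables; only baz has a concrete class.
        System.out.println(describe(MyOutput.class, "foo"));
        System.out.println(describe(MyOutput.class, "bar"));
        System.out.println(describe(MyOutput.class, "baz"));
    }
}
```

A reflection-based schema generator that only has MyOutput.class to work with has nothing concrete to inspect for foo and bar, which is presumably why the component types have to be supplied some other way.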
1) I think I need to write a custom CoderFactory, but I am having difficulty figuring out exactly how this works (I'm having trouble finding examples). Oddly enough, a completely naive coder factory appears to let me run the pipeline and inspect proper output using DataflowAssert:
cr.registerCoder(MyOutput.class, new CoderFactory() {
@Override
public Coder<?> create(List<? extends Coder<?>> componentCoders) {
// Naive placeholder schema: a record with no fields.
Schema schema = new Schema.Parser().parse("{\"type\":\"record\","
+ "\"name\":\"MyOutput\","
+ "\"namespace\":\"mypackage\","
+ "\"fields\":[]}");
return AvroCoder.of(MyOutput.class, schema);
}
@Override
public List<Object> getInstanceComponents(Object value) {
MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
// Naive: no components are extracted from the value.
List<Object> components = new ArrayList<>();
return components;
}
});
While I can successfully assert against the output now, I expect this will not cut it for writing to a file. I haven't figured out how I'm supposed to use the provided componentCoders to generate the correct schema, and if I try to just shove the schema of T or S into fields I get:
java.lang.IllegalArgumentException: Unable to get field id from class null
2) Assuming I figure out how to encode MyOutput, what do I pass to AvroIO.Write.withSchema? If I pass either MyOutput.class or the schema, I get type mismatch errors.