I have a pipeline that successfully outputs an Avro file as follows:
@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
T foo;
S bar;
Boolean baz;
public MyOutput_T_S() {}
}
@DefaultCoder(AvroCoder.class)
class T {
String id;
public T() {}
}
@DefaultCoder(AvroCoder.class)
class S {
String id;
public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));
How can I reproduce this exact behavior, except with a parameterized output MyOutput<T, S> (where T and S are both Avro-codable using reflection)?
The main issue is that Avro reflection doesn't work for parameterized types. So based on these responses:
1) I think I need to write a custom CoderFactory
but I am having difficulty figuring out exactly how this works (I'm having trouble finding examples). Oddly enough, a completely naive coder factory appears to let me run the pipeline and inspect proper output using DataflowAssert:
cr.registerCoder(MyOutput.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\","
        + "\"name\":\"MyOutput\","
        + "\"namespace\":\"mypackage\","
        + "\"fields\":[]}");
    return AvroCoder.of(MyOutput.class, schema);
  }
  @Override
  public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    List<Object> components = new ArrayList<>();
    return components;
  }
});
While I can successfully assert against the output now, I expect this will not cut it for writing to a file. I haven't figured out how I'm supposed to use the provided componentCoders to generate the correct schema, and if I try to just shove the schema of T or S into fields I get:

java.lang.IllegalArgumentException: Unable to get field id from class null
2) Assuming I figure out how to encode MyOutput, what do I pass to AvroIO.Write.withSchema? If I pass either MyOutput.class or the schema, I get type mismatch errors.
I think there are two questions (correct me if I am wrong):

1) How do I encode MyOutput<T, S>?
2) How do I write MyOutput<T, S> to a file using AvroIO.Write?

The first question is to be solved by registering a CoderFactory as in the linked question you found.

Your naive coder is probably allowing you to run the pipeline without issues because serialization is being optimized away. Certainly an Avro schema with no fields will result in those fields being dropped in a serialization + deserialization round trip.
But assuming you fill in the schema with the fields, your approach to CoderFactory#create looks right. I don't know the exact cause of the message java.lang.IllegalArgumentException: Unable to get field id from class null, but the call to AvroCoder.of(MyOutput.class, schema) should work, for an appropriately assembled schema. If there is an issue with this, more details (such as the rest of the stack trace) would be helpful.

However, your override of CoderFactory#getInstanceComponents should return a list of values, one per type parameter of MyOutput.

The second question can be answered using some of the same support code as the first, but otherwise is independent.
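For the first question's getInstanceComponents, a version returning one value per type parameter could look like the sketch below. It uses a stand-in MyOutput class with the foo/bar fields from the question, not the actual SDK CoderFactory interface, so treat the names and signatures as assumptions:

```java
import java.util.Arrays;
import java.util.List;

public class ComponentsSketch {
    // Stand-in for the question's parameterized output type.
    static class MyOutput<T, S> {
        T foo;
        S bar;
        Boolean baz;
        MyOutput(T foo, S bar, Boolean baz) {
            this.foo = foo;
            this.bar = bar;
            this.baz = baz;
        }
    }

    // One component value per type parameter of MyOutput, in declaration order.
    @SuppressWarnings("unchecked")
    static List<Object> getInstanceComponents(Object value) {
        MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
        return Arrays.asList(myOutput.foo, myOutput.bar);
    }

    public static void main(String[] args) {
        MyOutput<String, String> out = new MyOutput<>("t-id", "s-id", true);
        System.out.println(getInstanceComponents(out)); // [t-id, s-id]
    }
}
```

The point is only the shape of the return value: the coder infrastructure can then pair each component value with the corresponding component coder.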
AvroIO.Write.withSchema always explicitly uses the provided schema. It does use AvroCoder under the hood, but this is actually an implementation detail. Providing a compatible schema is all that is necessary - such a schema will have to be composed for each value of T and S for which you want to output MyOutput<T, S>.
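As an illustration of composing such a schema by hand, here is a sketch that builds the record schema JSON for MyOutput<T, S> by embedding the concrete schemas of T and S as the field types. This is my own sketch, not code from the answer; in a real pipeline the embedded schema strings would presumably come from the component AvroCoders (AvroCoder has a getSchema() accessor), and the field names match the question's classes:

```java
public class MyOutputSchemaComposer {
    // Build an Avro record schema JSON with the given name and field list.
    static String record(String name, String fieldsJson) {
        return "{\"type\":\"record\",\"name\":\"" + name + "\","
             + "\"namespace\":\"mypackage\",\"fields\":[" + fieldsJson + "]}";
    }

    // Compose the schema for MyOutput<T, S>: the concrete schemas of T and S
    // become the types of the "foo" and "bar" fields, alongside "baz".
    static String composeMyOutputSchema(String tSchemaJson, String sSchemaJson) {
        return record("MyOutput",
              "{\"name\":\"foo\",\"type\":" + tSchemaJson + "},"
            + "{\"name\":\"bar\",\"type\":" + sSchemaJson + "},"
            + "{\"name\":\"baz\",\"type\":\"boolean\"}");
    }

    public static void main(String[] args) {
        // Schemas matching the question's T and S (a single String "id" field).
        String tSchema = record("T", "{\"name\":\"id\",\"type\":\"string\"}");
        String sSchema = record("S", "{\"name\":\"id\",\"type\":\"string\"}");
        System.out.println(composeMyOutputSchema(tSchema, sSchema));
    }
}
```

The resulting JSON string can be fed to new Schema.Parser().parse(...) inside the CoderFactory, and the same composed schema is what you would hand to AvroIO.Write.withSchema for that particular T and S.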