Dataflow: output a parameterized type to an Avro file

Published 2019-03-06 05:00

I have a pipeline that successfully outputs an Avro file as follows:

@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
  T foo;
  S bar;
  Boolean baz;
  public MyOutput_T_S() {}
}

@DefaultCoder(AvroCoder.class)
class T {
  String id;
  public T() {}
}

@DefaultCoder(AvroCoder.class)
class S {
  String id;
  public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));

How can I reproduce this exact behavior, except with a parameterized output MyOutput<T, S> (where T and S are both Avro-codable using reflection)?

The main issue is that Avro reflection doesn't work for parameterized types. So based on these responses:

1) I think I need to write a custom CoderFactory, but I'm having difficulty figuring out exactly how it works (I'm having trouble finding examples). Oddly enough, a completely naive coder factory appears to let me run the pipeline and inspect correct output using DataflowAssert:

cr.registerCoder(MyOutput.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\","
        + "\"name\":\"MyOutput\","
        + "\"namespace\":\"mypackage\","
        + "\"fields\":[]}");
    return AvroCoder.of(MyOutput.class, schema);
  }

  @Override
  public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    List<Object> components = new ArrayList<>();
    return components;
  }
});
While I can successfully assert against the output now, I expect this will not cut it for writing to a file. I haven't figured out how to use the provided componentCoders to generate the correct schema, and if I just shove the schema of T or S into fields I get:

java.lang.IllegalArgumentException: Unable to get field id from class null

2) Assuming I figure out how to encode MyOutput. What do I pass to AvroIO.Write.withSchema? If I pass either MyOutput.class or the schema I get type mismatch errors.

1 Answer

Root(大扎) · answered 2019-03-06 05:40

I think there are two questions (correct me if I am wrong):

  1. How do I enable the coder registry to provide coders for various parameterizations of MyOutput<T, S>?
  2. How do I write values of MyOutput<T, S> to a file using AvroIO.Write?

The first question is to be solved by registering a CoderFactory as in the linked question you found.

Your naive coder is probably allowing you to run the pipeline without issues because serialization is being optimized away. Certainly an Avro schema with no fields will result in those fields being dropped in a serialization+deserialization round trip.

But assuming you fill in the schema with the fields, your approach to CoderFactory#create looks right. I don't know the exact cause of the message java.lang.IllegalArgumentException: Unable to get field id from class null, but the call to AvroCoder.of(MyOutput.class, schema) should work for an appropriately assembled schema. If there is an issue with this, more details (such as the rest of the stack trace) would be helpful.
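As an illustration of what "appropriately assembled" might mean, here is a minimal sketch of splicing the component schemas (as JSON strings) into a record schema for MyOutput. This is not code from the question or the SDK: the helper name is hypothetical, and the field names foo, bar, and baz are taken from the class in the question. In a real CoderFactory#create, each component schema string would come from the corresponding component coder (assuming the components are AvroCoders, e.g. via getSchema().toString()).

```java
public class SchemaSketch {
  // Hypothetical helper: build the Avro record schema JSON for MyOutput<T, S>
  // by embedding the component schemas as the types of the foo and bar fields.
  // Field names foo/bar/baz match the MyOutput class in the question.
  static String myOutputSchemaJson(String fooSchema, String barSchema) {
    return "{\"type\":\"record\","
        + "\"name\":\"MyOutput\","
        + "\"namespace\":\"mypackage\","
        + "\"fields\":["
        + "{\"name\":\"foo\",\"type\":" + fooSchema + "},"
        + "{\"name\":\"bar\",\"type\":" + barSchema + "},"
        + "{\"name\":\"baz\",\"type\":\"boolean\"}"
        + "]}";
  }
}
```

The resulting string can then be fed to new Schema.Parser().parse(...) and on to AvroCoder.of(MyOutput.class, schema), as in your create method.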

However, your override of CoderFactory#getInstanceComponents should return a list of values, one per type parameter of MyOutput. Like so:

@Override
public List<Object> getInstanceComponents(Object value) {
  MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
  return ImmutableList.of(myOutput.foo, myOutput.bar);
}

The second question can be answered using some of the same support code as the first, but otherwise is independent. AvroIO.Write.withSchema always explicitly uses the provided schema. It does use AvroCoder under the hood, but this is actually an implementation detail. Providing a compatible schema is all that is necessary - such a schema will have to be composed for each value of T and S for which you want to output MyOutput<T, S>.
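For concreteness, here is roughly what such a composed schema might look like for the T and S classes from the question (each a record with a single string id field). This is an approximation: a schema produced by Avro reflection may differ in details (for example, extra properties such as avro.java.string), so treat it as a sketch rather than the exact output.

```json
{
  "type": "record",
  "name": "MyOutput",
  "namespace": "mypackage",
  "fields": [
    {"name": "foo", "type": {"type": "record", "name": "T", "namespace": "mypackage",
                             "fields": [{"name": "id", "type": "string"}]}},
    {"name": "bar", "type": {"type": "record", "name": "S", "namespace": "mypackage",
                             "fields": [{"name": "id", "type": "string"}]}},
    {"name": "baz", "type": "boolean"}
  ]
}
```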
