Using AvroCoder for Custom Types with Generics

Posted 2019-02-23 08:34

Question:

I am trying to use AvroCoder to serialise a custom type that is passed around in PCollections in my pipeline. The custom type has a generic field (currently a String). When I run the pipeline, I get an AvroTypeException like the one below, probably because of the generic field. Is building and passing the Avro schema for the object the only way around this?

Exception in thread "main" org.apache.avro.AvroTypeException: Unknown type: T
 at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
 at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472)
 at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
 at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:116)

I have also attached my registry code for reference.

pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        return AvroCoder.of(GenericTypeClass.class);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        return Collections.singletonList(((GenericTypeClass<Object>) value).key);
    }
});

Answer 1:

You’ve done everything right as far as setting up the CoderFactory, but Avro’s ReflectData mechanism, which AvroCoder uses to generate a schema automatically, does not work for generic types as of this writing. This is tracked as issue AVRO-1571. See also this StackOverflow question.
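To see why ReflectData has nothing to work with, it can help to look at what plain Java reflection exposes for a generic field. The sketch below uses only java.lang.reflect, not Avro, and a stand-in GenericTypeClass whose field name genericField is borrowed from the first approach described next; ErasureDemo and its methods are hypothetical names used for illustration only.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Minimal stand-in for the question's class (assumed shape, not the asker's code).
class GenericTypeClass<T> {
    public GenericTypeClass() {}
    T genericField;
}

class ErasureDemo {
    // The field's declared type as reflection reports it.
    static Type declaredFieldType() {
        try {
            Field field = GenericTypeClass.class.getDeclaredField("genericField");
            return field.getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // True when the field's type is a type variable (T) rather than a concrete class.
    static boolean fieldTypeIsTypeVariable() {
        return declaredFieldType() instanceof TypeVariable;
    }

    // GenericTypeClass<String> and GenericTypeClass<Integer> share one runtime Class,
    // so a class literal like GenericTypeClass.class cannot say what T is.
    static boolean sameRuntimeClass() {
        return new GenericTypeClass<String>().getClass()
            == new GenericTypeClass<Integer>().getClass();
    }
}
```

Starting from the Class object alone, the field's type is just the type variable named T, which is consistent with the "Unknown type: T" message in the question's stack trace.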

In order to allow encoding of GenericTypeClass<T> for some particular values of T, you are correct that you will have to provide some explicit schema information. There are two ways to proceed:

The first approach is to provide an explicit schema on fields of type T within your GenericTypeClass<T>, like so:

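import org.apache.avro.reflect.AvroSchema;

class GenericTypeClass<T> {
  // Avro requires a no-args constructor
  public GenericTypeClass() {}

  // Explicit union schema for T; list every type T may take
  @AvroSchema("[\"string\", \"int\", ...]")
  private T genericField;
}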

The drawback is that this approach is limited to a finite, static union of types, and it requires manually inlining the JSON schema for more complex values of T.
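To make the "finite, static union" limitation concrete, here is a simplified sketch of how a writer could pick a union branch for a value. It is not Avro's implementation (in Avro, GenericData.resolveUnion does this and throws UnresolvedUnionException when nothing matches); UnionBranchSketch and its methods are hypothetical, and only a few primitive types are covered.

```java
import java.util.List;

class UnionBranchSketch {
    // Simplified mapping from a runtime value to an Avro primitive type name.
    static String avroTypeOf(Object value) {
        if (value == null) return "null";
        if (value instanceof CharSequence) return "string";
        if (value instanceof Integer) return "int";
        if (value instanceof Long) return "long";
        if (value instanceof Double) return "double";
        if (value instanceof Boolean) return "boolean";
        throw new IllegalArgumentException("No primitive mapping for " + value.getClass().getName());
    }

    // Returns the index of the union branch whose type matches the value;
    // values whose type is not listed in the union are rejected.
    static int resolveBranch(List<String> unionBranches, Object value) {
        int index = unionBranches.indexOf(avroTypeOf(value));
        if (index < 0) {
            throw new IllegalArgumentException("Not in union " + unionBranches + ": " + value);
        }
        return index;
    }
}
```

With a union of "string" and "int", a String or an Integer resolves, but a Long is rejected; supporting another T means editing the annotation and recompiling.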

The second approach is to build the schema explicitly when your CoderFactory constructs the AvroCoder, and pass it to AvroCoder.of(Class, Schema).

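pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
    // schemaFromCoder is not an SDK method: it is a helper you write to build
    // the Avro schema of GenericTypeClass<T> from the Coder<T> of its type argument.
    return AvroCoder.of(
        GenericTypeClass.class,
        schemaFromCoder(componentCoders.get(0)));
  }

  ...
});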

Most of the work lies in converting a Coder<T> into a schema for T. This should be easy for basic types and manageable for POJOs that ReflectData supports, and it also gives you a hook for ad hoc support of more difficult cases.
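As a rough idea of what a schemaFromCoder helper might do, the sketch below builds the record schema JSON for GenericTypeClass<T> from the simple class name of the component coder. It uses only the JDK so it runs without the SDK; SchemaFromCoderSketch, its methods, and the lookup table are hypothetical. The coder names (StringUtf8Coder, VarIntCoder, VarLongCoder, DoubleCoder, ByteArrayCoder) are real SDK coders, but their mapping to Avro types here is an assumption. We also assume the record's namespace and name must match GenericTypeClass's fully qualified name so that Avro's reflect reader can map the record back to the class.

```java
import java.util.HashMap;
import java.util.Map;

class SchemaFromCoderSketch {
    // Assumed mapping from component-coder class names to Avro JSON schemas for T.
    private static final Map<String, String> PRIMITIVES = new HashMap<>();
    static {
        // "avro.java.string" asks Avro to read java.lang.String instead of Utf8
        // (worth verifying against your Avro version).
        PRIMITIVES.put("StringUtf8Coder", "{\"type\":\"string\",\"avro.java.string\":\"String\"}");
        PRIMITIVES.put("VarIntCoder", "\"int\"");
        PRIMITIVES.put("VarLongCoder", "\"long\"");
        PRIMITIVES.put("DoubleCoder", "\"double\"");
        PRIMITIVES.put("ByteArrayCoder", "\"bytes\"");
    }

    // Schema for T; unknown coders are rejected, which is where ad hoc cases would go.
    static String schemaJsonForComponent(String coderSimpleName) {
        String schema = PRIMITIVES.get(coderSimpleName);
        if (schema == null) {
            throw new IllegalArgumentException("No schema mapping for " + coderSimpleName);
        }
        return schema;
    }

    // Record schema for GenericTypeClass<T> with its single field genericField.
    static String recordSchemaJson(String namespace, String coderSimpleName) {
        return "{\"type\":\"record\",\"name\":\"GenericTypeClass\","
            + "\"namespace\":\"" + namespace + "\","
            + "\"fields\":[{\"name\":\"genericField\",\"type\":"
            + schemaJsonForComponent(coderSimpleName) + "}]}";
    }
}
```

In the CoderFactory, the coder name could come from componentCoders.get(0).getClass().getSimpleName(), and the resulting JSON could be parsed with new Schema.Parser().parse(json) before being passed to AvroCoder.of(GenericTypeClass.class, schema). A component that is itself an AvroCoder could contribute its own getSchema() instead of a table entry.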