Setting Custom Coders & Handling Parameterized typ

Posted 2019-02-23 01:19

Question:

I have two questions related to coder issues I am facing with my Dataflow pipeline.

  • How do I go about setting a coder for my custom data types? The class consists of just three items: two doubles and another parameterized property. I tried annotating the type with SerializableCoder but I still end up with the error "com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException: Cannot provide coder based on value with class interface java.util.Set: No CoderFactory has been registered for the class." The Set actually contains the parameterized custom data type, so I am assuming that the custom data type is the problem. I could not find enough documentation or examples on the right way to do this. Please point me to the right place if it is available.
  • Even without the custom datatype, whenever I try switching to a parameterized version of Transform functions, it results in coder errors. Specifically, inside a complex transform which is parameterized, a ParDo works with parameterized types but when I apply a Combine.PerKey on the resulting PCollection after the ParDo, it results in the CoderNotFoundException.

Any help with these two items would be appreciated, as I have been stuck on this for some time now.

Answer 1:

It looks like you have been bitten by two issues. Thanks for bringing them to our attention! Fortunately, there are easy workarounds for both while we improve things.

The first issue is that the default coder registry does not have an entry for mapping Set.class to SetCoder. We have filed GitHub issue #56 to track its resolution. In the meantime, you can use the following code to perform the needed registration:

pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);

The second issue is that parameterized types currently require advanced treatment in the coder registry, so the @DefaultCoder annotation will not be honored. We have filed GitHub issue #57 to track this. The best way to ensure that SerializableCoder is used everywhere for CustomType is to register a CoderFactory for your type that will return a SerializableCoder. Supposing your type is something like this:

public class CustomType<T extends Serializable> implements Serializable {
  T field;
}

Then the following code registers a CoderFactory that produces appropriate SerializableCoder instances:

pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> typeArgumentCoders) {
    // No matter what the T is, return SerializableCoder
    return SerializableCoder.of(CustomType.class);
  }

  @Override
  public List<Object> getInstanceComponents(Object value) {
    // Return the T inside your CustomType<T> to enable coder inference for Create
    return Collections.<Object>singletonList(((CustomType<?>) value).field);
  }
});

Now, whenever you use CustomType in your pipeline, the coder registry will produce a SerializableCoder.
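As a rough mental model of why both registrations help, the following toy registry (plain Java, no SDK dependency) maps a raw class to a factory that receives the coders of the type arguments. Every name in it (ToyCoderRegistry, ToyCoderFactory, coderFor) is hypothetical and coders are represented as plain strings; it is a sketch of the lookup idea, not the Dataflow SDK's implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical names, NOT the Dataflow SDK classes.
final class ToyCoderRegistry {
  /** Builds a coder description from the coders of the type arguments. */
  interface ToyCoderFactory {
    String create(List<String> typeArgumentCoders);
  }

  /** Hypothetical stand-in for the parameterized type in the answer. */
  static final class CustomType<T> {
    T field;
  }

  private final Map<Class<?>, ToyCoderFactory> factories = new HashMap<>();

  void register(Class<?> rawType, ToyCoderFactory factory) {
    factories.put(rawType, factory);
  }

  /** Looks up by raw class and fails when nothing was registered for it. */
  String coderFor(Class<?> rawType, String... typeArgumentCoders) {
    ToyCoderFactory factory = factories.get(rawType);
    if (factory == null) {
      throw new IllegalStateException("No factory registered for " + rawType);
    }
    return factory.create(Arrays.asList(typeArgumentCoders));
  }

  public static void main(String[] args) {
    ToyCoderRegistry registry = new ToyCoderRegistry();
    // A container coder is assembled from the coder of its element type.
    registry.register(Set.class, components -> "SetCoder(" + components.get(0) + ")");
    // A catch-all factory ignores the type arguments entirely.
    registry.register(CustomType.class, components -> "SerializableCoder(CustomType)");

    System.out.println(registry.coderFor(Set.class, "StringCoder"));
    System.out.println(registry.coderFor(CustomType.class, "DoubleCoder"));
  }
}
```

In this model, the Set error in the question corresponds to the lookup finding no entry for the raw class, and the CustomType factory corresponds to returning the same coder regardless of what T is.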

Note that SerializableCoder is not deterministic: two objects that are equals() do not necessarily encode to the same bytes. Values encoded using this coder therefore cannot be used as keys in a GroupByKey operation.
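To see how equal objects can end up with different bytes, here is a small sketch using plain Java serialization, which is the mechanism SerializableCoder relies on. It does not use the SDK at all; the class and method names (SerializationDeterminismDemo, serialize, setWithCapacity) are made up for illustration. It builds two HashSets with the same elements but different initial capacities and compares their serialized forms.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustration only: hypothetical names, plain Java serialization, no SDK.
final class SerializationDeterminismDemo {
  /** Encodes a value with standard Java serialization. */
  static byte[] serialize(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Builds a set with fixed contents but a caller-chosen initial capacity. */
  static Set<String> setWithCapacity(int capacity) {
    Set<String> set = new HashSet<>(capacity);
    set.add("a");
    set.add("b");
    return set;
  }

  public static void main(String[] args) {
    Set<String> small = setWithCapacity(16);
    Set<String> large = setWithCapacity(64);
    // The sets compare equal, yet HashSet writes its capacity into the stream.
    System.out.println("equals(): " + small.equals(large));
    System.out.println("same bytes: " + Arrays.equals(serialize(small), serialize(large)));
  }
}
```

A GroupByKey compares keys by their encoded bytes, so two keys like these could land in different groups even though equals() says they are the same.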