I have two questions related to coder issues I am facing with my Dataflow pipeline.
- How do I go about setting a coder for my custom data types? The class consists of just three items - two doubles and another parameterized property. I tried annotating the type with SerializableCoder but I still end up with the error "com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException: Cannot provide coder based on value with class interface java.util.Set: No CoderFactory has been registered for the class." The Set actually contains the parameterized custom data-type - so I am assuming that the custom datatype is the problem. I could not find enough documentation/examples on the right way to do this. Please point me to the right place if its available.
- Even without the custom datatype, whenever I try switching to a parameterized version of Transform functions, it results in coder errors. Specifically, inside a complex transform which is parameterized, a ParDo works with parameterized types but when I apply a Combine.PerKey on the resulting PCollection after the ParDo, it results in the CoderNotFoundException.
Any help regarding these two items would be helpful as I am kind of stuck on this for sometime now.
It looks like you have been bitten by two issues. Thanks for bringing them to our attention! Fortunately, there are easy workarounds for both while we improve things.
The first issue is that the default coder registry does not have an entry for mapping Set.class
to SetCoder
. We have filed GitHub issue #56 to track its resolution. In the meantime, you can use the following code to perform the needed registration:
pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
The second issue is that parameterized types currently require advanced treatment in the coder registry, so the @DefaultCoder
will not be honored. We have filed Github issue #57 to track this. The best way to ensure that SerializableCoder
is used everywhere for CustomType
is to register a CoderFactory
for your type that will return a SerializableCoder
. Supposing your type is something like this:
public class CustomType<T extends Serializable> implements Serializable {
T field;
}
Then the following code registers a CoderFactory
that produces appropriate SerializableCoder
instances:
pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
@Override
public Coder<?> create(List<? extends Coder<?>>) {
// No matter what the T is, return SerializableCoder
return SerializableCoder.of(CustomType.class);
}
@Override
public List<Object> getInstanceComponents(Object value) {
// Return the T inside your CustomType<T> to enable coder inference for Create
return Collections.singletonList(((CustomType<Object>) value).field);
}
});
Now, whenever you use CustomType
in your pipeline, the coder registry will produce a SerializableCoder
.
Note that SerializableCoder
is not deterministic (the bytes of encoded objects are not necessarily equal for objects that are equals()
) so values encoded using this coder cannot be used as keys in a GroupByKey
operation.