I have two questions related to coder issues I am facing with my Dataflow pipeline.
- How do I go about setting a coder for my custom data types? The class consists of just three items - two doubles and another parameterized property. I tried annotating the type with SerializableCoder but I still end up with the error "com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException: Cannot provide coder based on value with class interface java.util.Set: No CoderFactory has been registered for the class." The Set actually contains the parameterized custom data-type - so I am assuming that the custom datatype is the problem. I could not find enough documentation/examples on the right way to do this. Please point me to the right place if its available.
- Even without the custom datatype, whenever I try switching to a parameterized version of Transform functions, it results in coder errors. Specifically, inside a complex transform which is parameterized, a ParDo works with parameterized types but when I apply a Combine.PerKey on the resulting PCollection after the ParDo, it results in the CoderNotFoundException.
Any help regarding these two items would be helpful as I am kind of stuck on this for sometime now.
It looks like you have been bitten by two issues. Thanks for bringing them to our attention! Fortunately, there are easy workarounds for both while we improve things.
The first issue is that the default coder registry does not have an entry for mapping
Set.class
toSetCoder
. We have filed GitHub issue #56 to track its resolution. In the meantime, you can use the following code to perform the needed registration:The second issue is that parameterized types currently require advanced treatment in the coder registry, so the
@DefaultCoder
will not be honored. We have filed Github issue #57 to track this. The best way to ensure thatSerializableCoder
is used everywhere forCustomType
is to register aCoderFactory
for your type that will return aSerializableCoder
. Supposing your type is something like this:Then the following code registers a
CoderFactory
that produces appropriateSerializableCoder
instances:Now, whenever you use
CustomType
in your pipeline, the coder registry will produce aSerializableCoder
.Note that
SerializableCoder
is not deterministic (the bytes of encoded objects are not necessarily equal for objects that areequals()
) so values encoded using this coder cannot be used as keys in aGroupByKey
operation.