This is related to another SO question, "Setting Custom Coders & Handling Parameterized types". Following the workarounds there helped me use custom types in the transforms. But since my custom types are generic, I was hoping to make the transform classes generic as well, so that they could parameterize the custom type with the same type parameter. When I try that, I run into: Cannot provide a Coder for type variable T because the actual type is unknown due to erasure. The workaround suggested registering a coder that would return the type parameter, but since the type parameter is itself unknown, I guess this exception is thrown, and I am not sure how to get around it.
    static class Processor<T>
        extends PTransform<PCollection<String>,
                           PCollection<KV<String, Set<CustomType<T>>>>> {

      private static final long serialVersionUID = 0;

      @Override
      public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {
        // ParDoFn and Merger are defined elsewhere (not shown).
        PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
            items.apply(ParDo.of(new ParDoFn()));
        PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
            partitionedItems.apply(
                Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
        return combinedItems;
      }
    }
This also looks to be caused by GitHub Issue #57 and should be fixed along with that issue.
In the meantime, Dataflow actually includes advanced features that can solve your problem immediately. It appears from your code snippet that the entire system in question may look something like this:
Dataflow obtains the output type of each DoFn by using a TypeDescriptor returned by getOutputTypeDescriptor. Because your ParDoFn is an inner class of Processor<T>, the output type descriptor is simply Set<CustomType<T>>, even when it is instantiated as new Processor<String>.

To gain type information, we need ParDoFn to know, statically, the type provided for T. There are two steps for this.

1. Create an anonymous subclass of Processor

This ensures that for all the inner classes of this instance of Processor, the type variable T is statically bound to the type String. It is probably best in this case to make Processor an abstract class so consumers are required to subclass it.

2. Override getOutputTypeDescriptor of ParDoFn to resolve its types against the outer class Processor.

The complete working version of the code from the beginning is then the following. Note again that none of this will be necessary when GitHub Issue #57 is resolved.
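Separately from the Dataflow code itself, the idea behind the two steps can be reproduced with plain Java reflection. The sketch below does not use the Dataflow SDK: GenericTransform, Fn, InnerFn and the two helper methods are hypothetical stand-ins for Processor, DoFn and ParDoFn, and the resolution logic is deliberately simplified (it only handles a direct subclass of the outer class). It is not how TypeDescriptor is implemented. It only shows that an inner class on its own sees the unresolved variable T, while the class of an anonymous subclass records the concrete type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a generic transform such as Processor<T>.
class GenericTransform<T> {

    // Hypothetical stand-in for DoFn; O plays the role of the output type.
    abstract static class Fn<O> {}

    // Inner class whose generic superclass is Fn<T>, with T owned by the outer class.
    class InnerFn extends Fn<T> {}

    // What the inner class alone can report: the type argument of Fn,
    // which is the unresolved type variable T.
    Type rawOutputType() {
        ParameterizedType fnType =
            (ParameterizedType) new InnerFn().getClass().getGenericSuperclass();
        return fnType.getActualTypeArguments()[0];
    }

    // Simplified resolution against the outer instance's runtime class.
    // Assumption: only a direct (for example anonymous) subclass is handled.
    Type resolvedOutputType() {
        Type arg = rawOutputType();
        Type outerSuper = getClass().getGenericSuperclass();
        if (arg instanceof TypeVariable && outerSuper instanceof ParameterizedType) {
            // The subclass declaration "extends GenericTransform<String>" is
            // stored in the class file, so the argument for T can be read back.
            return ((ParameterizedType) outerSuper).getActualTypeArguments()[0];
        }
        // No subclass: nothing records what T was, so it stays a type variable.
        return arg;
    }
}

final class ErasureDemo {
    public static void main(String[] args) {
        // Plain instantiation: the type argument String is erased.
        GenericTransform<String> plain = new GenericTransform<String>();
        // Anonymous subclass: the type argument is part of the subclass's signature.
        GenericTransform<String> bound = new GenericTransform<String>() {};

        System.out.println(plain.resolvedOutputType()); // still the type variable
        System.out.println(bound.resolvedOutputType()); // resolved via the subclass
    }
}
```

The only difference between the two instances is the trailing {}. With it, the runtime class has GenericTransform<String> as its generic superclass, which is the static binding of T that step 1 relies on, and resolving against the outer class (step 2) can then replace T with String.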
This is not the only solution: you could also override Processor.getDefaultOutputCoder or explicitly call setCoder on the intermediate partitionedItems collection. It does, however, seem to be the most general for this use.