Making Transformations in Dataflow Generic

Posted 2019-05-22 23:27

Question:

This is related to another SO question (Setting Custom Coders & Handling Parameterized types). Following the workarounds there let me use custom types in my transforms. But since my custom types are generic, I was hoping to make the transform classes generic as well, so that they could parameterize the custom type with the same type parameter. When I try that, however, I run into "Cannot provide a Coder for type variable T because the actual type is unknown due to erasure." The workaround there was to register a coder that returns the type parameter, but since the type parameter itself is unknown here, I assume that is why this exception is thrown, and I am not sure how to get around it.

static class Processor<T> 
  extends PTransform<PCollection<String>, 
                     PCollection<KV<String, Set<CustomType<T>>>>> { 

  private static final long serialVersionUID = 0; 

  @Override public PCollection<KV<String, Set<CustomType<T>>>>
  apply(PCollection<String> items) {
    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items
        .apply(ParDo.of(new ParDoFn()));
    PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
        .apply(Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
    return combinedItems;
  }
} 

Answer 1:

This also appears to be caused by GitHub Issue #57 and should be fixed along with that issue.

In the meantime, the Dataflow SDK already includes advanced features that can solve your problem. Judging from your code snippet, the entire system in question probably looks something like this:

class CustomType<T extends Serializable> { ... }

class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

    class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }

    class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }

    @Override
    public PCollection<KV<String, Set<CustomType<T>>>>
    apply(PCollection<String> items) {

      PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
          items.apply(ParDo.of(new ParDoFn()));

      PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
          partitionedItems.apply(
              Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
                  new Merger()));

      return combinedItems;
    }
}

…

PCollection<String> input = ...
input.apply(new Processor<String>());

Dataflow determines the output type of each DoFn from the TypeDescriptor returned by its getOutputTypeDescriptor method, and uses that type to infer a Coder for the output.
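To make the mechanism concrete, here is a minimal plain-Java sketch of how a framework can read a function's declared output type through reflection. FakeDoFn, LengthFn, SplitFn and OutputTypeDemo are hypothetical names, not Dataflow SDK classes, and the real SDK wraps this kind of logic in TypeDescriptor; the sketch only shows that a non-generic subclass keeps its type arguments in the class file, so they survive erasure.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for a DoFn-like base class (not the Dataflow SDK type).
abstract class FakeDoFn<InputT, OutputT> {
  // Reads the OutputT argument from the generic superclass of the concrete
  // subclass. Assumes the subclass extends FakeDoFn directly.
  Type outputType() {
    ParameterizedType superType =
        (ParameterizedType) getClass().getGenericSuperclass();
    return superType.getActualTypeArguments()[1];
  }
}

// Non-generic subclasses: their type arguments are written into the class
// file, so they are still available at runtime despite erasure.
class LengthFn extends FakeDoFn<String, Integer> {}

class SplitFn extends FakeDoFn<String, List<String>> {}

class OutputTypeDemo {
  public static void main(String[] args) {
    System.out.println(new LengthFn().outputType().getTypeName()); // java.lang.Integer
    System.out.println(new SplitFn().outputType().getTypeName());  // java.util.List<java.lang.String>
  }
}
```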

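Because your ParDoFn is an inner class of Processor<T>, its output type descriptor is simply KV<String, Set<CustomType<T>>>, with T still an unresolved type variable, even when the enclosing transform is instantiated as new Processor<String>(). The String argument is erased at runtime, so there is nothing to resolve T against.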
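A hypothetical plain-Java sketch of the failure: Outer, InnerFn, BaseFn and ErasureDemo stand in for Processor, ParDoFn and DoFn and are not SDK types. Reading the inner class's declared output type yields Set<T> with T still a TypeVariable, even though the outer instance was created as Outer<String>.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Set;

// Hypothetical stand-ins for DoFn, Processor and ParDoFn (not SDK classes).
abstract class BaseFn<InputT, OutputT> {}

class Outer<T> {
  // Inner class whose output type mentions the outer type variable T.
  class InnerFn extends BaseFn<String, Set<T>> {}
}

class ErasureDemo {
  // Reads OutputT as declared on the function's direct superclass.
  static Type declaredOutput(Object fn) {
    ParameterizedType sup = (ParameterizedType) fn.getClass().getGenericSuperclass();
    return sup.getActualTypeArguments()[1];
  }

  public static void main(String[] args) {
    Outer<String> outer = new Outer<String>();
    Outer<String>.InnerFn fn = outer.new InnerFn();
    Type out = declaredOutput(fn);
    // The String argument was erased, so the declared type still says T.
    System.out.println(out.getTypeName()); // java.util.Set<T>
    Type elem = ((ParameterizedType) out).getActualTypeArguments()[0];
    System.out.println(elem instanceof TypeVariable); // true
  }
}
```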

To recover this type information, ParDoFn needs to know, statically, which type was supplied for T. There are two steps to this.

1. Create an anonymous subclass of Processor

PCollection<String> input = ...
input.apply(new Processor<String>() {});

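The trailing {} creates an anonymous subclass whose superclass is recorded in its class metadata as Processor<String>. This ensures that, for all the inner classes of this instance of Processor, the type variable T is statically bound to String and can be recovered at runtime. In this case it is probably best to make Processor an abstract class so that consumers are required to subclass it.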
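A minimal sketch, using a hypothetical GenericProcessor in place of Processor, showing that the anonymous subclass keeps the String argument where reflection can find it. Because GenericProcessor is abstract, writing new GenericProcessor<String>() without the {} does not compile, which is the kind of enforcement suggested above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Processor<T>; abstract so callers must subclass it.
abstract class GenericProcessor<T> {
  // Returns the type argument recorded by the subclass declaration.
  // Assumes the runtime class extends GenericProcessor directly.
  Type boundT() {
    Type sup = getClass().getGenericSuperclass();
    if (!(sup instanceof ParameterizedType)) {
      throw new IllegalStateException("T was not bound by a subclass");
    }
    return ((ParameterizedType) sup).getActualTypeArguments()[0];
  }
}

class AnonymousSubclassDemo {
  public static void main(String[] args) {
    // The trailing {} creates an anonymous subclass of GenericProcessor<String>.
    GenericProcessor<String> p = new GenericProcessor<String>() {};
    System.out.println(p.boundT().getTypeName()); // java.lang.String
  }
}
```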

2. Override getOutputTypeDescriptor in ParDoFn so that its types are resolved against the enclosing Processor class.

class Processor<T extends Serializable> extends ... {
  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
    getOutputTypeDescriptor() {
      // Resolve T against the runtime class of the enclosing Processor,
      // e.g. an anonymous subclass of Processor<String>.
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }
  ...
}
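The following plain-Java sketch approximates what step 2 achieves, assuming the processor is a direct (for example anonymous) subclass. SketchProcessor, SketchFn, InnerFn, resolve and ResolveDemo are hypothetical names; in the SDK, TypeDescriptor performs this resolution far more generally, so this only illustrates the idea of substituting T using the outer instance's runtime class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Set;

// Hypothetical, simplified stand-ins (not SDK classes).
abstract class SketchFn<InputT, OutputT> {}

abstract class SketchProcessor<T> {
  class InnerFn extends SketchFn<String, Set<T>> {
    // Resolve the element type of Set<T> against the runtime class of the
    // enclosing processor, similar in spirit to passing
    // Processor.this.getClass() to the TypeDescriptor constructor.
    Type resolvedElementType() {
      ParameterizedType fnSuper = (ParameterizedType) getClass().getGenericSuperclass();
      ParameterizedType setType = (ParameterizedType) fnSuper.getActualTypeArguments()[1];
      Type elem = setType.getActualTypeArguments()[0];
      if (!(elem instanceof TypeVariable)) {
        return elem;
      }
      return resolve((TypeVariable<?>) elem, SketchProcessor.this.getClass());
    }
  }

  // Looks up which type the subclass bound to the given type variable of its
  // direct superclass. Returns the variable itself if nothing bound it.
  static Type resolve(TypeVariable<?> variable, Class<?> subclass) {
    Type sup = subclass.getGenericSuperclass();
    if (!(sup instanceof ParameterizedType)) {
      return variable; // e.g. a raw subclass: T stays unresolved
    }
    ParameterizedType p = (ParameterizedType) sup;
    TypeVariable<?>[] params = ((Class<?>) p.getRawType()).getTypeParameters();
    for (int i = 0; i < params.length; i++) {
      if (params[i].equals(variable)) {
        return p.getActualTypeArguments()[i];
      }
    }
    return variable;
  }
}

class ResolveDemo {
  public static void main(String[] args) {
    SketchProcessor<String> bound = new SketchProcessor<String>() {};
    System.out.println(bound.new InnerFn().resolvedElementType().getTypeName()); // java.lang.String
  }
}
```

A named subclass such as class StringProcessor extends SketchProcessor<String> {} binds T in the same way as the anonymous subclass.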

Putting both steps together, the complete working version of the code from the beginning is as follows. Note again that none of this should be necessary once GitHub Issue #57 is resolved.

class CustomType<T extends Serializable> { ... }

abstract class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    ...

    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>> 
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }

  class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }

  @Override
  public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {

    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
        items.apply(ParDo.of(new ParDoFn()));

    PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
        partitionedItems.apply(
          Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
              new Merger()));

    return combinedItems;
  }
}

PCollection<String> input = …;
input.apply(new Processor<String>() {});

This is not the only solution: you could also override Processor.getDefaultOutputCoder, or explicitly call setCoder on the intermediate partitionedItems collection. However, this approach seems to be the most general one for this use case.