Apache Beam - how can I apply .getSideInputsMap to

Posted 2019-08-31 01:49

Question:

I get a PCollection by subscribing to a Google Cloud Pub/Sub topic in Apache Beam like this:

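PCollection<PubsubMessage> deviceReferenceDataUpdates = pipeLine.begin()
    .apply("subscribe to published data",
        // fromTopic() expects a full path of the form "projects/<project>/topics/<topic>"
        PubsubIO.readMessages().fromTopic("my_data"));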
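Each element that PubsubIO.readMessages() produces is a PubsubMessage, whose getPayload() returns the raw message bytes, so the data has to be decoded before it can update anything keyed. Below is a minimal plain-Java sketch of that decoding step. It assumes a hypothetical UTF-8 "deviceId=value" message format, because the question does not say what the messages contain. The UpdateDecoder class and decode() method are illustrative names. In a pipeline, this logic would sit inside a DoFn applied to deviceReferenceDataUpdates.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

public class UpdateDecoder {
    // Hypothetical message format: a UTF-8 string "deviceId=value".
    // In Beam, the bytes would come from PubsubMessage.getPayload().
    public static Map.Entry<String, String> decode(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int sep = text.indexOf('=');
        if (sep <= 0) {
            // No separator, or an empty key: reject the message.
            throw new IllegalArgumentException("Malformed update: " + text);
        }
        return new AbstractMap.SimpleImmutableEntry<>(
                text.substring(0, sep).trim(), text.substring(sep + 1).trim());
    }

    public static void main(String[] args) {
        byte[] payload = "sensor-42=calibrated".getBytes(StandardCharsets.UTF_8);
        Map.Entry<String, String> update = decode(payload);
        System.out.println(update.getKey() + " -> " + update.getValue());
        // prints "sensor-42 -> calibrated"
    }
}
```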

I want to use this data to update a HashMap of other data that lives in the same class (the HashMap is directly in scope alongside the PCollection). I'm investigating whether the .getSideInputsMap() method in Apache Beam can achieve this. Here is an example in which getSideInputsMap() is used to populate a new HashMap:

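public Map<TupleTag<?>, PValue> getAdditionalInputs() {
  Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
  // payload is a Runner API protobuf message; getSideInputsMap() is its generated
  // accessor for the side_inputs map field (side-input local name -> SideInput spec).
  for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
    try {
      // Resolve the local name to a PCollection ID via the transform's inputs,
      // then look that ID up in the rehydrated pipeline components.
      additionalInputs.put(
          new TupleTag<>(sideInputEntry.getKey()),
          rehydratedComponents.getPCollection(
              protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
    } catch (IOException e) {
      // .... error handling
    }
  }
  return additionalInputs;
}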

(taken from example 10 here - https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine)
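In the excerpt above, getSideInputsMap() is not a PCollection method. It is the protobuf-generated accessor for the side_inputs map field of a serialized Runner API transform payload (ParDoPayload, for example, has such a field). That map sends each side input's local name to its SideInput spec. The loop only resolves those names to PCollection objects through the transform's inputs and the rehydrated components. It never reads the elements of those PCollections. The plain-Java model below mirrors that two-step lookup so the flow is visible. FakePCollection, resolveSideInputs(), and the string IDs are hypothetical stand-ins for Beam's proto and SDK types, not Beam APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class SideInputResolution {
    // Hypothetical stand-in for a rehydrated PCollection: only its ID.
    static final class FakePCollection {
        final String id;
        FakePCollection(String id) { this.id = id; }
        @Override public String toString() { return "PCollection(" + id + ")"; }
    }

    // For each side-input local name, find the transform input with the same
    // name (a PCollection ID), then resolve that ID in the pipeline components.
    // The side-input spec values are ignored here, as they are in the excerpt.
    static Map<String, FakePCollection> resolveSideInputs(
            Map<String, String> sideInputsMap,          // local name -> side-input spec
            Map<String, String> transformInputs,        // local name -> PCollection ID
            Map<String, FakePCollection> components) {  // PCollection ID -> PCollection
        Map<String, FakePCollection> additionalInputs = new HashMap<>();
        for (String localName : sideInputsMap.keySet()) {
            String pcollectionId = transformInputs.get(localName);
            if (pcollectionId == null) {
                // Analogue of a protobuf getXxxOrThrow() call on a missing key.
                throw new IllegalArgumentException("No input named " + localName);
            }
            FakePCollection pc = components.get(pcollectionId);
            if (pc == null) {
                throw new IllegalStateException("Unknown PCollection " + pcollectionId);
            }
            additionalInputs.put(localName, pc);
        }
        return additionalInputs;
    }

    public static void main(String[] args) {
        Map<String, String> sideInputs = Map.of("refData", "spec-placeholder");
        Map<String, String> inputs = Map.of("main", "pc1", "refData", "pc2");
        Map<String, FakePCollection> components = Map.of(
                "pc1", new FakePCollection("pc1"),
                "pc2", new FakePCollection("pc2"));
        System.out.println(resolveSideInputs(sideInputs, inputs, components));
        // prints {refData=PCollection(pc2)}
    }
}
```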

I'm not sure how (or whether) I can use getSideInputsMap() to update my HashMap. It isn't available as a method on my PCollection. In the example above it is called on a payload, so do I need to somehow generate a payload, or should I generate some other object on which I can call getSideInputsMap()?
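A likely obstacle is that the runner serializes and executes DoFns, so any HashMap they capture is a copy. A PCollection has no channel for writing back into a map owned by the class that builds the pipeline. In the Beam Java SDK, the usual way to make one dataset visible to a transform is a side input. View.asMap() turns a keyed PCollection<KV<K, V>> into a PCollectionView<Map<K, V>>, ParDo.of(fn).withSideInputs(view) attaches it, and the DoFn reads it with c.sideInput(view). View.asMap() fails at runtime if a key has more than one value in a window, so a stream of updates would first need reducing to one value per key. The plain-Java sketch below models only that reduction, as a "newest update wins" merge over a reference map. The Update class, its timestamp field, and applyUpdates() are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatestWinsMerge {
    // Hypothetical update record: key, new value, and event time in millis.
    static final class Update {
        final String key;
        final String value;
        final long timestampMillis;
        Update(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Returns a new map: the reference data with updates applied, keeping only
    // the newest update per key (ties go to the later update in the list).
    // Existing reference entries carry no timestamp, so any update replaces them.
    // The input map is left untouched, like a read-only side-input snapshot.
    static Map<String, String> applyUpdates(Map<String, String> reference, List<Update> updates) {
        Map<String, String> result = new HashMap<>(reference);
        Map<String, Long> newestSeen = new HashMap<>();
        for (Update u : updates) {
            Long prev = newestSeen.get(u.key);
            if (prev == null || u.timestampMillis >= prev) {
                newestSeen.put(u.key, u.timestampMillis);
                result.put(u.key, u.value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> reference = new HashMap<>();
        reference.put("sensor-1", "v1");
        reference.put("sensor-2", "v1");
        List<Update> updates = List.of(
                new Update("sensor-1", "v3", 300L),
                new Update("sensor-1", "v2", 200L), // older, arrives late: ignored
                new Update("sensor-3", "v1", 100L));
        Map<String, String> merged = applyUpdates(reference, updates);
        System.out.println(merged.get("sensor-1") + " " + merged.get("sensor-2") + " " + merged.get("sensor-3"));
        // prints "v3 v1 v1"
    }
}
```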