Storm fields grouping

Posted 2020-02-14 07:31

Question:

I have the following situation:

  • There are a number of bolts that calculate different values
  • These values are sent to a visualization bolt
  • The visualization bolt opens a WebSocket and sends the values to be visualized

The thing is, the visualization bolt is always the same, but it sends a message with a different header for each type of bolt that can be its input. For example:

  • BoltSum calculates the sum
  • BoltDif calculates the difference
  • BoltMul calculates the product

  • All these bolts use VisualizationBolt for visualization

  • There are 3 instances of VisualizationBolt in this case
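Whichever topology is chosen, one way to keep a single VisualizationBolt class while varying the header is to derive the header from where each tuple came from. The sketch below is plain Java so it runs without Storm; the component IDs are the ones used in this question, while the HEADERS map, the header strings and the "HEADER:value" message format are hypothetical. Inside a real bolt's execute(), the source ID could be read with Tuple.getSourceComponent().

```java
import java.util.Map;

/**
 * Sketch: pick the WebSocket message header from the upstream component ID.
 * HEADERS, the header strings and the message format are hypothetical.
 * In a real Storm bolt, sourceComponent would come from tuple.getSourceComponent().
 */
final class HeaderSelection {
    // Hypothetical mapping from upstream component ID to message header.
    private static final Map<String, String> HEADERS = Map.of(
            "bolt-sum", "SUM",
            "bolt-dif", "DIF",
            "bolt-mul", "MUL");

    /** Builds the message to push over the WebSocket for one value. */
    static String buildMessage(String sourceComponent, double value) {
        String header = HEADERS.get(sourceComponent);
        if (header == null) {
            throw new IllegalArgumentException("Unknown source: " + sourceComponent);
        }
        return header + ":" + value;
    }

    public static void main(String[] args) {
        System.out.println(buildMessage("bolt-sum", 12.5)); // prints "SUM:12.5"
        System.out.println(buildMessage("bolt-mul", 6.0));  // prints "MUL:6.0"
    }
}
```

With this design the header no longer depends on which VisualizationBolt task receives a tuple, so it stays correct under either grouping.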

My question is: should I create three independent instances, each with one thread, e.g.:

builder.setBolt("forSum", new VisualizationBolt(),1).globalGrouping("bolt-sum");
builder.setBolt("forDif", new VisualizationBolt(),1).globalGrouping("bolt-dif");
builder.setBolt("forMul", new VisualizationBolt(),1).globalGrouping("bolt-mul");

Or should I do the following

builder.setBolt("forAll", new VisualizationBolt(), 3)
        .fieldsGrouping("bolt-sum", new Fields("type"))
        .fieldsGrouping("bolt-dif", new Fields("type"))
        .fieldsGrouping("bolt-mul", new Fields("type"));

and emit a "type" field from each of the upstream bolts, so the tuples can be grouped on it?

What are the advantages of each approach?

Also, should I expect that tuples from bolt-sum will always go to the first visualization bolt instance, those from bolt-dif to the second, and those from bolt-mul to the third, without ever being mixed?

I think that should be the case, but it isn't in my current implementation, so I'm not sure whether it's a bug or I'm missing something.

Answer 1:

The first approach, with three separate bolts, is the correct one. Using fieldsGrouping ensures neither that "sum" values go to the "Sum-Visualization-Bolt" nor that sum/diff/mul values are kept apart (i.e., in different bolt instances).

The semantics of fieldsGrouping are more relaxed: it only guarantees that all tuples of the same type are processed by a single bolt instance, i.e., two different bolt instances will never receive the same type. It does not prevent several types from being routed to the same instance, which is why the values can appear "mixed".
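To see why types can end up sharing an instance, here is a simplified model of fields grouping: hash the grouping values and take the result modulo the number of tasks. This follows the hash-and-modulo idea behind fields grouping; the exact hash Storm applies is an assumption here and may differ between versions, so the specific task numbers are illustrative only.

```java
import java.util.List;

/**
 * Simplified model of fields grouping: hash the selected field values and
 * take a non-negative modulo over the number of downstream tasks.
 * The exact hash used by Storm is an assumption; only the idea is shown.
 */
final class FieldsGroupingModel {
    static int chooseTask(String type, int numTasks) {
        // Hash the list of grouping values (here just "type"), then floorMod so the index is >= 0.
        return Math.floorMod(List.of(type).hashCode(), numTasks);
    }

    public static void main(String[] args) {
        for (String type : List.of("sum", "dif", "mul")) {
            System.out.println(type + " -> task " + chooseTask(type, 3));
        }
        // Under this model: sum -> task 0, dif -> task 2, mul -> task 2.
        // Each type always lands on the same task, but "dif" and "mul" share task 2,
        // and task 1 receives nothing.
    }
}
```

With three types and three tasks nothing forces a one-to-one mapping: in this model two of the three types collide and one task stays idle, which matches the behaviour described in the question.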



Answer 2:

I think you can use Partial Key grouping (partialKeyGrouping). The Storm documentation on stream groupings says:

Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
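The "load balanced between two downstream bolts" idea can be sketched as a "two choices" scheme: each key has two candidate tasks, and the sender picks whichever candidate it has sent fewer tuples to so far. This is a simplified model, not Storm's PartialKeyGrouping code; the class name, the seeds and the MurmurHash3-style mixer are assumptions. Note that it spreads one key over up to two instances, so it balances skewed load but does not keep each type on one dedicated instance.

```java
import java.util.List;

/**
 * Simplified "two choices" model of partial key grouping. Each key gets two
 * candidate tasks from two seeded hashes; each tuple goes to the candidate
 * this sender has routed fewer tuples to. Seeds and mixer are assumptions.
 */
final class PartialKeyGroupingModel {
    private final long[] sentPerTask; // local load estimate kept by the sender

    PartialKeyGroupingModel(int numTasks) {
        this.sentPerTask = new long[numTasks];
    }

    /** Seeded hash: XOR the seed in, then apply the MurmurHash3 32-bit finalizer. */
    private static int hash(String key, int seed) {
        int h = key.hashCode() ^ seed;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** The two candidate tasks for a key (they may coincide). */
    int[] candidates(String key) {
        int n = sentPerTask.length;
        return new int[] {
            Math.floorMod(hash(key, 0x1234567), n),
            Math.floorMod(hash(key, 0x7654321), n)
        };
    }

    /** Routes one tuple to the less loaded candidate and records it. */
    int route(String key) {
        int[] c = candidates(key);
        int chosen = sentPerTask[c[0]] <= sentPerTask[c[1]] ? c[0] : c[1];
        sentPerTask[chosen]++;
        return chosen;
    }

    long sentTo(int task) {
        return sentPerTask[task];
    }

    public static void main(String[] args) {
        PartialKeyGroupingModel grouping = new PartialKeyGroupingModel(4);
        // Skewed stream: one hot key and a few rare ones.
        List<String> stream = List.of("hot", "hot", "hot", "hot", "hot", "hot", "rare-1", "rare-2");
        for (String key : stream) {
            System.out.println(key + " -> task " + grouping.route(key));
        }
        // "hot" is spread over at most two tasks instead of piling onto one.
    }
}
```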

I implemented a simple topology using this grouping, and the charts on my Graphite server show better load balancing than with fieldsGrouping. The full source code is here.

topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TYPE.getValue(), new SensorAggregateValuesWindowBolt().withTumblingWindow(Duration.seconds(5)), 2)
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .setNumTasks(4) // 4 tasks (bolt instances) run on the 2 executors set by the parallelism hint
        .addConfiguration(TagSite.SITE.getValue(), TagSite.EDGE.getValue());