Is there a way to apply multiple groupings in Storm?

Posted 2019-08-10 08:58

Question:

I want to apply "Fields grouping" as well as "Local or shuffle grouping" to my topology, such that each spout sends data to local bolts only, but also uses a field in my document to decide which of those local bolts it should go to.

So if there were two worker processes, each with one Kafka-Spout and two Elasticsearch bolts, local-or-shuffle grouping gives me the following:

Each KS ---> Two local ES-Bolts

fields-grouping gives me the following:

Each KS ---> Possibly all 4 ES-bolts, depending on the value of the field

But I want to have the following:

Each KS ---> Two local ES-bolts only, but distribution among these
             local bolts should depend on the value of the field

where:

KS = Kafka-Spout

ES = Elasticsearch

I want to do this so that I can group all the documents for a single ES shard together in one ES-bolt. That way, the batch sent by an ES-bolt will not be split further by the ES server, since every document in the batch has the same destination shard. (I plan to add a destination_shard field to the documents for field-level grouping, where destination_shard is calculated as Murmur3.hash(ID) % numShards.)
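For illustration, a minimal sketch of that shard calculation in Java, using Guava's Murmur3 as a stand-in (an assumption on my part: Elasticsearch uses its own Murmur3HashFunction internally, so the values below may not match ES's actual routing bit-for-bit):

    import com.google.common.hash.Hashing;
    import java.nio.charset.StandardCharsets;

    public class ShardRouting {
        // Approximates destination_shard = Murmur3.hash(ID) % numShards.
        // Guava's murmur3_32() is an illustrative stand-in; Elasticsearch uses
        // its own Murmur3HashFunction, so exact shard numbers may differ.
        public static int destinationShard(String docId, int numShards) {
            int hash = Hashing.murmur3_32()
                              .hashString(docId, StandardCharsets.UTF_8)
                              .asInt();
            // floorMod keeps the shard index non-negative for negative hashes
            return Math.floorMod(hash, numShards);
        }
    }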

I also do not want any inter-process communication, hence the need for "local or shuffle grouping".

Thanks for the help!

Answer 1:

No and Yes.

There is no built-in grouping that does what you want, but you can implement that grouping yourself using:

1) Direct streams, where you specify the task ID of the bolt instance that should process each tuple (rather than letting Storm decide)

2) The topology context passed to each bolt and spout at startup. That object can tell you which tasks are running on the current worker (getThisWorkerTasks()) and which tasks belong to which component (getComponentTasks())

3) Your own partitioning logic, as you've described above, which uses the information from (2) to choose the specific target task for each of your bolt's outbound tuples; see the sketch after this list.
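To make the three pieces concrete, here is a minimal sketch against Storm's Java API. It assumes a small router bolt placed between the Kafka spout and the ES-bolts (since the stock KafkaSpout doesn't expose direct emits); the component id "es-bolt", the stream id, and the field names are hypothetical:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class LocalShardRouterBolt extends BaseRichBolt {
        public static final String STREAM = "to-es";   // hypothetical stream id

        private OutputCollector collector;
        private List<Integer> localEsTasks;            // ES-bolt tasks in this worker

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
            // (2) Intersect the ES bolt's tasks with the tasks on this worker,
            // so we only ever target bolt instances in our own process.
            localEsTasks = new ArrayList<>(context.getThisWorkerTasks());
            localEsTasks.retainAll(context.getComponentTasks("es-bolt"));
        }

        @Override
        public void execute(Tuple input) {
            // (3) Partition among the local ES tasks by destination_shard, so
            // all documents for one shard land on the same local bolt.
            int shard = input.getIntegerByField("destination_shard");
            int target = localEsTasks.get(Math.floorMod(shard, localEsTasks.size()));
            // (1) Direct emit: we pick the consuming task id ourselves.
            collector.emitDirect(target, STREAM, input,
                    new Values(input.getValueByField("doc"), shard));
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // "true" marks the stream as direct; consumers must use directGrouping
            declarer.declareStream(STREAM, true, new Fields("doc", "destination_shard"));
        }
    }

When wiring the topology, the router would read from the spout via localOrShuffleGrouping (so tuples stay inside the worker), and the ES-bolt would subscribe to the direct stream with directGrouping; the "kafka-spout" id and EsBolt class here are again hypothetical:

    builder.setBolt("router", new LocalShardRouterBolt(), 2)
           .localOrShuffleGrouping("kafka-spout");
    builder.setBolt("es-bolt", new EsBolt(), 4)
           .directGrouping("router", LocalShardRouterBolt.STREAM);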