Does Samza create partitions automatically when sending messages?

Posted 2019-07-10 00:11

If you use Samza's OutgoingMessageEnvelope to send a message, using this constructor:

public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.

and you send such an envelope through the MessageCollector within a stream task's process() method to route incoming messages to the appropriate partition, will Samza create the partitions for you when you send it?

E.g.

MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}

If I call the following within a stream task's process() method, where msg is a message instance:

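public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // ...
    // msg is assumed to be the deserialized message as a Map (e.g. produced by a JSON serde)
    String partitionKey = (String) msg.get("id"); // used to choose the target partition
    String key = (String) msg.get("key");         // message key, used for log compaction
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), partitionKey, key, msg));
    // ...
}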

Will this create partitions idA and idB automatically for me (i.e. do I need to create these partitions before I send messages to them)? I want to be able to route a message to the appropriate partition and also to use log compaction with a separate message key.

1 answer
狗以群分
#2 · 2019-07-10 01:10

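You must specify the number of partitions when you create the topic. You cannot easily add partitions dynamically (Kafka does let you increase a topic's partition count, but it is not trivial, because existing keys can then map to different partitions, and Samza does not do it automatically). Samza should create the topic for you if it doesn't exist, but with the default number of partitions, which depends on your settings (for example, the broker's num.partitions when Kafka auto-creates the topic). You can verify this with a quick test.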

However, the value msg["id"] is not the name of a partition. Partitions are numbered, not named; the partition key is only used to compute the number of the target partition. It is hashed to an integer, which is then reduced modulo the number of partitions. Something like this (other algorithms exist; this is the basic one):

partitionID = hash(msg["id"]) % total_number_of_partitions
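The formula above can be sketched in Java as a minimal illustration of the basic hash-and-modulo scheme. It assumes String.hashCode() as the hash function; it is not necessarily the exact algorithm Samza's Kafka producer uses (Kafka's default partitioner, for example, hashes the serialized key bytes with murmur2). The class and method names are hypothetical.

```java
import java.util.List;

public class PartitionSketch {

    // Maps a partition key to a partition number in [0, numPartitions).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int partitionFor(Object partitionKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // hypothetical partition count of "PartitionedMessages"
        for (String id : List.of("idA", "idB")) {
            System.out.println(id + " -> partition " + partitionFor(id, numPartitions));
        }
    }
}
```

With 4 partitions this prints "idA -> partition 2" and "idB -> partition 3": the ids select a partition number, they do not create partitions named idA or idB.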

partitionID is always an integer between 0 and total_number_of_partitions - 1, so it doesn't matter how many partitions you actually have: every message always ends up in one of them. The main idea is that two messages with the same msg["id"] will end up in the same partition (as long as the number of partitions does not change). That is usually what you want.
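The two properties described above can be checked with a small sketch, assuming the basic String.hashCode() plus floorMod scheme (helper names are hypothetical). It verifies that every key lands in a valid partition for several partition counts and always in the same one for a fixed count. It also prints where "idA" lands as the count changes: with 4 partitions it lands in partition 2, with 5 in partition 0, which illustrates why adding partitions to an existing topic is not trivial.

```java
import java.util.List;

public class PartitionCountDemo {

    // Basic hash-and-modulo partitioning, kept non-negative with floorMod.
    static int partitionFor(Object partitionKey, int numPartitions) {
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    // True if every key maps into [0, numPartitions) and maps there deterministically.
    static boolean alwaysValidAndStable(List<String> keys, int numPartitions) {
        for (String key : keys) {
            int p = partitionFor(key, numPartitions);
            if (p < 0 || p >= numPartitions || p != partitionFor(key, numPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("idA", "idB", "some-other-id");
        for (int n = 1; n <= 8; n++) {
            System.out.println(n + " partitions: valid and stable = "
                    + alwaysValidAndStable(keys, n)
                    + ", idA -> " + partitionFor("idA", n));
        }
    }
}
```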

Log compaction will work as you probably expect: within a given partition, it removes older messages that have the same key and keeps the latest one. However, if two messages with the same key end up in two different partitions, neither will be removed, because each partition is compacted independently.
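The per-partition behaviour of compaction can be illustrated with a toy in-memory model. It assumes that compaction simply keeps the latest value per key within each partition and uses the basic hash-and-modulo partitioning; it is not how Kafka's log cleaner is implemented, and all names are hypothetical. Two messages share the key "keyA" but use different partition keys, so they land in different partitions and both survive.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CompactionSketch {

    // Hypothetical message shape: partition key (routing), message key (compaction), body.
    static final class Msg {
        final String partitionKey;
        final String key;
        final String body;

        Msg(String partitionKey, String key, String body) {
            this.partitionKey = partitionKey;
            this.key = key;
            this.body = body;
        }
    }

    // Basic hash-and-modulo partitioning, kept non-negative with floorMod.
    static int partitionFor(Object partitionKey, int numPartitions) {
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    // Appends messages to per-partition logs in send order, then compacts each log
    // independently, keeping only the latest body for each key within that partition.
    static Map<Integer, Map<String, String>> compact(List<Msg> messages, int numPartitions) {
        Map<Integer, List<Msg>> logs = new TreeMap<>();
        for (Msg m : messages) {
            int p = partitionFor(m.partitionKey, numPartitions);
            logs.computeIfAbsent(p, unused -> new ArrayList<>()).add(m);
        }
        Map<Integer, Map<String, String>> compacted = new TreeMap<>();
        for (Map.Entry<Integer, List<Msg>> log : logs.entrySet()) {
            Map<String, String> latest = new LinkedHashMap<>();
            for (Msg m : log.getValue()) {
                latest.put(m.key, m.body); // a later message with the same key wins
            }
            compacted.put(log.getKey(), latest);
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<Msg> messages = List.of(
                new Msg("idA", "keyA", "some details"),
                new Msg("idB", "keyA", "some more details"),
                new Msg("idA", "keyA", "newer details"));
        // 4 is a hypothetical partition count
        System.out.println(compact(messages, 4));
    }
}
```

This prints {2={keyA=newer details}, 3={keyA=some more details}}: the older "keyA" message in partition 2 is replaced, while the "keyA" message in partition 3 is untouched.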

FYI, you can use kafkacat to find out the number of partitions of a topic and other useful metadata (for example, kafkacat -L -b <broker> -t PartitionedMessages).
