ActiveMQ: Intercept publishing to a certain topic

2019-08-01 07:52发布

问题:

I'm setting up an ActiveMQ broker with MQTT that uses an external service for user authentication.

I figured out how to write a BrokerFilter and plug it into a broker, so the basics are covered.

I could even limit users in the topics they are allowed to subscribe on using the addConsumer() override. That method looks like this, and works:

    override fun addConsumer(context: ConnectionContext?, info: ConsumerInfo?): Subscription {
    // let connections from the local vm through
    return if (isLocalConnection(context!!)) {
        super.addConsumer(context, info)
    } else {
        val username = context?.userName ?: ""
        val cameraCode = getTopicElementsFromDestination(info!!.destination).first()
        assertUserHasPermissionForCamera(username, cameraCode)
        super.addConsumer(context, info)
    }
}

So I thought that restricting publishing would work pretty much the same with an override of addProducer(), but I stumbled over a few problems.

The first problem was that subscriptions also called addProducer(), which was surprising to say the least. But the real surprise was that ConsumerInfo::destination is always null when this method is called. I've searched far and wide, but I'm unable to find a way to extract the topic being published to from the information passed to addProducer(). Without knowing which topic the producer wants to publish to, I obviously can't limit it.

So I tried overriding messageDelivered() instead, figuring I could just discard the message when it is published to the wrong topic, achieving more or less the same effect. According to the documentation, this method should be called whenever the broker receives a message. But it doesn't get called when I send a message to the broker, so either I'm misunderstanding what "a message delivered to the broker" means or something is fishy. I also tried addSession(), which also doesn't get called.

So... How can I intercept when a client publishes to a certain topic?

回答1:

After some hunting through the source code and a lot of overriding methods and seeing when they get called and what they receive, I learned two things that helped me do what I wanted:

  1. The topic being published to is encapsulated in the message. A Producer doesn't register to publish to a certain topic, it just registers to publish to any topic. The broker itself doesn't know to what topic, that's handled on a message-by-message basis. So my first attempts to limit topics in addProducer() were futile, because nobody knows the topic yet.

  2. The earliest point in the publish chain where both username and topic are available is addDestination(). The topic can first be known in send(), but you don't have the user. It might be possible to do the authorisation there by persisting the username for the context, but I don't like state. So I put the authorisation into addDestination(), and it works.

There's a potential caveat here, though. According to the documentation, addDestination() only gets called if a destination doesn't exist yet. All tests I could do in the limited time confirmed that this is always the case on a publish, even if somebody is subscribed to the topic. But it might be different if the producer maintains a persistent connection, So use this solution with care.