I'm setting up an ActiveMQ broker with MQTT that uses an external service for user authentication.
I figured out how to write a BrokerFilter and plug it into a broker, so the basics are covered.
I could even limit users in the topics they are allowed to subscribe on using the addConsumer()
override. That method looks like this, and works:
override fun addConsumer(context: ConnectionContext?, info: ConsumerInfo?): Subscription {
// let connections from the local vm through
return if (isLocalConnection(context!!)) {
super.addConsumer(context, info)
} else {
val username = context?.userName ?: ""
val cameraCode = getTopicElementsFromDestination(info!!.destination).first()
assertUserHasPermissionForCamera(username, cameraCode)
super.addConsumer(context, info)
}
}
So I thought that restricting publishing would work pretty much the same with an override of addProducer()
, but I stumbled over a few problems.
The first problem was that subscriptions also called addProducer()
, which was surprising to say the least. But the real surprise was that ConsumerInfo::destination
is always null when this method is called. I've searched far and wide, but I'm unable to find a way to extract the topic being published to from the information passed to addProducer()
. Without knowing which topic the producer wants to publish to, I obviously can't limit it.
So I tried overriding messageDelivered()
instead, figuring I could just discard the message when it is published to the wrong topic, achieving more or less the same effect. According to the documentation, this method should be called whenever the broker receives a message. But it doesn't get called when I send a message to the broker, so either I'm misunderstanding what "a message delivered to the broker" means or something is fishy.
I also tried addSession()
, which also doesn't get called.
So... How can I intercept when a client publishes to a certain topic?