I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:
Person {
    id: 123,
    name: "Something",
    address: {...}
}
If name and address are updated in the same transaction then (S) will publish two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side, where I'm storing a projection of this Person entity and each property change causes a write to the database. So in this example there would be two writes to the database, but if I could batch messages for a short period of time and group them by id then I would only have to make a single write to the database.
How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?
Note that I have looked briefly at prefetch but I'm not sure if this is the way to go. Also prefetch, if I understand it correctly, is on a per-connection basis. I'm trying to achieve this on a per-queue basis, because if batching (and thus added latency) is the way to go I wouldn't want to add this latency to ALL queues consumed by my service (only to those that need the "group-by-id" feature).
Prefetch won't help for a case like this.
Consider using Spring Integration, which has adapters that sit on top of Spring AMQP; it also provides an aggregator which can be used to group messages together before sending them on to the next stage in the pipeline.
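To make the grouping idea concrete, here is a minimal plain-Java sketch of buffering events per id and releasing each group after a timeout. It is not Spring Integration's implementation: `TimedGrouper` is a hypothetical name, the clock is passed in explicitly to keep the sketch deterministic, and the window is measured from the first event of a group, which is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: buffers events per key and releases a group once it has been open for timeoutMillis. */
final class TimedGrouper<K, V> {

    /** One open group: when it was opened and what it has collected so far. */
    private static final class Group<V> {
        final long openedAt;
        final List<V> items = new ArrayList<>();

        Group(long openedAt) {
            this.openedAt = openedAt;
        }
    }

    private final long timeoutMillis;
    private final Map<K, Group<V>> open = new LinkedHashMap<>();

    TimedGrouper(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds an event to the group for its key, opening the group if needed. */
    void add(K key, V event, long nowMillis) {
        open.computeIfAbsent(key, k -> new Group<>(nowMillis)).items.add(event);
    }

    /** Removes and returns every group that has been open for at least the timeout. */
    Map<K, List<V>> releaseExpired(long nowMillis) {
        Map<K, List<V>> released = new LinkedHashMap<>();
        Iterator<Map.Entry<K, Group<V>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Group<V>> entry = it.next();
            if (nowMillis - entry.getValue().openedAt >= timeoutMillis) {
                released.put(entry.getKey(), entry.getValue().items);
                it.remove();
            }
        }
        return released;
    }
}
```

A caller would invoke `add` for every incoming message and call `releaseExpired` from a periodic task, writing each released group to the database once. Only the queues that feed such a grouper pay the added latency.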
EDIT
Here's a quick Boot app to demonstrate...
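import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
// Package as used by spring-integration-java-dsl 1.x (the version in the POM below)
import org.springframework.integration.dsl.amqp.Amqp;

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
            .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        // Two separate change events for the same entity id
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        // Wait for the handler to receive the aggregated group
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        .correlationExpression("payload.id") // group by entity id
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        // non-durable, non-exclusive, auto-delete
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            // Receives all events collected for one id as a single collection
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}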
Pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
Result:
2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler :
Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]
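Once the group arrives as one collection, the handler can fold it into a single update before touching the database. The sketch below is only an illustration: the demo events above carry nothing but an id, so the `PropertyChanged` shape (property name plus new value) and the `ProjectionMerge` class are hypothetical, and the actual database write is left out.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that collapses one aggregated group into a single set of changes. */
final class ProjectionMerge {

    /** Hypothetical event shape: which property of which entity changed, and its new value. */
    static final class PropertyChanged {
        final int id;
        final String property;
        final Object newValue;

        PropertyChanged(int id, String property, Object newValue) {
            this.id = id;
            this.property = property;
            this.newValue = newValue;
        }
    }

    /**
     * Folds the events of one group (all sharing the same id, since the
     * aggregator correlates on it) into a property -> value map.
     * When a property appears more than once, the later event wins.
     */
    static Map<String, Object> merge(Collection<PropertyChanged> group) {
        Map<String, Object> changes = new LinkedHashMap<>();
        for (PropertyChanged event : group) {
            changes.put(event.property, event.newValue);
        }
        return changes;
    }

    private ProjectionMerge() {
    }
}
```

The resulting map can then drive one write for the entity instead of one write per message.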
It is plainly wrong to shift a shortcoming of the messaging system onto the software/service side using Spring Integration. This is not a case for Spring Integration, nor for any framework. It also does not scale well and is not fault-tolerant.
The core of this issue is separating message routing from the business logic that sends and handles messages.
AFAIK, among the currently mature queue providers only Kafka and Apache Artemis support message grouping in the style of JMSXGroupID from the JMS API. RabbitMQ does not have it: AMQP specifies it, but RabbitMQ has never implemented it despite requests from the community.
Ensuring ordered, sequential processing of messages from many independent sources is a single case, but a very common and important one in enterprise architecture, and it excludes RabbitMQ from further consideration as the default messaging solution.
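For readers unfamiliar with message grouping, the idea is that every message carrying the same group key is delivered to the same consumer, which keeps processing ordered per key. The following plain-Java sketch shows only that routing rule; `GroupRouter` is a hypothetical name, it is not a RabbitMQ, JMS or Kafka API, and a real broker additionally has to handle consumers joining and leaving.

```java
/** Hypothetical router: pins each group key to one of a fixed number of consumers. */
final class GroupRouter {

    private final int consumerCount;

    GroupRouter(int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        this.consumerCount = consumerCount;
    }

    /** Maps a group key (here: the entity id) to a consumer index in [0, consumerCount). */
    int consumerFor(int groupKey) {
        // floorMod keeps the result non-negative even for negative keys
        return Math.floorMod(Integer.hashCode(groupKey), consumerCount);
    }
}
```

With this rule, PersonNameChanged and PersonMoved for id 123 always land on the same consumer, so they are processed sequentially relative to each other.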