I am trying to configure ActiveMQ so that when the broker exceeds its memory limit, it stores messages in the persistent store instead of holding them in memory. I use the following configuration:
BrokerService broker = new BrokerService();
broker.setBrokerName("activemq");
KahaDBPersistenceAdapter persistence = new KahaDBPersistenceAdapter();
persistence.setDirectory(new File(config.getProperty("amq.persistenceDir", "amq")));
broker.setPersistenceAdapter(persistence);
broker.setVmConnectorURI(new URI("vm://activemq"));
broker.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024L);
broker.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 1024 * 100L);
broker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 1024 * 100L);
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setCursorMemoryHighWaterMark(50);
policyEntry.setExpireMessagesPeriod(0L);
policyEntry.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
policyEntry.setMemoryLimit(64 * 1024 * 1024L);
policyEntry.setProducerFlowControl(false);
broker.setDestinationPolicy(new PolicyMap());
broker.getDestinationPolicy().setDefaultEntry(policyEntry);
broker.setUseJmx(true);
broker.setPersistent(true);
broker.start();
However, this does not work: ActiveMQ still consumes as much memory as it needs to hold the full queue. I also tried removing the PolicyEntry, which caused the broker to block producers once the memory limit was reached. I could not find anything in the documentation that explains what I am doing wrong.
We use a store cursor and set the memory limit as follows... this limits the amount of memory used by all queues to 100 MB.
Make sure you set the "destinations" that your policy should apply to. In my XML examples this is done using queue=">", but your example is using a new PolicyMap(). Try calling policyEntry.setQueue(">") instead to apply the policy to all queues, or add specific destinations to your PolicyMap. See this test for a full example:
https://github.com/apache/activemq/blob/master/activemq-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
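For the embedded-broker case in the question, the suggestion above might look roughly like the sketch below. Treat it as a starting point rather than a verified fix: the class name BoundedMemoryBroker and the "amq" directory are placeholders I made up, the 100 MB figure is taken from the answer, and it assumes the ActiveMQ broker and KahaDB jars are on the classpath and that producers send persistent messages.

```java
import java.io.File;
import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

// Hypothetical class name, for illustration only.
public class BoundedMemoryBroker {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("activemq");
        broker.setPersistent(true);

        // Placeholder directory; the question reads it from a config property.
        KahaDBPersistenceAdapter persistence = new KahaDBPersistenceAdapter();
        persistence.setDirectory(new File("amq"));
        broker.setPersistenceAdapter(persistence);

        // Broker-wide memory cap shared by all destinations (100 MB, as in the answer).
        broker.getSystemUsage().getMemoryUsage().setLimit(100 * 1024 * 1024L);

        PolicyEntry policyEntry = new PolicyEntry();
        // ">" is the ActiveMQ wildcard for every destination name,
        // so this entry is bound to all queues.
        policyEntry.setQueue(">");
        // Store cursor: pending queue messages are paged in from the message store.
        policyEntry.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
        policyEntry.setCursorMemoryHighWaterMark(50);
        policyEntry.setProducerFlowControl(false);

        // Register the entry under its destination instead of only as a default entry.
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(Collections.singletonList(policyEntry));
        broker.setDestinationPolicy(policyMap);

        broker.start();
    }
}
```

The differences from the question's code are the setQueue(">") call, the queue cursor policy (the question only sets a policy for durable topic subscribers), and registering the entry through setPolicyEntries. Whether memory then stays under the limit is easiest to confirm by watching the broker's MemoryPercentUsage attribute over JMX while the queue fills.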