ActiveMQ memory limit exceeded

2019-08-11 00:12发布

I try to configure ActiveMQ for the following behavior: when broker exceeds its memory limit, it should store message in persistence storage. If use the following configuration:

    BrokerService broker = new BrokerService();
    broker.setBrokerName("activemq");
    KahaDBPersistenceAdapter persistence = new KahaDBPersistenceAdapter();
    persistence.setDirectory(new File(config.getProperty("amq.persistenceDir", "amq")));
    broker.setPersistenceAdapter(persistence);
    broker.setVmConnectorURI(new URI("vm://activemq"));
    broker.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024L);
    broker.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 1024 * 100L);
    broker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 1024 * 100L);
    PolicyEntry policyEntry = new PolicyEntry();
    policyEntry.setCursorMemoryHighWaterMark(50);
    policyEntry.setExpireMessagesPeriod(0L);
    policyEntry.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
    policyEntry.setMemoryLimit(64 * 1024 * 1024L);
    policyEntry.setProducerFlowControl(false);
    broker.setDestinationPolicy(new PolicyMap());
    broker.getDestinationPolicy().setDefaultEntry(policyEntry);
    broker.setUseJmx(true);
    broker.setPersistent(true);
    broker.start();

However, this does not work. ActiveMQ still consumes as much memory as needed to store the full queue. I also tried to remove PolicyEntry, that caused broker to stop producers after memory limit is reached. I could find nothing in documentation about what I am doing wrong.

标签: activemq
1条回答
叼着烟拽天下
2楼-- · 2019-08-11 00:27

we use a storeCursor and set the memory limit as follows...this will limit the amount of memory for all queues to 100MB...

   <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue=">" producerFlowControl="false" memoryLimit="100mb">
                    <pendingQueuePolicy>
                        <storeCursor/>
                    </pendingQueuePolicy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>

make sure you set the "destinations" that your policy should apply against...in my XML examples this is done using queue=">", but your example is using a new PolicyMap()...try calling policyEntry.setQueue(">") instead to apply to all queues or add specific destinations to your PolicyMap, etc.

see this test for a full example...

https://github.com/apache/activemq/blob/master/activemq-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java

查看更多
登录 后发表回答