WSO2 CEP - Out of Memory

2019-06-25 17:36发布

We are seeing out of memory errors on CEP. The thread dump shows that there were around 32000 threads sleeping on monitor. Also even though CEP JVM options specifies to generate the HeapDump on outofmemory, we dont see any heap dump generated.. Please advice. (CEP JVM -Xms256m -Xmx1536m )

1) The Cassandra is disabled on this CEP
2) CEP version is 2.1.0
3) The CEP is fronted by WSO2 ESB (using BAM Mediator).
4) Apart from sending the actual payload data to CEP, the ESB is also sending a periodic heartbeat to CEP (every 15 ec).

5) We also have configured JMX Agent on ESB which is monitoring the CEP every 15 mins (cpu/memorythreads)
6) No heap dump found even though -XX:HeapDumpPath= parameter is specified

  • The CEP ran continously for 7 days before this OOM. After restart we observe that the thread count is steadily increasing at rate of approx 4000-5000 threads per day

CEP logs..

[2013-06-10 05:31:49,040] ERROR -  Thread Thread[ActiveMQ InactivityMonitor  WriteCheckTimer,5,main] died {org.apache.zookeeper.server.NIOServerCnxn}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at  java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)
at org.apache.activemq.transport.AbstractInactivityMonitor.writeCheck(AbstractInactivityMonitor.java:153)
at org.apache.activemq.transport.AbstractInactivityMonitor$2.run(AbstractInactivityMonitor.java:117)
at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)
[2013-06-10 05:31:49,040] ERROR -  Thread Thread[ActiveMQ InactivityMonitor WriteCheckTimer,5,main] died {org.apache.zookeeper.server.NIOServerCnxn}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)
at org.apache.activemq.transport.AbstractInactivityMonitor.writeCheck(AbstractInactivityMonitor.java:153)
at org.apache.activemq.transport.AbstractInactivityMonitor$2.run(AbstractInactivityMonitor.java:117)
at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)

Some of Queries configured in CEP

<cep:query name="xxxBuildUpQuery">
<cep:expression><![CDATA[from  xxxCEPIntgDataStream[interfaceInformationInterfaceName=='xxx-xxx' or interfaceInformationInterfaceName=='xxx-xxx'or 
                               interfaceInformationInterfaceName=='xxx-xxx' or interfaceInformationInterfaceName=='xxx-xxx' or 
                               interfaceInformationInterfaceName=='xxx-xxx' or interfaceInformationInterfaceName=='xxxx-xxx' or 
                               interfaceInformationInterfaceName=='xxx-xxx' or 
                               interfaceInformationInterfaceName=='xxx-xxx' ]#window.time(60000)
insert into buildUpStream interfaceInformationInterfaceName, count(interfaceInformationxxxId) as noOfInflowMsgs group by interfaceInformationInterfaceName]]></cep:expression>
<cep:output brokerName="activemqJmsBroker" topic="xxxBuildUpInfoTopic">
  <cep:xmlMapping>
    <xxxAnalytics>
      <buildUpInfo>
        <interfaceName>{interfaceInformationInterfaceName}</interfaceName>
        <buildUpPerMin>{noOfInflowMsgs}</buildUpPerMin>
      </buildUpInfo>
    </xxxAnalytics>
  </cep:xmlMapping>
</cep:output>
</cep:query>
<cep:query name="xxxQueueDepthQuery">
<cep:expression><![CDATA[from xxxIntgrQueueDepthData_v1
insert into xxxIntgrQueueDepthStream flowName,appName, queueDepth]]>  </cep:expression>
<cep:output brokerName="activemqJmsBroker" topic="xxxIntgrQueueDepthTopic">
  <cep:xmlMapping>
    <xxxAnalytics>
      <queueDepthInfo>
        <flowName>{flowName}</flowName> 
        <appName>{appName}</appName>
        <depth>{queueDepth}</depth>
      </queueDepthInfo>
    </xxxAnalytics>
  </cep:xmlMapping>
</cep:output>
</cep:query>
<cep:query name="xxxClockDataQuery">
  <cep:expression><![CDATA[from testStream
insert into testOutClockDataStream AEDateTime]]></cep:expression>
  <cep:output brokerName="activemqJmsBroker" topic="xxxClockDataTopic">
    <cep:xmlMapping>
      <xxxClockFeed>
        <data>
          <XXDateTime>{XXDateTime}</XXDateTime>
        </data>
      </xxxClockFeed>
    </cep:xmlMapping>
  </cep:output>
 </cep:query>
 <cep:query name="xxxSimltrPaymntAvgQuery_1">
  <cep:expression><![CDATA[from xxxCEPIntgDataStream#window.time(15000)
    insert into xxxSimltrPymntAvgData avg(amount) as avgAmount, currency group by currency]]></cep:expression>
  <cep:output brokerName="activemqJmsBroker" topic="xxxAvgPaymntDetails">
    <cep:xmlMapping>
      <xxxAnalytics>
        <avgPaymentData>
          <avgAmount>{avgAmount}</avgAmount>
          <currency>{currency}</currency>
        </avgPaymentData>
      </xxxAnalytics>
    </cep:xmlMapping>
  </cep:output>

Thanks Rajiv Patil

2条回答
疯言疯语
2楼-- · 2019-06-25 17:45

Looks like timeBatch window is the issue, we had couple of queries using that... once removed the thread ramp up was arrested. Probably the timeBatch window feature of CEP needs more testing.

查看更多
看我几分像从前
3楼-- · 2019-06-25 18:06

I found that the Siddhi Manager initiates a scheduled thread pool with Integer.MAX_VALUE as core pool size. That's mean that every request will create a new thread, with no time out policy. (ref: ThreadPoolExecutor)

Until WSO2 fix this problem, you can change the size of this thread pool. P.e., in the class org.wso2.siddhi.core.SiddhiManager change the line:

this.siddhiContext.setScheduledExecutorService(Executors.newScheduledThreadPool(Integer.MAX_VALUE));

(line 77 in SiddhiManager ver 1.1.0-wso2v1)

to this one:

this.siddhiContext.setScheduledExecutorService(Executors.newScheduledThreadPool(100));

This change will create a core pool size of 100, a maximum pool size of Integer.MAX_VALUE, and the idle threads (over the core pool size) will be removed as soon as they finish.

查看更多
登录 后发表回答