Java Camel framework: Losing message body in proce

2020-07-24 05:00发布

问题:

All,

Here's a simple route:

 <route>
    <from uri="jetty://http://0.0.0.0:9090/myproject" />
    <setExchangePattern pattern="InOnly" />
    <process ref="JsonValidator"/> 
    <unmarshal>
       <json library="Jackson" unmarshalTypeName="com.myproject.JsonPojo"/>
    </unmarshal>
    ...
 </route>

JsonValidator is a simple Java bean where I extend processor. Here, I want to make sure all the required fields are being passed in before I continue to the unmarshal call using Jackson to unmarshal the JSON to my POJO.

All I'm doing in that bean right now is just one line:

  public void process(Exchange exchange) throws Exception {
      String input = exchange.getIn().getBody(String.class);
  }

Simply calling exchange.getIn().getBody(String.class) results in the next (unmarshal) step in my route throwing an error saying there's nothing to unmarshal. As a matter of fact, I tested this by adding another processor after JsonValidator - in there, the exchange body isn't null but it's empty.

Here's the error:

com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
         at [Source: org.apache.camel.converter.stream.InputStreamCache@78f0a00a; line: 1, column: 1]
            at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
            at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:2931)
            at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2873)
            at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2080)
            at org.apache.camel.component.jackson.JacksonDataFormat.unmarshal(JacksonDataFormat.java:105)
            at org.apache.camel.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:65)
            at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
            at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
            at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
            at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
            at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
            at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
            at org.apache.camel.component.jetty.CamelContinuationServlet.service(CamelContinuationServlet.java:151)
            at javax.servlet.http.HttpServlet.service(HttpServlet.java:668)
            at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1496)
            at org.eclipse.jetty.servlets.MultiPartFilter.doFilter(MultiPartFilter.java:136)
            at org.apache.camel.component.jetty.CamelFilterWrapper.doFilter(CamelFilterWrapper.java:44)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1467)
            at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:499)
            at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
            at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
            at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
            at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
            at org.eclipse.jetty.server.Server.handle(Server.java:370)
            at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
            at org.eclipse.jetty.server.AbstractHttpConnection.content(AbstractHttpConnection.java:982)
            at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.content(AbstractHttpConnection.java:1043)
            at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:865)
            at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:240)
            at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
            at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
            at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
            at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
            at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
            at java.lang.Thread.run(Thread.java:744)

What am I missing?

回答1:

Streams can (normally) only be read once. After reading you must store the result in the exchange:

public void process(Exchange exchange) throws Exception {
    String input = exchange.getIn().getBody(String.class);
    exchange.getIn().setBody(input);
}

Alternatively, you may let Camel do the caching as described in the Camel documentation.