to store a message in rabbitmq from mule

2019-06-03 08:14发布

问题:

Below is my flow and i'm trying to store my salesforce batch information into a queue in rabbit mq

<flow name="foreachsimilar_pmFlow1" doc:name="foreachsimilar_pmFlow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP"/>
        <sfdc:create-job config-ref="Salesforce1" type="HRISASI__c" operation="insert" doc:name="Salesforce"/>
        <set-variable variableName="batchID" value="1" doc:name="Variable"/>
        <set-property propertyName="jobInfo" value="#[payload]" doc:name="Property"/>
        <set-variable variableName="jobId" value="#[payload.id]" doc:name="Variable"/>
        <jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryKey="id" queryTimeout="-1" connector-ref="Database1" doc:name="Database"/>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy"><![CDATA[payload.collect { it.EmpId }.collate(3).collect { [min: it[0], max: it[-1]] }
                       ]]></scripting:script>
        </scripting:transformer>
        <foreach doc:name="For Each">
            <set-variable variableName="bulkPayload" value="#[groovy: return[];]" doc:name="bulkPayload EmptyArray"/>
            <set-variable variableName="IDs" value="#[groovy:return[];]" doc:name="Id EmptyArray"/>
            <set-variable variableName="jdbdinsbatch" value="#[groovy: return[];]" doc:name="Variable"/>
            <jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryTimeout="-1" connector-ref="Database1" doc:name="Database" queryKey="all"/>
            <foreach doc:name="For Each">
            <set-variable variableName="empId" value="#[payload['EmpId']]" doc:name="Variable"/>
            <logger message="#[payload]" level="INFO" doc:name="Logger"/>
            <data-mapper:transform config-ref="map_to_hrisasi__c" doc:name="Map To HRISASI__c"/>
            <scripting:transformer doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[payloadMap = payload[0];
IDs = flowVars['IDs'];
IDs.add( [ EmpId: flowVars['EmpId'] ] );
flowVars['IDs'] = IDs;
jdbdinsbatch= flowVars['jdbdinsbatch'];
jdbdinsbatch.add( [ EmpId: flowVars['EmpId'], batchID: flowVars['batchID'] ] );
flowVars['jdbdinsbatch'] = jdbdinsbatch;
return [ payloadMap ];]]></scripting:script>
            </scripting:transformer>
            <set-variable variableName="bulkPayload" value="#[groovy: bulkPayload = flowVars['bulkPayload']; bulkPayload.add(payload[0]); return bulkPayload;]" doc:name="bulkPayload"/>
        </foreach>
        <set-payload value="#[flowVars['bulkPayload']]" doc:name="Set Payload"/>
        <sfdc:create-batch config-ref="Salesforce1"  doc:name="Salesforce">
                <sfdc:job-info ref="#[message.outboundProperties['jobInfo']]"/>
                <sfdc:objects ref="#[payload]"/>
            </sfdc:create-batch>
            <scripting:transformer doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[return [ batch: payload, IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script>
            </scripting:transformer>
        <amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP"/>

         </foreach>
        <flow-ref name="foreachsimilar_pmFlow2" doc:name="Flow Reference"/>
    </flow>

The below exception is thrown

Message               : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector
{
  name=AMQP_Connector
  lifecycle=start
  this=33982399
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[amqp]
  serviceOverrides=<none>
}
,  name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. invalid value in table (java.lang.IllegalArgumentException)
  com.rabbitmq.client.impl.Frame:306 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector
{
  name=AMQP_Connector
  lifecycle=start
  this=33982399
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[amqp]
  serviceOverrides=<none>
}
,  name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage (org.mule.api.transport.DispatchException)
  org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.IllegalArgumentException: invalid value in table
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)

Could somebody please tell as why is the above exception thrown and what should be done to resolve it struck with this for the past two days

Thank you in advance.

below is the stack trace after enabling verbose exception logging

********************************************************************************
Root Exception stack trace:
java.lang.IllegalArgumentException: invalid value in table
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
    at com.rabbitmq.client.impl.ContentHeaderPropertyWriter.writeTable(ContentHeaderPropertyWriter.java:98)
    at com.rabbitmq.client.AMQP$BasicProperties.writePropertiesTo(AMQP.java:1782)
    at com.rabbitmq.client.impl.AMQContentHeader.writeTo(AMQContentHeader.java:51)
    at com.rabbitmq.client.impl.AMQContentHeader.toFrame(AMQContentHeader.java:78)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:106)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:636)
    at org.mule.transport.amqp.AmqpMessageDispatcher$OutboundAction$1.run(AmqpMessageDispatcher.java:55)
    at org.mule.transport.amqp.AmqpMessageDispatcher.doOutboundAction(AmqpMessageDispatcher.java:172)
    at org.mule.transport.amqp.AmqpMessageDispatcher.doDispatch(AmqpMessageDispatcher.java:127)
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99)
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47)
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20)
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58)
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48)
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54)
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44)
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(I...
********************************************************************************

回答1:

The payload you're trying to send to Rabbit looks quite complex (sfdc BatchInfo object). How do you except it to be readable for the queue consumer? I think that, for some reason, the message transformers involved in transforming this payload to a byte array suited for AMQP transport fails and you end up trying to send the groovy generated Map with an sfdc BatchInfo object as is to the AMQP client lib.

You should convert the map with contents to a string before sending it to Rabbit. Something like this (call toString on the BatchInfo object and then convert the map to JSON):

Edited (removed outbound properties)

<scripting:transformer doc:name="Groovy">
    <scripting:script engine="Groovy"><![CDATA[return [ batch: payload.toString(), IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script>
</scripting:transformer>
<json:object-to-json-transformer doc:name="Object to JSON"/>
<amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP">
    <message-properties-transformer scope="outbound">
        <delete-message-property key="*" />
    </message-properties-transformer>
</amqp:outbound-endpoint>