Mule json-to-object-transformer and Dataweaver Tra

2019-07-31 14:10发布

问题:

I'm have an issue with a Mule Dataweave Transformer getting

INFO  2017-08-10 15:05:52,787 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: Authorisation Changed Event received. authorisation id: 1
INFO  2017-08-10 15:05:57,844 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: type before class com.mulesoft.weave.reader.ByteArraySeekableStream
INFO  2017-08-10 15:06:02,058 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: type after: class com.mulesoft.weave.reader.ByteArraySeekableStream
INFO  2017-08-10 15:06:13,309 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: Org Name :- Id:- Org Name 3
ERROR 2017-08-10 15:06:15,520 [amqpReceiver.02] org.mule.exception.RollbackMessagingExceptionStrategy: 
********************************************************************************
Message               : Exception while executing: 
{
^
Unexpected end-of-input at index 0 (line 1, position 1), expected JSON Value
Payload               : com.mulesoft.weave.reader.ByteArraySeekableStream@a622be8
Element               : /subscribe-to-changeFlow/processors/8 @ voa-009-mule-authorisation-search-api:test.xml:60 (Final Transform Message)
Element XML           : <dw:transform-message doc:name="Final Transform Message" metadata:id="fa7e7663-50bb-4675-a8c8-e8cb311946a9">
                        <dw:input-payload mimeType="application/json"></dw:input-payload>
                        <dw:set-payload resource="classpath:dwl/owner-authorisation-enrichment.dwl"></dw:set-payload>
                        </dw:transform-message>
--------------------------------------------------------------------------------
Root Exception stack trace:
com.mulesoft.weave.reader.json.JsonReaderException: Unexpected end-of-input at index 0 (line 1, position 1), expected JSON Value
    at com.mulesoft.weave.reader.json.JsonTokenizer.fail(JsonTokenizer.scala:193)
    at com.mulesoft.weave.reader.json.JsonTokenizer.readValue(JsonTokenizer.scala:49)
    at com.mulesoft.weave.reader.json.JsonTokenizer.tokens(JsonTokenizer.scala:16)
    at com.mulesoft.weave.reader.json.JsonReader.root(JsonReader.scala:17)
    at com.mulesoft.weave.mule.reader.ReusableReader.root(ReusableReader.scala:12)
    at com.mulesoft.weave.engine.EvaluationContext$InternalExecutionContext$$anonfun$3.apply(EvaluationContext.scala:106)
  • The flow receives an ID as the message payload
  • makes an HTTP call to get some JSON for the ID
  • does an initial Dataweave transform to transform JSON to a new JSON schema
  • in order to extract some values from the JSON I'm using json:json-to-object-transformer but I've put this in an enricher targeting a flowVar to allow conversion to a map and keep the original payload intact

        <enricher target="#[flowVars.ownerAuthorisationJsonMap]"  doc:name="Copy to JSON map"> 
             <json:json-to-object-transformer returnClass="java.util.HashMap" doc:name="JSON to Object"/>
        </enricher> 
    
  • I then call a subflow to get some further date to enrich later. The subflow only stores the result of an enricher in another flowVar

  • the 2nd Dataweave Transformer (gets exception) takes JSON schema output from the 1st Dataweave Transformer and enriches some missing values using data looked up by the subflow (the example shows one but there will be many in Map). This gets the exception.

However if I comment out the enricher with nested json-to-object-transformer

         <enricher target="#[flowVars.ownerAuthorisationJsonMap]"  doc:name="Copy to JSON map"> 
             <json:json-to-object-transformer returnClass="java.util.HashMap" doc:name="JSON to Object"/>
        </enricher> 

the 2nd dataweave works fine (but then I can't access the data in the JSON). It seems the json-to-object-transformer upsets things, but if I debug the payload type and java object id remains the same throughout.

I'm only using the json-to-object-transformer to allow MEL access to the JSON as seems to be advised as JSON-path is deprecated.

<?xml version="1.0" encoding="UTF-8"?>

<amqp:endpoint name="authorisationChangeQueueDef"
    queueName="authorisation-change-queue" queueDurable="true"
    exchangeName="authorisation-exchange" exchangeType="direct" exchangeDurable="true"
    connector-ref="AMQP_Connector"
    routingKey="authorisation-change-queue-routing-key" doc:name="authorisationChangeQueueDef">
       <properties>
            <spring:entry key="amqp-queue.x-dead-letter-exchange"    value="authorisation-exchange" />
            <spring:entry key="amqp-queue.x-dead-letter-routing-key" value="authorisation-change-queue-dead-routing-key" />
        </properties>
</amqp:endpoint>


<flow name="subscribe-to-changeFlow" processingStrategy="synchronous">

    <amqp:inbound-endpoint ref="authorisationChangeQueueDef" responseTimeout="10000" doc:name="authorisation-change-consumer" />

    <set-variable variableName="authorisationId" value="#[message.payloadAs(java.lang.String)]" doc:name="set authorisation id"/>

    <logger message="#['Authorisation Changed Event received. authorisation id: ' + flowVars.authorisationId]" level="INFO" doc:name="Logger"/>

    <http:request config-ref="Mule_Authorisation_Management_API_HTTP_Request_Configuration" path="#['/authorisation/' + flowVars.authorisationId]" method="GET" doc:name="HTTP REST call to Authorisation API"/>

    <dw:transform-message doc:name="Transform to basic owner-authorisation"  metadata:id="1bdd6f7e-6b4b-415b-819d-e88b9ff7f92b">
        <dw:input-payload mimeType="application/json"/>
        <dw:set-payload resource="dwl/owner-authorisation-skeleton.dwl"/>
    </dw:transform-message>

         <logger message="#['type before ' + message.dataType.type]" level="INFO" doc:name="Logger"/>

    <!-- Enricher targeting flowVar to keep original payload type intact -->            
    <enricher target="#[flowVars.ownerAuthorisationJsonMap]"  doc:name="Copy to JSON map"> 
        <json:json-to-object-transformer returnClass="java.util.HashMap" doc:name="JSON to Object"/>
    </enricher> 

         <logger message="#['type after: ' + message.dataType.type]" level="INFO" doc:name="Logger"/>

    <!--  subflow that uses flowVars.ownerAuthorisationJsonMap and MEL --> 
    <flow-ref name="enrich-Sub_Flow" doc:name="enrich-Sub_Flow"/>


    <dw:transform-message doc:name="Final Transform Message" metadata:id="fa7e7663-50bb-4675-a8c8-e8cb311946a9">
        <dw:input-payload mimeType="application/json"/>
        <dw:set-payload resource="classpath:dwl/owner-authorisation-enrichment.dwl"/>
    </dw:transform-message>


    <logger  level="INFO" doc:name="Logger all "/>


    <!-- Does a manual ACK to RabbitMQ, only do once we know all the processing is complete -->
    <amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>


    <rollback-exception-strategy maxRedeliveryAttempts="3" doc:name="Rollback Exception Strategy">
        <logger message="#['REQUESTING RETRY OF REQUEST... delivery-tag : ' + message.inboundProperties['delivery-tag']]" level="INFO" doc:name="Logger"/>
        <!-- re-queue message for re-tries -->
        <amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message" />

        <on-redelivery-attempts-exceeded>
            <logger message="REDELIVERY EXHAUSTED" level="ERROR" doc:name="Logger"/>
            <amqp:reject-message doc:name="AMQP-0-9 Reject Message" />
        </on-redelivery-attempts-exceeded>           
    </rollback-exception-strategy>

</flow>

<sub-flow name="enrich-Sub_Flow">
    <set-variable variableName="organisationId" value="#[ownerAuthorisationJsonMap.organisationId]" doc:name="Extract Organisation Id" encoding="UTF-8" mimeType="application/json"/>

    <enricher target="#[flowVars.organisationName]" source="#[payload.organisationLatestDetail.organisationName]" doc:name="Message Enricher">  
        <processor-chain doc:name="Processor Chain">
            <flow-ref name="get-organisation-details-SubFlow" doc:name="get-organisation-details-Sub_Flow" />
           <json:json-to-object-transformer returnClass="java.util.HashMap" doc:name="JSON to Object"/>
           <logger level="INFO" message="Org Name :- Id:- #[payload.organisationLatestDetail.organisationName]" doc:name="Logger"/>
       </processor-chain>
    </enricher>
</sub-flow>

<sub-flow name="get-organisation-details-SubFlow">
    <http:request config-ref="VOA_Mule_Customer_Management_API_HTTP_Request_Configuration" path="#['/organisation?organisationId='+ flowVars.organisationId]" method="GET"
        doc:name="HTTP - Get Customer details">
    </http:request>
</sub-flow>

Dataweave 1

%dw 1.0
%output application/json
---
{
    authorisationId: payload.id,
    uarn: payload.uarn,
    authorisedParties: payload.parties map ((party , indexOfParty) -> {
        organisationId: party.authorisedPartyOrganisationId,
        status: party.authorisedPartyStatus,
        startDate: party.startDate
    }),
    endDate: payload.endDate,
    organisationId: payload.authorisationOwnerOrganisationId,
    propertyLinkSubmissionId: payload.submissionId,
    startDate: payload.startDate,
    status: payload.authorisationStatus
}

Dataweave 2 (this will have further mappings from a Java Map too)

%dw 1.0
%output application/json
---
{
    authorisationId: payload.authorisationId,
    authorisedParties: payload.authorisedParties map ((authorisedParty , indexOfAuthorisedParty) -> {
        organisationId: authorisedParty.organisationId,
        startDate: authorisedParty.startDate,
        status: authorisedParty.status
    }),
    endDate: payload.endDate,
    organisationId: payload.organisationId,
    organisationName: flowVars.organisationName,
    propertyLinkSubmissionId: payload.propertyLinkSubmissionId,
    startDate: payload.startDate,
    status: payload.status,
    uarn: payload.uarn
}

Logs

INFO  2017-08-10 15:05:52,787 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: Authorisation Changed Event received. authorisation id: 1
INFO  2017-08-10 15:05:57,844 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: type before class com.mulesoft.weave.reader.ByteArraySeekableStream
INFO  2017-08-10 15:06:02,058 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: type after: class com.mulesoft.weave.reader.ByteArraySeekableStream
INFO  2017-08-10 15:06:13,309 [amqpReceiver.02] org.mule.api.processor.LoggerMessageProcessor: Org Name :- Id:- Org Name 3

回答1:

The issue is caused by the json payload stream being consumed by the json-to-object-transformer and not being reset, despite it being in a message enricher. See this issue - https://www.mulesoft.org/jira/browse/MULE-10623.

The simplest solutions are to reset the stream after the message enricher:

...
<enricher target="#[flowVars.myMar]" doc:name="Message Enricher"> 
    <json:json-to-object-transformer doc:name="JSON to Object"/>
</enricher>
<expression-component doc:name="Expression">
    <![CDATA[payload.seek(0);]]>
</expression-component>
...

or turn the payload to a string before using the message enricher:

...
<object-to-string-transformer doc:name="Object to String"/>
<enricher target="#[flowVars.myMar]"  doc:name="Message Enricher"> 
...

Just for reference I replicated the issue with this simpler flow:

<flow name="flow">

    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload>
<![CDATA[%dw 1.0
%output application/json
---
{
    foo:0
}]]>
        </dw:set-payload>
    </dw:transform-message>
    <enricher target="#[flowVars.myVar]" doc:name="Message Enricher"> 
        <json:json-to-object-transformer doc:name="JSON to Object"/>
    </enricher>
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload>
<![CDATA[%dw 1.0
%output application/json
---
payload
]]> 
        </dw:set-payload>
    </dw:transform-message>
</flow>


标签: mule mule-el