Mule callable that returns PipedInputStream to Dat

2019-08-10 14:18发布

问题:

I'm trying to find a workaround to a problem I described in an earlier question in which DataWeave isn't reading a CSV file that doesn't comply with RFC 4180. I decided I'd write a Java class that implements org.mule.api.lifecycle.Callable that would read the InputStream I had (which might be from a file or from an HTTP response), remove the lines that can't be processed (I know the implementation here pulls out too much), and pipe readable lines into a new stream. Unfortunately, I keep getting a 0 byte output from DataWeave and an exception.

edit: To clarify, this question could exist independently of the other question, but I have wondered how to do something like this before. It just so happens that I'm trying to solve this problem as a workaround solution for another problem.

Here's my CSV file:

Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,Something Weird",C,D
A,B,Something Else" Weird,D,
A,",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",C,D   
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,

Here's my flow:

<flow name="nonrfcFlow">
    <file:inbound-endpoint path="C:\tmp\" moveToPattern="nonrfc-read.csv" moveToDirectory="C:\tmp\" connector-ref="File-Configuration" responseTimeout="10000" mimeType="application/csv" metadata:id="a344bc19-5643-4bfb-b8c2-2994d7997c75" doc:name="File">
        <file:filename-regex-filter pattern="nonrfc\.csv" caseSensitive="true"/>
    </file:inbound-endpoint>
    <flow-ref name="removequotedlines" doc:name="removequotedlines"/>
    <dw:transform-message doc:name="Transform Message" metadata:id="5b01fcc4-2a1c-42fb-9cab-2defed9a1161">
        <dw:set-payload><![CDATA[%dw 1.0
%input payload application/csv
%output application/json
---
payload
]]></dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint path="C:\tmp" outputPattern="nonrfc-output.json" connector-ref="OutputFileConfiguration" responseTimeout="10000" doc:name="File"/>
</flow>

<sub-flow name="removequotedlines">
    <component class="com.stackoverflow.removeLinesWithQuotes" doc:name="Remove Lines with Quotes"/>
</sub-flow>

Here's the Java file:

package com.stackoverflow;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.apache.log4j.Logger;
import org.jfree.data.io.CSV;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.transformer.types.ListDataType;

public class removeLinesWithQuotes implements Callable {

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        Logger logger = Logger.getLogger(removeLinesWithQuotes.class);
        InputStream is = (InputStream) eventContext.getMessage().getPayload();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        pis.connect(pos);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(pos));
        String line;
        while ((line = br.readLine()) != null) {
            if (!line.contains("\"")) {
                bw.write(line);
                logger.debug(line);
                bw.write("\r\n");
                bw.flush();
            }
        }
        is.close();

        //eventContext.getMessage().setPayload(pis); // doesn't work
        //eventContext.getMessage().setPayload(pis, new ListDataType<CSV>(CSV.class, "application/csv")); // doesn't work
        return pis; // doesn't work
    }
}

According to the this answer, I should be able to return an InputStream, which a PipedInputStream is. If I comment out either the flow reference or the DataWeave, I get output -- respectively JSON with nonsense in it (because the DataWeave CSV parser behaves strangely when the RFC isn't followed) or a CSV with the rows removed. That is, independently, the Java class or the DataWeave component work, but not together.

Here's the output when the Java class is omitted from the flow:

[
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "BB",
    "Column C": "CCCC",
    "Column D": "DDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": "BBB",
    "Column C": "CCCCCCCCC",
    "Column D": "DDDDDDDDDDDDDDDDDDDDDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": ",C,D\r\nA,B,Something Else",
    "Column C": "D",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": ",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",
    "Column C": "C",
    "Column D": "D   "
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  }
]

And here's the output when the DataWeave is omitted:

Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,

Here's the exception:

INFO  2015-12-03 17:01:54,716 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.transport.file.FileConnector: Writing file to: C:\tmp\nonrfc-output.json
ERROR 2015-12-03 17:02:56,817 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message               : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
  name=OutputFileConfiguration
  lifecycle=start
  this=6f3e19b3
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[file]
  serviceOverrides=<none>
}
,  name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler
Type                  : org.mule.api.transport.DispatchException
Code                  : MULE_ERROR--2
JavaDoc               : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
Payload               : com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3e274324
********************************************************************************
Exception stack is:
1. Pipe broken (java.io.IOException)
  java.io.PipedInputStream:-1 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
  name=OutputFileConfiguration
  lifecycle=start
  this=6f3e19b3
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[file]
  serviceOverrides=<none>
}
,  name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler (org.mule.api.transport.DispatchException)
  org.mule.transport.AbstractMessageDispatcher:117 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.io.IOException: Pipe broken
    at java.io.PipedInputStream.read(Unknown Source)
    at java.io.PipedInputStream.read(Unknown Source)
    at com.mulesoft.weave.reader.DefaultSeekableStream.readUntil(SeekableStream.scala:146)
    at com.mulesoft.weave.reader.DefaultSeekableStream.delegate$lzycompute(SeekableStream.scala:153)
    at com.mulesoft.weave.reader.DefaultSeekableStream.delegate(SeekableStream.scala:151)
    at com.mulesoft.weave.reader.DefaultSeekableStream.seek(SeekableStream.scala:189)
    at com.mulesoft.weave.reader.UTF8StreamSourceReader.seek(StreamSourceReader.scala:121)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.parse(CSVParser.scala:93)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.elementAt(CSVParser.scala:54)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.contains(CSVParser.scala:38)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.hasNext(CSVReader.scala:52)
    at scala.collection.Iterator$class.toStream(Iterator.scala:1188)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.toStream(CSVReader.scala:50)
    at com.mulesoft.weave.writer.json.JsonWriter.writeArray(JsonWriter.scala:156)
    at com.mulesoft.weave.writer.json.JsonWriter.writeValue(JsonWriter.scala:137)
    at com.mulesoft.weave.model.values.Value$class.write(Value.scala:31)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue.write(CSVReader.scala:47)
    at com.mulesoft.weave.model.values.wrappers.DelegateValue$class.write(DelegateValue.scala:29)
    at com.mulesoft.weave.engine.ast.variables.VariableReferenceNode.write(VariableReferenceNode.scala:9)
    at com.mulesoft.weave.engine.Engine.internalExecute(Engine.scala:89)
    at com.mulesoft.weave.engine.Engine.execute(Engine.scala:54)
    at com.mulesoft.weave.engine.Engine.execute(Engine.scala:169)
    at com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler.write(WeaveMessageProcessor.scala:159)
    at org.mule.transport.file.FileMessageDispatcher.doDispatch(FileMessageDispatcher.java:75)
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:107)
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2686)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:94)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:56)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
    at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
    at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:208)
    at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:201)
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:16)
    at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:35)
    at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:22)
    at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:30)
    at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:14)
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:67)
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:44)
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:50)
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:40)
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:41)
    at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:48)
    at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:28)
    at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:13)
    at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:110)
    at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:30)
    at ...
********************************************************************************