I'm trying to find a workaround to a problem I described in an earlier question in which DataWeave isn't reading a CSV file that doesn't comply with RFC 4180. I decided I'd write a Java class that implements org.mule.api.lifecycle.Callable
that would read the InputStream
I had (which might be from a file or from an HTTP response), remove the lines that can't be processed (I know the implementation here pulls out too much), and pipe readable lines into a new stream. Unfortunately, I keep getting a 0 byte output from DataWeave and an exception.
edit: To clarify, this question could exist independently of the other question, but I have wondered how to do something like this before. It just so happens that I'm trying to solve this problem as a workaround solution for another problem.
Here's my CSV file:
Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,Something Weird",C,D
A,B,Something Else" Weird,D,
A,",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",C,D
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,
Here's my flow:
<flow name="nonrfcFlow">
<file:inbound-endpoint path="C:\tmp\" moveToPattern="nonrfc-read.csv" moveToDirectory="C:\tmp\" connector-ref="File-Configuration" responseTimeout="10000" mimeType="application/csv" metadata:id="a344bc19-5643-4bfb-b8c2-2994d7997c75" doc:name="File">
<file:filename-regex-filter pattern="nonrfc\.csv" caseSensitive="true"/>
</file:inbound-endpoint>
<flow-ref name="removequotedlines" doc:name="removequotedlines"/>
<dw:transform-message doc:name="Transform Message" metadata:id="5b01fcc4-2a1c-42fb-9cab-2defed9a1161">
<dw:set-payload><![CDATA[%dw 1.0
%input payload application/csv
%output application/json
---
payload
]]></dw:set-payload>
</dw:transform-message>
<file:outbound-endpoint path="C:\tmp" outputPattern="nonrfc-output.json" connector-ref="OutputFileConfiguration" responseTimeout="10000" doc:name="File"/>
</flow>
<sub-flow name="removequotedlines">
<component class="com.stackoverflow.removeLinesWithQuotes" doc:name="Remove Lines with Quotes"/>
</sub-flow>
Here's the Java file:
package com.stackoverflow;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.log4j.Logger;
import org.jfree.data.io.CSV;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.transformer.types.ListDataType;
public class removeLinesWithQuotes implements Callable {
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
Logger logger = Logger.getLogger(removeLinesWithQuotes.class);
InputStream is = (InputStream) eventContext.getMessage().getPayload();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream();
pis.connect(pos);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(pos));
String line;
while ((line = br.readLine()) != null) {
if (!line.contains("\"")) {
bw.write(line);
logger.debug(line);
bw.write("\r\n");
bw.flush();
}
}
is.close();
//eventContext.getMessage().setPayload(pis); // doesn't work
//eventContext.getMessage().setPayload(pis, new ListDataType<CSV>(CSV.class, "application/csv")); // doesn't work
return pis; // doesn't work
}
}
According to the this answer, I should be able to return an InputStream
, which a PipedInputStream
is. If I comment out either the flow reference or the DataWeave, I get output -- respectively JSON with nonsense in it (because the DataWeave CSV parser behaves strangely when the RFC isn't followed) or a CSV with the rows removed. That is, independently, the Java class or the DataWeave component work, but not together.
Here's the output when the Java class is omitted from the flow:
[
{
"Column A": "A",
"Column B": "B",
"Column C": "C",
"Column D": "D"
},
{
"Column A": "A",
"Column B": "BB",
"Column C": "CCCC",
"Column D": "DDDDDDDD"
},
{
"Column A": "A",
"Column B": "BBB",
"Column C": "CCCCCCCCC",
"Column D": "DDDDDDDDDDDDDDDDDDDDDDDDDDD"
},
{
"Column A": "A",
"Column B": ",C,D\r\nA,B,Something Else",
"Column C": "D",
"Column D": ""
},
{
"Column A": "A",
"Column B": ",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",
"Column C": "C",
"Column D": "D "
},
{
"Column A": "A",
"Column B": "B",
"Column C": "C",
"Column D": "D"
},
{
"Column A": "A",
"Column B": "B",
"Column C": "C",
"Column D": ""
},
{
"Column A": "A",
"Column B": "B",
"Column C": "",
"Column D": "D"
},
{
"Column A": "A",
"Column B": "B",
"Column C": "",
"Column D": ""
},
{
"Column A": "A",
"Column B": "",
"Column C": "C",
"Column D": "D"
},
{
"Column A": "A",
"Column B": "",
"Column C": "C",
"Column D": ""
},
{
"Column A": "A",
"Column B": "",
"Column C": "",
"Column D": "D"
},
{
"Column A": "A",
"Column B": "",
"Column C": "",
"Column D": ""
},
{
"Column A": "",
"Column B": "B",
"Column C": "C",
"Column D": "D"
},
{
"Column A": "",
"Column B": "B",
"Column C": "C",
"Column D": ""
},
{
"Column A": "",
"Column B": "B",
"Column C": "",
"Column D": "D"
},
{
"Column A": "",
"Column B": "B",
"Column C": "",
"Column D": ""
},
{
"Column A": "",
"Column B": "",
"Column C": "C",
"Column D": "D"
},
{
"Column A": "",
"Column B": "",
"Column C": "C",
"Column D": ""
},
{
"Column A": "",
"Column B": "",
"Column C": "",
"Column D": "D"
},
{
"Column A": "",
"Column B": "",
"Column C": "",
"Column D": ""
}
]
And here's the output when the DataWeave is omitted:
Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,
Here's the exception:
INFO 2015-12-03 17:01:54,716 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.transport.file.FileConnector: Writing file to: C:\tmp\nonrfc-output.json
ERROR 2015-12-03 17:02:56,817 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
name=OutputFileConfiguration
lifecycle=start
this=6f3e19b3
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[file]
serviceOverrides=<none>
}
, name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler
Type : org.mule.api.transport.DispatchException
Code : MULE_ERROR--2
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
Payload : com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3e274324
********************************************************************************
Exception stack is:
1. Pipe broken (java.io.IOException)
java.io.PipedInputStream:-1 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
name=OutputFileConfiguration
lifecycle=start
this=6f3e19b3
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[file]
serviceOverrides=<none>
}
, name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler (org.mule.api.transport.DispatchException)
org.mule.transport.AbstractMessageDispatcher:117 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.io.IOException: Pipe broken
at java.io.PipedInputStream.read(Unknown Source)
at java.io.PipedInputStream.read(Unknown Source)
at com.mulesoft.weave.reader.DefaultSeekableStream.readUntil(SeekableStream.scala:146)
at com.mulesoft.weave.reader.DefaultSeekableStream.delegate$lzycompute(SeekableStream.scala:153)
at com.mulesoft.weave.reader.DefaultSeekableStream.delegate(SeekableStream.scala:151)
at com.mulesoft.weave.reader.DefaultSeekableStream.seek(SeekableStream.scala:189)
at com.mulesoft.weave.reader.UTF8StreamSourceReader.seek(StreamSourceReader.scala:121)
at com.mulesoft.weave.reader.csv.parser.CSVParser.parse(CSVParser.scala:93)
at com.mulesoft.weave.reader.csv.parser.CSVParser.elementAt(CSVParser.scala:54)
at com.mulesoft.weave.reader.csv.parser.CSVParser.contains(CSVParser.scala:38)
at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.hasNext(CSVReader.scala:52)
at scala.collection.Iterator$class.toStream(Iterator.scala:1188)
at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.toStream(CSVReader.scala:50)
at com.mulesoft.weave.writer.json.JsonWriter.writeArray(JsonWriter.scala:156)
at com.mulesoft.weave.writer.json.JsonWriter.writeValue(JsonWriter.scala:137)
at com.mulesoft.weave.model.values.Value$class.write(Value.scala:31)
at com.mulesoft.weave.reader.csv.CSVRecordsValue.write(CSVReader.scala:47)
at com.mulesoft.weave.model.values.wrappers.DelegateValue$class.write(DelegateValue.scala:29)
at com.mulesoft.weave.engine.ast.variables.VariableReferenceNode.write(VariableReferenceNode.scala:9)
at com.mulesoft.weave.engine.Engine.internalExecute(Engine.scala:89)
at com.mulesoft.weave.engine.Engine.execute(Engine.scala:54)
at com.mulesoft.weave.engine.Engine.execute(Engine.scala:169)
at com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler.write(WeaveMessageProcessor.scala:159)
at org.mule.transport.file.FileMessageDispatcher.doDispatch(FileMessageDispatcher.java:75)
at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:107)
at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2686)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:94)
at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:56)
at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:208)
at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:201)
at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:16)
at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:35)
at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:22)
at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:30)
at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:14)
at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:67)
at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:44)
at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:50)
at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:40)
at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:41)
at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:48)
at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:28)
at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:13)
at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:110)
at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:30)
at ...
********************************************************************************