Mulesoft SFTP connector

2019-08-19 03:16发布

问题:

I am using Anypoint Studio to upsert product items from an SFTP server into Salesforce. I created a class MySftpMessageReceiver that extends SftpMessageReceiver and overrides the poll() method.

My receiver uses an instance of a class I wrote, JSchSFTPBrowser, which is built on the JSch library. This class has a method that takes rootDirectory and fileName as parameters: it searches all subfolders of rootDirectory recursively for files whose names contain fileName and collects their paths in a list of strings.

After calling this function in the overridden poll() function, I route files one by one.

Then a Salesforce connector upserts the products into Salesforce.

Below is my code:

Mule Application configuration XML:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:sftp="http://www.mulesoft.org/schema/mule/sftp" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/sftp http://www.mulesoft.org/schema/mule/sftp/current/mule-sftp.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">
    <!-- Salesforce connection (basic authentication) referenced by the upsert step in myFlow.
         NOTE(review): the attribute values look like redacted placeholders; confirm real values are supplied. -->
    <sfdc:config name="Salesforce__Basic_Authentication" username="username" password="pass" securityToken="securitytoken" url="url" doc:name="Salesforce: Basic Authentication"/>
    <!-- SFTP connector referenced by the inbound endpoint of myFlow. The service-overrides
         element replaces the connector's message receiver with my.package.MySftpMessageReceiver. -->
    <sftp:connector name="SFTPConfig" validateConnections="true" autoDelete="false" doc:name="SFTP">
            <service-overrides messageReceiver="my.package.MySftpMessageReceiver"/>
    </sftp:connector>
    <!-- myFlow: SFTP inbound endpoint, then CSV to Java transformation, then Salesforce bulk upsert. -->
    <flow name="myFlow">
        <!-- Inbound endpoint bound to connector SFTPConfig with pollingFrequency 15000
             (presumably milliseconds; confirm). host, port, user and password look like placeholders. -->
        <sftp:inbound-endpoint host="host" port="##"  user="user" password="pass" responseTimeout="10000" doc:name="SFTP" autoDelete="false" connector-ref="SFTPConfig" pollingFrequency="15000" path="/"/>
        <!-- Reads the payload as CSV and maps every row to an object whose keys are
             Product__c field names; the CSV column names appear on the right-hand side. -->
        <dw:transform-message doc:name="Transform Message" metadata:id="007db759-7ccf-44fd-b053-0a67709fd56b">
            <dw:input-payload mimeType="application/csv"/>
            <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
    Name: payload01.ITEM_DESC,
    Product_Number__c: payload01.ITEM_NUMBER,
    Primary_VPN__c: payload01.PRIMARY_VPN,
    Department_ID__c: payload01.DEPARTMENT_ID,
    Department_Name__c: payload01.DEPARTMENT_NAME,
    Class_ID__c: payload01.CLASS_ID,
    SubClass_ID__c: payload01.SUBCLASS_ID,
    SubClass_Name__c: payload01.SUBCLASS_NAME,
    Department_Brand__c: payload01.DEPARTMENT_BRAND,
    Gender__c: payload01.GENDER,
    Season__c: payload01.SEASON,
    Size__c: payload01.SIZE,
    Color__c: payload01.COLOR,
    Line__c: payload01.LINE,
    Sub_Line__c: payload01."SUB LINE",
    Supplier_Class__c: payload01."SUPPLIER CLASS",
    Supplier_SubClass__c: payload01."SUPPLIER SUBCLASS"
})]]></dw:set-payload>
        </dw:transform-message>
        <!-- Bulk upsert of the transformed payload as Product__c records, using
             Product_Number__c as the external id field. -->
        <sfdc:upsert-bulk config-ref="Salesforce__Basic_Authentication" type="Product__c" externalIdFieldName="Product_Number__c" doc:name="Salesforce">
            <sfdc:objects ref="#[payload]"/>
        </sfdc:upsert-bulk>
    </flow>

</mule>

MySftpMessageReceiver.java

package my.package;

import org.mule.api.construct.FlowConstruct;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.lifecycle.CreateException;
import org.mule.transport.sftp.SftpConnector;
import org.mule.transport.sftp.SftpMessageReceiver;

/**
 * SFTP message receiver that replaces the inherited polling logic with a
 * recursive file search.
 *
 * <p>On every poll it asks a {@link JSchSFTPBrowser} for all files below
 * {@code /Salesforce} whose name contains {@code ITEM_MASTER} and hands each
 * matching path to {@code routeFile}.
 */
public class MySftpMessageReceiver extends SftpMessageReceiver {

    /** Remote directory where the recursive search starts. */
    private static final String SEARCH_ROOT = "/Salesforce";

    /** Substring a remote file name must contain to be routed. */
    private static final String FILE_NAME_FRAGMENT = "ITEM_MASTER";

    /**
     * Created once per receiver and reused by every poll, which is why
     * {@link #poll()} never disconnects it.
     */
    private final JSchSFTPBrowser jschSftpBrowser;

    /**
     * Creates the receiver and the SFTP browser it searches with.
     *
     * @throws CreateException if the superclass constructor fails
     */
    public MySftpMessageReceiver(SftpConnector connector, FlowConstruct flow, InboundEndpoint endpoint)
    throws CreateException {
        super(connector, flow, endpoint);
        jschSftpBrowser = new JSchSFTPBrowser();
    }

    /**
     * Same as the three-argument constructor, additionally forwarding the
     * polling frequency to the superclass.
     *
     * @throws CreateException if the superclass constructor fails
     */
    public MySftpMessageReceiver(SftpConnector connector, FlowConstruct flow, InboundEndpoint endpoint,
        long frequency) throws CreateException {
        super(connector, flow, endpoint, frequency);
        jschSftpBrowser = new JSchSFTPBrowser();
    }

    /**
     * Searches the SFTP server and routes every matching file once.
     *
     * @throws Exception if the search or the routing of a file fails
     */
    @Override
    public void poll() throws Exception {
        jschSftpBrowser.lookForFile(SEARCH_ROOT, FILE_NAME_FRAGMENT);

        // The browser's result list may hold the same path more than once
        // (for example when results of earlier searches are still in it).
        // Collapse duplicates, keeping discovery order, so that each file is
        // routed at most once per poll.
        java.util.Set<String> uniquePaths =
                new java.util.LinkedHashSet<String>(jschSftpBrowser.getFilePaths());

        if (uniquePaths.isEmpty()) {
            System.out.println("\nNo result found!");
            return;
        }

        System.out.println("\nFound file " + uniquePaths.size() + " times!\n");
        for (String path : uniquePaths) {
            routeFile(path);
        }
    }
}

JSchSFTPBrowser.java

package my.package;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;

/**
 * Small SFTP helper built on JSch: it searches a remote directory tree for
 * files whose name contains a given fragment and collects their full paths.
 *
 * <p>Not thread-safe: session, channel and result list are plain mutable
 * fields without any synchronization.
 */
public class JSchSFTPBrowser {

    /*
     * Connection settings. The values are placeholders; "##" in particular
     * stands for the real port number and must be replaced before this class
     * compiles.
     */
    private static final String SFTPHOST = "host";
    private static final String SFTPUSER = "user";
    private static final String SFTPPASS = "pass";
    private static final int SFTPPORT = ##;

    Session session = null;
    Channel channel = null;
    ChannelSftp sftpChannel = null;

    /** Paths found by the most recent {@link #lookForFile(String, String)} call. */
    private final List<String> filePaths = new ArrayList<String>();

    /**
     * Returns the paths found by the most recent search.
     *
     * @return the live internal list (not a copy); it is emptied and refilled
     *         by the next {@link #lookForFile(String, String)} call
     */
    public List<String> getFilePaths(){
        return this.filePaths;
    }

    /**
     * Creates the browser and tries to connect right away. A failed attempt is
     * reported but does not abort construction; the connection is attempted
     * again by the first {@link #lookForFile(String, String)} call, which then
     * reports the failure to its caller.
     */
    public JSchSFTPBrowser() {
        try {
            connect();
        } catch (com.jcraft.jsch.JSchException ex) {
            System.err.println("Initial SFTP connection to " + SFTPHOST + ":" + SFTPPORT
                    + " failed, will retry on first lookup: " + ex);
        }
    }

    /**
     * Searches {@code sourcePath} and all of its sub-folders for files whose
     * name contains {@code fileName}. Results of any previous search are
     * discarded first; the new results are available from
     * {@link #getFilePaths()}.
     *
     * <p>Sub-folders that cannot be listed are reported and skipped.
     *
     * @param sourcePath remote directory where the search starts
     * @param fileName   fragment a file name must contain to match
     * @throws SftpException if no connection can be established or
     *                       {@code sourcePath} itself cannot be listed
     */
    public void lookForFile(String sourcePath, String fileName) throws SftpException {
        filePaths.clear(); // otherwise every call would append the same paths again
        ensureConnected();
        collectMatches(sourcePath, fileName);
    }

    /**
     * Closes the SFTP channel and the session, if open. Safe to call more than
     * once; a later {@link #lookForFile(String, String)} reconnects.
     */
    public void disconnect() {
        if (session != null) {
            System.out.println("Killing the session");
        }
        release();
    }

    /** Recursive worker behind {@link #lookForFile(String, String)}. */
    @SuppressWarnings("unchecked") // ChannelSftp.ls returns a raw Vector of LsEntry
    private void collectMatches(String directory, String fragment) throws SftpException {
        Vector<ChannelSftp.LsEntry> entries = sftpChannel.ls(directory);
        for (ChannelSftp.LsEntry entry : entries) {
            String name = entry.getFilename();
            // Avoid a doubled separator when the directory already ends with one (e.g. "/").
            String path = directory.endsWith("/") ? directory + name : directory + "/" + name;

            if (entry.getAttrs().isDir()) {
                if (".".equals(name) || "..".equals(name)) {
                    continue; // self and parent entries would recurse forever
                }
                try {
                    collectMatches(path, fragment);
                } catch (SftpException ex) {
                    // One unreadable sub-folder should not discard the matches found elsewhere.
                    System.err.println("Skipping directory " + path + ": " + ex);
                }
            } else if (name.contains(fragment)) {
                System.out.println("File found! Path: " + path);
                filePaths.add(path);
            }
        }
    }

    /** Connects if there is no usable SFTP channel, converting failures to SftpException. */
    private void ensureConnected() throws SftpException {
        if (sftpChannel != null && sftpChannel.isConnected()) {
            return;
        }
        try {
            connect();
        } catch (com.jcraft.jsch.JSchException ex) {
            throw new SftpException(ChannelSftp.SSH_FX_NO_CONNECTION,
                    "Cannot connect to SFTP server " + SFTPHOST + ":" + SFTPPORT, ex);
        }
    }

    /** Opens a fresh session and SFTP channel, releasing everything again on failure. */
    private void connect() throws com.jcraft.jsch.JSchException {
        release();

        JSch jsch = new JSch();
        session = jsch.getSession(SFTPUSER, SFTPHOST, SFTPPORT);
        session.setPassword(SFTPPASS);

        java.util.Properties config = new java.util.Properties();
        // SECURITY NOTE(review): this turns off host key verification, so the
        // server's identity is not checked. Kept as in the original; consider
        // configuring known hosts instead.
        config.put("StrictHostKeyChecking", "no");
        session.setConfig(config);

        try {
            session.connect();
            channel = session.openChannel("sftp");
            channel.connect();
            sftpChannel = (ChannelSftp) channel;
        } catch (com.jcraft.jsch.JSchException ex) {
            release(); // do not leave a half-open session behind
            throw ex;
        }
    }

    /** Disconnects channel and session (when present) and forgets them. */
    private void release() {
        if (channel != null) {
            channel.disconnect();
        }
        if (session != null) {
            session.disconnect();
        }
        sftpChannel = null;
        channel = null;
        session = null;
    }
}

I am getting the following error:

org.mule.module.launcher.DeploymentStartException: NullPointerException: 
    at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:178) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.artifact.ArtifactWrapper$4.execute(ArtifactWrapper.java:106) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.artifact.ArtifactWrapper.executeWithinArtifactClassLoader(ArtifactWrapper.java:137) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.artifact.ArtifactWrapper.start(ArtifactWrapper.java:101) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArtifactDeployer.deploy(DefaultArtifactDeployer.java:73) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArchiveDeployer.deployArtifact(DefaultArchiveDeployer.java:547) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArchiveDeployer.deployArtifact(DefaultArchiveDeployer.java:325) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArchiveDeployer.deployPackagedArtifact(DefaultArchiveDeployer.java:185) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArchiveDeployer.deployPackagedArtifact(DefaultArchiveDeployer.java:274) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArchiveDeployer.deployPackagedArtifact(DefaultArchiveDeployer.java:80) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DefaultArchiveDeployer.deployPackagedArtifact(DefaultArchiveDeployer.java:577) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DeploymentDirectoryWatcher.deployPackedApps(DeploymentDirectoryWatcher.java:283) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.DeploymentDirectoryWatcher.start(DeploymentDirectoryWatcher.java:155) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.MuleDeploymentService.start(MuleDeploymentService.java:135) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.MuleContainer.start(MuleContainer.java:172) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    at org.mule.tooling.server.application.ApplicationDeployer.main(ApplicationDeployer.java:15) ~[tooling-support-3.9.1.jar:?]
Caused by: org.mule.api.lifecycle.LifecycleException: Failed to start inbound endpoint "endpoint.sftp.213.42.122.6.22"
    at org.mule.endpoint.DefaultInboundEndpoint.start(DefaultInboundEndpoint.java:119) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.api.lifecycle.LifecycleUtils.startIfNeeded(LifecycleUtils.java:92) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSource(DefaultMuleContext.java:391) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startPipelineMessageSources(DefaultMuleContext.java:382) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSources(DefaultMuleContext.java:345) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.start(DefaultMuleContext.java:329) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:146) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    ... 15 more
Caused by: org.mule.retry.RetryPolicyExhaustedException: Fail to connect
    at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:111) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.sftp.SftpMessageReceiver.doConnect(SftpMessageReceiver.java:296) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractMessageReceiver.connectHandler(AbstractMessageReceiver.java:448) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractTransportMessageHandler.connect(AbstractTransportMessageHandler.java:217) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractConnector.registerListener(AbstractConnector.java:1299) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.endpoint.DefaultInboundEndpoint.start(DefaultInboundEndpoint.java:105) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.api.lifecycle.LifecycleUtils.startIfNeeded(LifecycleUtils.java:92) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSource(DefaultMuleContext.java:391) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startPipelineMessageSources(DefaultMuleContext.java:382) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSources(DefaultMuleContext.java:345) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.start(DefaultMuleContext.java:329) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:146) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    ... 15 more
Caused by: java.lang.Exception: Fail to connect
    at org.mule.transport.sftp.SftpMessageReceiver$2.doWork(SftpMessageReceiver.java:322) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:63) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.sftp.SftpMessageReceiver.doConnect(SftpMessageReceiver.java:296) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractMessageReceiver.connectHandler(AbstractMessageReceiver.java:448) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractTransportMessageHandler.connect(AbstractTransportMessageHandler.java:217) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractConnector.registerListener(AbstractConnector.java:1299) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.endpoint.DefaultInboundEndpoint.start(DefaultInboundEndpoint.java:105) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.api.lifecycle.LifecycleUtils.startIfNeeded(LifecycleUtils.java:92) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSource(DefaultMuleContext.java:391) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startPipelineMessageSources(DefaultMuleContext.java:382) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSources(DefaultMuleContext.java:345) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.start(DefaultMuleContext.java:329) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:146) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    ... 15 more
Caused by: java.lang.NullPointerException
    at org.mule.transport.sftp.SftpConnector.releaseClient(SftpConnector.java:269) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.transport.sftp.SftpReceiverRequesterUtil.checkSFTPConnection(SftpReceiverRequesterUtil.java:169) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.transport.sftp.SftpMessageReceiver$2.doWork(SftpMessageReceiver.java:307) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:63) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.sftp.SftpMessageReceiver.doConnect(SftpMessageReceiver.java:296) ~[mule-transport-sftp-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractMessageReceiver.connectHandler(AbstractMessageReceiver.java:448) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractTransportMessageHandler.connect(AbstractTransportMessageHandler.java:217) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.transport.AbstractConnector.registerListener(AbstractConnector.java:1299) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.endpoint.DefaultInboundEndpoint.start(DefaultInboundEndpoint.java:105) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.api.lifecycle.LifecycleUtils.startIfNeeded(LifecycleUtils.java:92) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSource(DefaultMuleContext.java:391) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startPipelineMessageSources(DefaultMuleContext.java:382) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.startMessageSources(DefaultMuleContext.java:345) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.DefaultMuleContext.start(DefaultMuleContext.java:329) ~[mule-core-3.9.1.jar:3.9.1]
    at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:146) ~[mule-module-launcher-3.9.1.jar:3.9.1]
    ... 15 more
INFO  2018-11-25 18:17:33,627 [main] org.mule.module.launcher.DeploymentDirectoryWatcher: 

The above is all the code of the integration. This was my first test and first exception. Any help to make this integration work is appreciated. Thanks!