Streaming from remote SFTP directories and sub-directories with Spring Integration

I am using the Spring Integration Streaming Inbound Channel Adapter to get a stream from a remote SFTP server and parse every line of its content.

I use:

IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
                          .filter(remoteFileFilter)
                          .remoteDirectory("test_dir"),
                        e -> e.id("sftpInboundAdapter")
                              .autoStartup(true)
                              .poller(Pollers.fixedDelay(fetchInt)))
                .handle(Files.splitter(true, true))
....

This works, but it only gets files from the test_dir directory itself. I need to recursively get files from this directory and its sub-directories and parse every line.

I noticed that the regular Inbound Channel Adapter, Sftp.inboundAdapter(sftpSessionFactory).scanner(...), can scan sub-directories, but I didn't find anything equivalent for the Streaming Inbound Channel Adapter.

So how can I recursively get files from a directory with the Streaming Inbound Channel Adapter?

Thanks.

Gupton asked 2/4, 2020 at 9:4

You can use two outbound gateways: the first does ls -R (recursive list); split its result and send each file name to a second gateway configured with get -stream to fetch each file as a stream.
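To make the data flow of this two-gateway approach concrete, here is a minimal, stdlib-only Java sketch. It is not Spring Integration code: a local directory stands in for the SFTP server (an assumption made so the example runs anywhere), and the class and method names are hypothetical. listRecursively plays the role of the recursive, name-only LS and returns paths relative to the listed directory, which is what the GET expression in the example assumes when it prepends the directory to the payload; readLines plays the role of the streaming GET plus a line splitter.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RecursiveListThenStream {

    // Step 1, like "ls -R" with the name-only option: regular files under baseDir,
    // reported as '/'-separated paths relative to baseDir.
    static List<String> listRecursively(Path baseDir) throws IOException {
        try (Stream<Path> paths = Files.walk(baseDir)) {
            return paths.filter(Files::isRegularFile)
                    .map(p -> baseDir.relativize(p).toString().replace('\\', '/'))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    // Step 2, like "get -stream" followed by a line splitter: read one file lazily, line by line.
    static List<String> readLines(Path baseDir, String relativePath) throws IOException {
        List<String> out = new ArrayList<>();
        // Resolving against the listed directory mirrors the "'foo/' + payload" expression.
        try (Stream<String> lines = Files.lines(baseDir.resolve(relativePath))) {
            lines.forEach(line -> out.add(relativePath + ": " + line));
        }
        return out;
    }

    // The split step: one unit of work per listed file, then one result per line.
    static List<String> process(Path baseDir) throws IOException {
        List<String> results = new ArrayList<>();
        for (String relativePath : listRecursively(baseDir)) {
            results.addAll(readLines(baseDir, relativePath));
        }
        return results;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("test_dir");
        Files.createDirectories(base.resolve("sub"));
        Files.write(base.resolve("a.txt"), List.of("line 1", "line 2"));
        Files.write(base.resolve("sub/b.txt"), List.of("line 3"));
        process(base).forEach(System.out::println);
    }
}
```

Listing first and reading second keeps the two concerns separate, which is exactly why the answer can reuse the recursive LS and still stream each file.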

EDIT

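import java.io.File;

import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.Command;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.Option;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@SpringBootApplication
public class So60987851Application {

    public static void main(String[] args) {
        SpringApplication.run(So60987851Application.class, args);
    }

    @Bean
    IntegrationFlow flow(SessionFactory<LsEntry> csf) {
        // every 5 seconds, emit "foo": the remote directory to list
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
                // LS -R -1: recursive listing that returns file names relative to "foo"
                .handle(Sftp.outboundGateway(csf, Command.LS, "payload")
                        .options(Option.RECURSIVE, Option.NAME_ONLY)
                        // need a more robust metadata store for persistence, unless the files are removed
                        .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                // one message per file name
                .split()
                .log()
                .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
                // GET -stream: the payload becomes an InputStream of the remote file
                .handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
                        .options(Option.STREAM))
                // one message per line
                .split(new FileSplitter())
                .log()
                // instead of a filter, we can remove the remote file.
                // but needs some logic to wait until all lines read
//              .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
//              .log()
                .get();
    }

    @Bean
    CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    DefaultSftpSessionFactory sf() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("10.0.0.8");
        sf.setUser("gpr");
        sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
        sf.setAllowUnknownKeys(true);
        return sf;
    }

}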
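The commented-out RM step needs a signal that every line of a file has been read. One way to get it is a splitter that wraps the lines in start and end markers (the question's Files.splitter(true, true) enables markers through its second argument) and a remove step that reacts only to the end marker. The plain-Java sketch below models that idea on a local file; Kind, Event, split and processAndRemove are hypothetical stand-ins, not Spring Integration's FileSplitter.FileMarker API, and Files.delete takes the place of the RM gateway.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SplitWithMarkers {

    // Hypothetical event kinds: a START marker, one LINE per line, then an END marker.
    enum Kind { START, LINE, END }

    static final class Event {
        final Kind kind;
        final String file;
        final String line; // null for START and END markers

        Event(Kind kind, String file, String line) {
            this.kind = kind;
            this.file = file;
            this.line = line;
        }
    }

    // Reads the file lazily and emits START, each line, and finally END after the reader is closed.
    static void split(Path file, Consumer<Event> downstream) throws IOException {
        String name = file.getFileName().toString();
        downstream.accept(new Event(Kind.START, name, null));
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            String line;
            while ((line = reader.readLine()) != null) {
                downstream.accept(new Event(Kind.LINE, name, line));
            }
        }
        downstream.accept(new Event(Kind.END, name, null));
    }

    // Handles lines as they arrive; deletes the file (stand-in for the RM gateway) only on END.
    static List<String> processAndRemove(Path file) throws IOException {
        List<String> handled = new ArrayList<>();
        split(file, event -> {
            if (event.kind == Kind.LINE) {
                handled.add(event.line);
            } else if (event.kind == Kind.END) {
                try {
                    Files.delete(file);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        });
        return handled;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("remote", ".txt");
        Files.write(file, List.of("first", "second"));
        System.out.println(processAndRemove(file)); // [first, second]
        System.out.println(Files.exists(file));     // false
    }
}
```

In this sketch the reader is already closed when the END event arrives, so the file is never deleted while it is still being read.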
Yautia answered 2/4, 2020 at 14:14
I am new to Spring Integration; is there an example of this? - Gupton
I added an example to my answer. - Yautia

This works for me; here is my full code:

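import com.jcraft.jsch.ChannelSftp;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class SftpIFConfig {

    // Every 5 seconds, send the remote directory to list to "sftpMgetInputChannel".
    @InboundChannelAdapter(value = "sftpMgetInputChannel",
            poller = @Poller(fixedDelay = "5000"))
    public String filesForMGET() {
        return "/upload/done";
    }

    @Bean
    public IntegrationFlow sftpMGetFlow(SessionFactory<ChannelSftp.LsEntry> csf) {
        return IntegrationFlows.from("sftpMgetInputChannel")
                .handle(Sftp.outboundGateway(csf,
                                AbstractRemoteFileOutboundGateway.Command.LS, "payload")
                        .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
                        // Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
                        .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                .split()
                .log(message -> "file path -> " + message.getPayload())
                .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'/upload/done/' + payload"))
                .log(message -> "Header file info -> " + message.getHeaders())
                .handle(Sftp.outboundGateway(csf, AbstractRemoteFileOutboundGateway.Command.GET, "'/upload/done/' + payload")
                        .options(AbstractRemoteFileOutboundGateway.Option.STREAM))
                .split(new FileSplitter())
                .log(message -> "File content -> " + message.getPayload())
                .get();
    }

    @Bean
    CachingSessionFactory<ChannelSftp.LsEntry> csf(DefaultSftpSessionFactory sf) {
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    DefaultSftpSessionFactory sf() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("0.0.0.0");
        factory.setPort(2222);
        factory.setAllowUnknownKeys(true);
        factory.setUser("xxxx");
        factory.setPassword("xxx");
        return factory;
    }
}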
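The filter comment in this configuration says it uses the server's file timestamp to decide whether a file has already been seen. A simplified plain-Java model of that idea (an assumption about the behavior, not the library source) maps a key built from a prefix and the file name to the last modification time: a file passes the first time, and again only if its timestamp changes. RemoteFile and AcceptOnceSketch are hypothetical names. Because the map lives in memory, like SimpleMetadataStore, it is emptied on restart and every file would pass again; that is the persistence concern raised in the first answer's comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AcceptOnceSketch {

    // Hypothetical minimal view of a remote file: its name and the server's modification time.
    static final class RemoteFile {
        final String name;
        final long modifiedTime;

        RemoteFile(String name, long modifiedTime) {
            this.name = name;
            this.modifiedTime = modifiedTime;
        }
    }

    // In-memory "metadata store": key -> last modification time seen (lost on restart).
    private final Map<String, Long> store = new ConcurrentHashMap<>();
    private final String keyPrefix;

    AcceptOnceSketch(String keyPrefix) {
        this.keyPrefix = keyPrefix;
    }

    // Accept on first sight; afterwards accept again only if the timestamp changed.
    boolean accept(RemoteFile file) {
        String key = keyPrefix + file.name;
        Long previous = store.putIfAbsent(key, file.modifiedTime);
        if (previous == null) {
            return true;
        }
        return previous.longValue() != file.modifiedTime
                && store.replace(key, previous, file.modifiedTime);
    }

    // One poll: returns the names that pass, in listing order.
    List<String> filter(List<RemoteFile> listing) {
        List<String> accepted = new ArrayList<>();
        for (RemoteFile file : listing) {
            if (accept(file)) {
                accepted.add(file.name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch("test");
        List<RemoteFile> firstPoll = List.of(new RemoteFile("a.txt", 100), new RemoteFile("sub/b.txt", 100));
        System.out.println(filter.filter(firstPoll)); // [a.txt, sub/b.txt]
        System.out.println(filter.filter(firstPoll)); // []
        List<RemoteFile> afterEdit = List.of(new RemoteFile("a.txt", 200), new RemoteFile("sub/b.txt", 100));
        System.out.println(filter.filter(afterEdit)); // [a.txt]
    }
}
```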
Procreant answered 30/10, 2021 at 20:22
Please add more explanation as to why your code works, so that the asker can reproduce it in their case. - Protasis

As dsillman2000 commented, I also felt this answer needed more explanation.

After working it out from the examples above, here is my extended version, which works for me. I extracted variables and methods whose names (hopefully) make clear what each part is or does.

Dependencies: mostly org.springframework.integration:spring-integration-sftp, with its version managed by Spring Boot 2.6.6.

package com.example.sftp.incoming;

import com.jcraft.jsch.ChannelSftp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.filters.AbstractFileListFilter;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpOutboundGatewaySpec;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;

import java.util.function.Consumer;
import java.util.function.Function;

@Configuration
public class SftpIncomingRecursiveConfiguration {

    private static final String BASE_REMOTE_FOLDER_TO_GET_FILES_FROM = "/tmp/sftptest/";
    private static final String REMOTE_FOLDER_PATH_AS_EXPRESSION = "'" + BASE_REMOTE_FOLDER_TO_GET_FILES_FROM + "'";

    private static final String INBOUND_CHANNEL_NAME = "sftpGetInputChannel";


    @Value("${demo.sftp.host}")
    private String sftpHost;

    @Value("${demo.sftp.user}")
    private String sftpUser;

    @Value("${demo.sftp.password}")
    private String sftpPassword;

    @Value("${demo.sftp.port}")
    private String sftpPort;

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpHost);
        factory.setPort(Integer.parseInt(sftpPort));
        factory.setUser(sftpUser);
        factory.setPassword(sftpPassword);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }


    // poll for new files every 500ms
    @InboundChannelAdapter(value = INBOUND_CHANNEL_NAME, poller = @Poller(fixedDelay = "500"))
    public String filesForSftpGet() {
        return BASE_REMOTE_FOLDER_TO_GET_FILES_FROM;
    }

    @Bean
    public IntegrationFlow sftpGetFlow(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
        return IntegrationFlows
                .from(INBOUND_CHANNEL_NAME)
                .handle(listRemoteFiles(sessionFactory))
                .split()
                .log(logTheFilePath())
                .enrichHeaders(addAMessageHeader())
                .log(logTheMessageHeaders())
                .handle(getTheFile(sessionFactory))
                .split(splitContentIntoLines())
                .log(logTheFileContent())
                .get();
    }

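    private SftpOutboundGatewaySpec listRemoteFiles(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
        return Sftp.outboundGateway(sessionFactory,
                        AbstractRemoteFileOutboundGateway.Command.LS, REMOTE_FOLDER_PATH_AS_EXPRESSION)
                .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
                // one filter combining both checks, so the result does not depend on how repeated .filter(...) calls are merged
                .filter(onlyTxtFilesWeHaveNotSeenYet());
    }


    /* Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
    Without it, the program would report the same file over and over again. */
    private SftpPersistentAcceptOnceFileListFilter onlyFilesWeHaveNotSeenYet() {
        return new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "keyPrefix");
    }

    /* Directories always pass, so a filter applied during the recursive listing cannot stop it
    from descending into sub-directories. Files pass only if they end with ".txt" and have not been
    seen yet; checking the extension first keeps other files out of the metadata store. */
    private AbstractFileListFilter<ChannelSftp.LsEntry> onlyTxtFilesWeHaveNotSeenYet() {
        SftpPersistentAcceptOnceFileListFilter notSeenYet = onlyFilesWeHaveNotSeenYet();
        return new AbstractFileListFilter<>() {
            @Override
            public boolean accept(ChannelSftp.LsEntry file) {
                if (file.getAttrs().isDir()) {
                    return true;
                }
                return file.getFilename().endsWith(".txt") && notSeenYet.accept(file);
            }
        };
    }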

    private Function<Message<Object>, Object> logTheFilePath() {
        return message -> "### File path: " + message.getPayload();
    }

    private Consumer<HeaderEnricherSpec> addAMessageHeader() {
        return headers -> headers.headerExpression("fileToRemove", REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload");
    }

    private Function<Message<Object>, Object> logTheMessageHeaders() {
        return message -> "### Header file info: " + message.getHeaders();
    }

    private SftpOutboundGatewaySpec getTheFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
        return Sftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET,
                        REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload")
                .options(AbstractRemoteFileOutboundGateway.Option.STREAM);
    }

    private FileSplitter splitContentIntoLines() {
        return new FileSplitter();
    }

    private Function<Message<Object>, Object> logTheFileContent() {
        return message -> "### File content line: '" + message.getPayload() + "'";
    }
}

EDIT: Note one difference: the other example uses a poller to generate, over and over again, a message whose payload is the remote directory path returned by "filesForMGET", and that payload is used as the argument to "ls". Here the path is hard-coded in the LS expression, so the content of the poller's message is ignored.
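The difference described in this edit shows up in the expression strings handed to the gateways. The stdlib-only sketch below rebuilds them from the same constants used in the configuration; resolvedGetPath is a hypothetical helper that only imitates the string concatenation SpEL would perform for a given payload, it does not evaluate SpEL.

```java
public class GatewayExpressions {

    private static final String BASE_REMOTE_FOLDER_TO_GET_FILES_FROM = "/tmp/sftptest/";
    // Wrapping the folder in single quotes turns it into a SpEL string literal.
    private static final String REMOTE_FOLDER_PATH_AS_EXPRESSION = "'" + BASE_REMOTE_FOLDER_TO_GET_FILES_FROM + "'";

    // Expression handed to the LS gateway: a constant, so the poller's payload is ignored.
    static String lsExpression() {
        return REMOTE_FOLDER_PATH_AS_EXPRESSION;
    }

    // Expression handed to the GET gateway: folder literal plus the relative path in the payload.
    static String getExpression() {
        return REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload";
    }

    // What the GET expression would resolve to for a payload produced by the LS step.
    static String resolvedGetPath(String payload) {
        return BASE_REMOTE_FOLDER_TO_GET_FILES_FROM + payload;
    }

    public static void main(String[] args) {
        System.out.println(lsExpression());               // '/tmp/sftptest/'
        System.out.println(getExpression());              // '/tmp/sftptest/' + payload
        System.out.println(resolvedGetPath("sub/b.txt")); // /tmp/sftptest/sub/b.txt
    }
}
```

By contrast, the other answers pass the expression "payload" to the LS gateway, so whatever directory the poller sends is what gets listed.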

Supraorbital answered 5/4, 2022 at 18:36
