Like dsillman2000 commented, I also found that this answer could use more explanation.
After figuring this out from the examples here, I put together the extended example below, which works for me. Variables and methods are extracted so that (hopefully) their names clearly say what each individual part is or does.
Dependencies: Mostly org.springframework.integration:spring-integration-sftp:2.6.6
package com.example.sftp.incoming;
import com.jcraft.jsch.ChannelSftp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.filters.AbstractFileListFilter;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpOutboundGatewaySpec;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class SftpIncomingRecursiveConfiguration {
private static final String BASE_REMOTE_FOLDER_TO_GET_FILES_FROM = "/tmp/sftptest/";
private static final String REMOTE_FOLDER_PATH_AS_EXPRESSION = "'" + BASE_REMOTE_FOLDER_TO_GET_FILES_FROM + "'";
private static final String INBOUND_CHANNEL_NAME = "sftpGetInputChannel";
@Value("${demo.sftp.host}")
private String sftpHost;
@Value("${demo.sftp.user}")
private String sftpUser;
@Value("${demo.sftp.password}")
private String sftpPassword;
@Value("${demo.sftp.port}")
private String sftpPort;
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setPort(Integer.parseInt(sftpPort));
factory.setUser(sftpUser);
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
// poll for new files every 500ms
@InboundChannelAdapter(value = INBOUND_CHANNEL_NAME, poller = @Poller(fixedDelay = "500"))
public String filesForSftpGet() {
return BASE_REMOTE_FOLDER_TO_GET_FILES_FROM;
}
@Bean
public IntegrationFlow sftpGetFlow(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return IntegrationFlows
.from(INBOUND_CHANNEL_NAME)
.handle(listRemoteFiles(sessionFactory))
.split()
.log(logTheFilePath())
.enrichHeaders(addAMessageHeader())
.log(logTheMessageHeaders())
.handle(getTheFile(sessionFactory))
.split(splitContentIntoLines())
.log(logTheFileContent())
.get();
}
private SftpOutboundGatewaySpec listRemoteFiles(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return Sftp.outboundGateway(sessionFactory,
AbstractRemoteFileOutboundGateway.Command.LS, REMOTE_FOLDER_PATH_AS_EXPRESSION)
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
.filter(onlyFilesWeHaveNotSeenYet())
.filter(onlyTxtFiles());
}
/* Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
Without it, the program would report the same file over and over again. */
private SftpPersistentAcceptOnceFileListFilter onlyFilesWeHaveNotSeenYet() {
return new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "keyPrefix");
}
private AbstractFileListFilter<ChannelSftp.LsEntry> onlyTxtFiles() {
return new AbstractFileListFilter<>() {
@Override
public boolean accept(ChannelSftp.LsEntry file) {
return file.getFilename().endsWith(".txt");
}
};
}
private Function<Message<Object>, Object> logTheFilePath() {
return message -> "### File path: " + message.getPayload();
}
private Consumer<HeaderEnricherSpec> addAMessageHeader() {
return headers -> headers.headerExpression("fileToRemove", REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload");
}
private Function<Message<Object>, Object> logTheMessageHeaders() {
return message -> "### Header file info: " + message.getHeaders();
}
private SftpOutboundGatewaySpec getTheFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return Sftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET,
REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload")
.options(AbstractRemoteFileOutboundGateway.Option.STREAM);
}
private FileSplitter splitContentIntoLines() {
return new FileSplitter();
}
private Function<Message<Object>, Object> logTheFileContent() {
return message -> "### File content line: '" + message.getPayload() + "'";
}
}
EDIT: Please note that there is a difference here. The other example uses a poller to generate a message containing the remote file path from "filesForMGET" over and over again, and that message payload (the file path) is used as the argument to "ls". Here I hard-code the path in the gateway expression instead, so the message content produced by the poller is ignored.