How to mock WebFluxRequestExecutingMessageHandler with MockIntegrationContext.substituteMessageHandlerFor
Asked Answered
P

1

8

I have implemented an IntegrationFlow where I want to do to following tasks:

  1. Poll for files from a directory
  2. Transform the file content to a string
  3. Send the string via WebFluxRequestExecutingMessageHandler to a REST-Endpoint and use an AdviceChain to handle success and error responses

Implementation

@Configuration
@Slf4j
public class JsonToRestIntegration {

    @Autowired
    private LoadBalancerExchangeFilterFunction lbFunction;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Value("${rest-service-url}")
    private String restServiceUrl;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel successChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel failureChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1000).get();
    }

   @Bean
public IntegrationFlow jsonFileToRestFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
            .transform(org.springframework.integration.file.dsl.Files.toStringTransformer())
            .enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
            .handle(reactiveOutbound())
            .log()
            .channel(httpResponseChannel())
            .get();
}

    @Bean
    public FileReadingMessageSource fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(jsonPath));
        source.setFilter(new SimplePatternFileListFilter("*.json"));
        source.setUseWatchService(true);
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

        return source;
    }

    @Bean
    public MessageHandler reactiveOutbound() {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://jsonservice")
                .filter(lbFunction)
                .build();

        WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(restServiceUrl, webClient);

        handler.setHttpMethod(HttpMethod.POST);
        handler.setCharset(StandardCharsets.UTF_8.displayName());
        handler.setOutputChannel(httpResponseChannel());
        handler.setExpectedResponseType(String.class);
        handler.setAdviceChain(singletonList(expressionAdvice()));

        return handler;
    }

    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

        advice.setTrapException(true);
        advice.setSuccessChannel(successChannel());
        advice.setOnSuccessExpressionString("payload + ' war erfolgreich'");
        advice.setFailureChannel(failureChannel());
        advice.setOnFailureExpressionString("payload + ' war nicht erfolgreich'");

        return advice;
    }

    @Bean
    public IntegrationFlow loggingFlow() {
        return IntegrationFlows.from(httpResponseChannel())
                .handle(message -> {
                    String originalFileName = (String) message.getHeaders().get(FileHeaders.FILENAME);
                    log.info("some log");
                })
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return IntegrationFlows.from(successChannel())
                .handle(message -> {
                    MessageHeaders messageHeaders = ((AdviceMessage) message).getInputMessage().getHeaders();

                    File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
                    String originalFileName = (String) messageHeaders.get(FILENAME);

                    if (originalFile != null && originalFileName != null) {

                        File jsonSuccessFolder = new File(jsonSuccessPath);
                        File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

                        try {
                            Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
                        } catch (IOException e) {
                            log.error("some log", e);
                        }
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return IntegrationFlows.from(failureChannel())
                .handle(message -> {
                    Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

                    if (failedMessage != null) {

                        File originalFile = (File) failedMessage.getHeaders().get(FileHeaders.ORIGINAL_FILE);
                        String originalFileName = (String) failedMessage.getHeaders().get(FileHeaders.FILENAME);

                        if (originalFile != null && originalFileName != null) {

                            File jsonErrorFolder = new File(tonisJsonErrorPath);
                            File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

                            try {
                                Files.move(originalFile.toPath(), jsonErrorFile.toPath());
                            } catch (IOException e) {
                                log.error("some log", e);
                            }
                        }
                    }
                })
                .get();
    }
}

So far it seems to work in production. In the test I want to do the following steps:

  1. Copy JSON-Files to the input directory
  2. Start the polling for the json files
  3. Do assertions on the HTTP-Response from the WebFluxRequestExecutingMessageHandler which are routed through my advice chain

But I'm struggling in the test with the following tasks:

  1. Mocking the WebFluxRequestExecutingMessageHandler with the MockIntegrationContext.substituteMessageHandlerFor()-method
  2. Manually start the polling of the json files

Test

@RunWith(SpringRunner.class)
@SpringIntegrationTest()
@Import({JsonToRestIntegration.class})
@JsonTest
public class JsonToRestIntegrationTest {

    @Autowired
    public DirectChannel httpResponseChannel;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Autowired
    private MockIntegrationContext mockIntegrationContext;

    @Autowired
    private MessageHandler reactiveOutbound;

    @Before
    public void setUp() throws Exception {
        Files.createDirectories(Paths.get(jsonPath));
        Files.createDirectories(Paths.get(jsonSuccessPath));
        Files.createDirectories(Paths.get(jsonErrorPath));
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(jsonPath));
        FileUtils.deleteDirectory(new File(jsonSuccessPath));
        FileUtils.deleteDirectory(new File(jsonErrorPath));
    }

    @Test
    public void shouldSendJsonToRestEndpointAndReceiveOK() throws Exception {
        File jsonFile = new ClassPathResource("/test.json").getFile();
        Path targetFilePath = Paths.get(jsonPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        httpResponseChannel.subscribe(httpResponseHandler());

        this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);
    }

    private MessageHandler httpResponseHandler() {
        return message -> Assert.assertThat(message.getPayload(), is(notNullValue()));
    }

    @Configuration
    @Import({JsonToRestIntegration.class})
    public static class JsonToRestIntegrationTest {

        @Autowired
        public MessageChannel httpResponseChannel;

        @Bean
        public MessageHandler reactiveOutbound() {
            ArgumentCaptor<Message<?>> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);

            MockMessageHandler mockMessageHandler = mockMessageHandler(messageArgumentCaptor).handleNextAndReply(m -> m);
            mockMessageHandler.setOutputChannel(httpResponseChannel);
            return mockMessageHandler;
        }

    }

}

Updated Working Example with mocked WebFluX web client:

Implementation

public class JsonToRestIntegration {

    private final LoadBalancerExchangeFilterFunction lbFunction;

    private final BatchConfigurationProperties batchConfigurationProperties;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(100, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public IntegrationFlow jsonFileToRestFlow() {
        return IntegrationFlows
                .from(fileReadingMessageSource(),  e -> e.id("fileReadingEndpoint"))
                .transform(org.springframework.integration.file.dsl.Files.toStringTransformer("UTF-8"))
                .enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
                .handle(reactiveOutbound())
                .channel(httpResponseChannel())
                .get();
    }

    @Bean
    public FileReadingMessageSource fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(batchConfigurationProperties.getJsonImportFolder()));
        source.setFilter(new SimplePatternFileListFilter("*.json"));
        source.setUseWatchService(true);
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

        return source;
    }

    @Bean
    public WebFluxRequestExecutingMessageHandler reactiveOutbound() {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://service")
                .filter(lbFunction)
                .build();

        WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(batchConfigurationProperties.getServiceUrl(), webClient);

        handler.setHttpMethod(HttpMethod.POST);
        handler.setCharset(StandardCharsets.UTF_8.displayName());
        handler.setOutputChannel(httpResponseChannel());
        handler.setExpectedResponseType(String.class);
        handler.setAdviceChain(singletonList(expressionAdvice()));

        return handler;
    }

    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

        advice.setTrapException(true);
        advice.setFailureChannel(errorChannel());

        return advice;
    }

    @Bean
    public IntegrationFlow responseFlow() {
        return IntegrationFlows.from(httpResponseChannel())
                .handle(message -> {
                    MessageHeaders messageHeaders = message.getHeaders();
                    File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
                    String originalFileName = (String) messageHeaders.get(FILENAME);                        

                    if (originalFile != null && originalFileName != null) {

                        File jsonSuccessFolder = new File(batchConfigurationProperties.getJsonSuccessFolder());
                        File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

                        try {
                            Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
                        } catch (IOException e) {
                            log.error("Could not move file", e);
                        }
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return IntegrationFlows.from(errorChannel())
                .handle(message -> {
                    Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

                    if (failedMessage != null) {

                        File originalFile = (File) failedMessage.getHeaders().get(ORIGINAL_FILE);
                        String originalFileName = (String) failedMessage.getHeaders().get(FILENAME);                            

                        if (originalFile != null && originalFileName != null) {

                            File jsonErrorFolder = new File(batchConfigurationProperties.getJsonErrorFolder());
                            File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

                            try {
                                Files.move(originalFile.toPath(), jsonErrorFile.toPath());
                            } catch (IOException e) {
                                log.error("Could not move file", originalFileName, e);
                            }
                        }
                    }
                })
                .get();
    }
}

Test

@RunWith(SpringRunner.class)
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
@Import({JsonToRestIntegration.class, BatchConfigurationProperties.class})
@JsonTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JsonToRestIntegrationIT {

    private static final FilenameFilter JSON_FILENAME_FILTER = (dir, name) -> name.endsWith(".json");

    @Autowired
    private BatchConfigurationProperties batchConfigurationProperties;

    @Autowired
    private ObjectMapper om;

    @Autowired
    private MessageHandler reactiveOutbound;

    @Autowired
    private DirectChannel httpResponseChannel;

    @Autowired
    private DirectChannel errorChannel;

    @Autowired
    private FileReadingMessageSource fileReadingMessageSource;

    @Autowired
    private SourcePollingChannelAdapter fileReadingEndpoint;

    @MockBean
    private LoadBalancerExchangeFilterFunction lbFunction;

    private String jsonImportPath;
    private String jsonSuccessPath;
    private String jsonErrorPath;

    @Before
    public void setUp() throws Exception {
        jsonImportPath = batchConfigurationProperties.getJsonImportFolder();
        jsonSuccessPath = batchConfigurationProperties.getJsonSuccessFolder();
        jsonErrorPath = batchConfigurationProperties.getJsonErrorFolder();

        Files.createDirectories(Paths.get(jsonImportPath));
        Files.createDirectories(Paths.get(jsonSuccessPath));
        Files.createDirectories(Paths.get(jsonErrorPath));
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(jsonImportPath));
        FileUtils.deleteDirectory(new File(jsonSuccessPath));
        FileUtils.deleteDirectory(new File(jsonErrorPath));
    }

    @Test
    public void shouldMoveJsonFileToSuccessFolderWhenHttpResponseIsOk() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                fail();
            }
        });

        ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
            response.setStatusCode(HttpStatus.OK);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

            DataBufferFactory bufferFactory = response.bufferFactory();

            String valueAsString = null;
            try {
                valueAsString = om.writeValueAsString(new ResponseDto("1"));
            } catch (JsonProcessingException e) {
                fail();
            }
            return response.writeWith(Mono.just(bufferFactory.wrap(valueAsString.getBytes())))
                    .then(Mono.defer(response::setComplete));

        });

        WebClient webClient = WebClient.builder()
                .clientConnector(httpConnector)
                .build();

        new DirectFieldAccessor(this.reactiveOutbound)
                .setPropertyValue("webClient", webClient);

        File jsonFile = new ClassPathResource("/test.json").getFile();
        Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        fileReadingEndpoint.start();

        assertThat(latch.await(12, TimeUnit.SECONDS), is(true));

        File[] jsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonImportFolder, is(notNullValue()));
        assertThat(filesInJsonImportFolder.length, is(0));

        File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonSuccessFolder, is(notNullValue()));
        assertThat(filesInJsonSuccessFolder.length, is(1));

        File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonErrorFolder, is(notNullValue()));
        assertThat(filesInJsonErrorFolder.length, is(0));
    }

    @Test
    public void shouldMoveJsonFileToErrorFolderWhenHttpResponseIsNotOk() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                fail();
            }
        });

        ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

            DataBufferFactory bufferFactory = response.bufferFactory();

            return response.writeWith(Mono.just(bufferFactory.wrap("SOME BAD REQUEST".getBytes())))
                    .then(Mono.defer(response::setComplete));

        });

        WebClient webClient = WebClient.builder()
                .clientConnector(httpConnector)
                .build();

        new DirectFieldAccessor(this.reactiveOutbound)
                .setPropertyValue("webClient", webClient);

        File jsonFile = new ClassPathResource("/error.json").getFile();
        Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        fileReadingEndpoint.start();

        assertThat(latch.await(11, TimeUnit.SECONDS), is(true));

        File[] filesInJsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonImportFolder, is(notNullValue()));
        assertThat(filesInJsonImportFolder.length, is(0));

        File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonSuccessFolder, is(notNullValue()));
        assertThat(filesInJsonSuccessFolder.length, is(0));

        File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonErrorFolder, is(notNullValue()));
        assertThat(filesInJsonErrorFolder.length, is(1));
    }
}
Palaeozoic answered 31/8, 2018 at 11:24 Comment(0)
P
4
  this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);

The first parameter of this method is an endpoint id. (I guess we are just missing Javadocs there on those methods...).

So, what you need is something like this:

.handle(reactiveOutbound(), e -> e.id("webFluxEndpoint"))

And then in that test-case you do:

 this.mockIntegrationContext.substituteMessageHandlerFor("webFluxEndpoint", reactiveOutbound);

You don't need to override bean in the test class config. The MockMessageHandler can be just used in the test method body.

You poll files via .from(fileReadingMessageSource()). To do a manual control you need to have it stopped in the beginning. For this purpose you add an endpoint id again:

.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))

And then in the test configuration you do this:

@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")

Another approach for the WebFlux would be via customized WebClient to mock request to the server. For example:

ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
        response.setStatusCode(HttpStatus.OK);
        response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

        DataBufferFactory bufferFactory = response.bufferFactory();
        return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())))
                .then(Mono.defer(response::setComplete));
    });

    WebClient webClient = WebClient.builder()
            .clientConnector(httpConnector)
            .build();

    new DirectFieldAccessor(this.reactiveOutbound)
            .setPropertyValue("webClient", webClient);
Par answered 31/8, 2018 at 13:49 Comment(4)
Hi Artem, thanks for your help so far. I have updated my code and test in my question (see above) but It still doesn't work.Palaeozoic
I think the @JsonTest breaks everything. You don't need there fully blown application context, including a Spring Integration infrastructure. So, or add there @EnableIntegration or remove this @JsonTest slice.Par
Hi Artem, I managed to get it working. It wasn't the @JsonTest. Please see the updated code in my original post above. The following post was also helpful: [#50972105 But I'm still wondering about the poller configuration and the test execution time. Becase both tests need each at least 10 seconds: assertThat(latch.await(11, TimeUnit.SECONDS), is(true)); Any idea why?Palaeozoic
Not sure... If you could share a simple Spring Boot project somewhere on GitHub, I would take a look to determine the culprit.Par

© 2022 - 2024 — McMap. All rights reserved.