Invoking non-blocking operations sequentially while consuming from a Flux including retries

My use case is to consume messages from Kafka in a Spring WebFlux application, written in the reactive style with Project Reactor, and to perform a non-blocking operation for each message in the same order in which the messages were received from Kafka. The system should also be able to recover on its own.

Here is the code snippet that is set up to consume from Kafka:

    Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
        KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
        return receiver.receive();
    });

    messages.map(this::transformToOutputFormat)
            .map(this::performAction)
            .flatMapSequential(receiverRecordMono -> receiverRecordMono)
            .doOnNext(record -> record.receiverOffset().acknowledge())
            .doOnError(error -> logger.error("Error receiving record", error))
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .subscribe();

As you can see, what I do is: take the message from Kafka, transform it into an object intended for a new destination, then send it to the destination, and then acknowledge the offset to mark the message as consumed and processed. It is critical to acknowledge the offset in the same order as the messages being consumed from Kafka so that we don't move the offset beyond messages that were not fully processed (including sending some data to the destination). Hence I'm using a flatMapSequential to ensure this.

For simplicity let's assume the transformToOutputFormat() method is an identity transform.

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
    return record;
}

The performAction() method needs to do something over the network, say call an HTTP REST API. So the appropriate APIs return a Mono, which means the chain needs to be subscribed to. Also, I need the ReceiverRecord to be returned by this method so that the offset can be acknowledged in the flatMapSequential() operator above. Because I need the Mono subscribed to, I'm using flatMapSequential above. If not, I could have used a map instead.

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return Mono.just(record)
            .flatMap(receiverRecord ->
                    HttpClient.create()
                            .port(3000)
                            .get()
                            .uri("/makeCall?data=" + receiverRecord.value().getData())
                            .responseContent()
                            .aggregate()
                            .asString()
            )
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .then(Mono.just(record));
}

I have two conflicting needs in this method:

  1. Subscribe to the chain that makes the HTTP call
  2. Return the ReceiverRecord

Using a flatMap() means my return type changes to a Mono. Using doOnNext() in the same place would retain the ReceiverRecord in the chain, but would not allow the HttpClient response to be subscribed to automatically.

I can't add .subscribe() after asString(), because I want to wait till the HTTP response is completely received before the offset is acknowledged.

I can't use .block() either, since this code runs on a parallel thread, where Reactor does not allow blocking calls.

As a result, I need to cheat and return the record object from the method scope.

The other thing is that on a retry inside performAction it switches threads. Since flatMapSequential() eagerly subscribes to each Mono in the outer flux, this means that while acknowledgement of offsets can be guaranteed in order, we can't guarantee that the HTTP call in performAction will be performed in the same order.
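
The gap between emission order and execution order can be seen in a small standalone sketch. It assumes reactor-core is on the classpath. `fakeCall` and its delays are hypothetical stand-ins for the HTTP call, chosen so that earlier elements take longer to finish.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapSequentialOrdering {

    // Hypothetical stand-in for the HTTP call: id 1 takes 300 ms, id 3 takes 100 ms.
    // The id is appended to `completed` when the simulated call finishes.
    static Mono<Integer> fakeCall(int id, List<Integer> completed) {
        return Mono.delay(Duration.ofMillis(100L * (4 - id)))
                .doOnNext(tick -> completed.add(id))
                .thenReturn(id);
    }

    // Order in which the simulated calls finish.
    static List<Integer> completionOrder() {
        List<Integer> completed = new CopyOnWriteArrayList<>();
        Flux.just(1, 2, 3)
                .flatMapSequential(id -> fakeCall(id, completed))
                .blockLast();
        return completed;
    }

    // Order in which the results are emitted downstream.
    static List<Integer> emissionOrder() {
        return Flux.just(1, 2, 3)
                .flatMapSequential(id -> fakeCall(id, new CopyOnWriteArrayList<>()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + completionOrder());
        System.out.println("emitted:   " + emissionOrder());
    }
}
```

With these delays the calls should finish as 3, 2, 1 while the results are still emitted as 1, 2, 3, because all three inner Monos are subscribed to up front and only the emission is reordered.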

So I have two questions.

  1. Is it possible to return record in a natural way rather than returning the method scope object?
  2. Is it possible to ensure that both the HTTP call as well as the offset acknowledgement are performed in the same order as the messages for which these operations are occurring?
Simonides answered 10/1, 2019 at 10:7

Here is the solution I have come up with.

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});

messages.map(this::transformToOutputFormat)
        .delayUntil(this::performAction)
        .doOnNext(record -> record.receiverOffset().acknowledge())
        .doOnError(error -> logger.error("Error receiving record", error))
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .subscribe();

Instead of using flatMapSequential to subscribe to the performAction Mono and preserve the sequence, I delay the request for more messages from the Kafka receiver until the action has completed. This gives me the one-at-a-time processing that I need.
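
To see the one-at-a-time behaviour in isolation, here is a minimal sketch, assuming reactor-core is on the classpath. `fakeCall` is a hypothetical stand-in for the HTTP call, and the `ack` log entry stands in for acknowledging the offset. Neither is part of the real application.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DelayUntilOrdering {

    // Hypothetical stand-in for the HTTP call: earlier ids take longer (300, 200, 100 ms).
    // Logs when the call is subscribed to and when it produces its response.
    static Mono<String> fakeCall(int id, List<String> log) {
        return Mono.delay(Duration.ofMillis(100L * (4 - id)))
                .doOnSubscribe(subscription -> log.add("start " + id))
                .doOnNext(tick -> log.add("done " + id))
                .map(tick -> "response " + id);
    }

    // Runs three elements through delayUntil and returns the sequence of events.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.just(1, 2, 3)
                .delayUntil(id -> fakeCall(id, log))      // element is held back until the call completes
                .doOnNext(id -> log.add("ack " + id))     // stands in for receiverOffset().acknowledge()
                .blockLast();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log should read start 1, done 1, ack 1, start 2, done 2, ack 2, and so on: a call is not started until the previous element has been acknowledged, even though the later calls are the faster ones.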

As a result, performAction doesn't need to return a Mono of ReceiverRecord. I also simplified it to the following:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
    // Return the chain so that delayUntil can subscribe to it
    return HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + record.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
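
A version note: as far as I know, `retryBackoff` was deprecated in reactor-core 3.3.4 and removed in 3.4 in favour of `retryWhen` with a `Retry` spec. Below is a minimal sketch of the replacement, assuming reactor-core 3.3.4 or later. `flakyCall` is a hypothetical stand-in that fails twice before succeeding, and the backoff durations are shortened so the sketch runs quickly.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryWhenSketch {

    // Hypothetical stand-in for the HTTP call: fails on the first two
    // subscriptions and succeeds on the third.
    static Mono<String> flakyCall(AtomicInteger attempts) {
        return Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated failure");
            }
            return "ok";
        });
    }

    // Retries with exponential backoff: up to 100 attempts,
    // starting at 10 ms and capped at 50 ms between attempts.
    static String callWithRetry(AtomicInteger attempts) {
        return flakyCall(attempts)
                .retryWhen(Retry.backoff(100, Duration.ofMillis(10))
                        .maxBackoff(Duration.ofMillis(50)))
                .block();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(callWithRetry(attempts) + " after " + attempts.get() + " attempts");
    }
}
```

Under that assumption, the counterpart of `retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))` would be `retryWhen(Retry.backoff(100, Duration.ofSeconds(5)).maxBackoff(Duration.ofMinutes(5)))`.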
Simonides answered 14/1, 2019 at 9:52