Spring WebFlux - how to get data from DB to use in the next step
Asked Answered
M

1

1

I use Spring WebFlux (Project Reactor) and I'm facing the following problem: I have to get some data from db to use them to call another service - everything in one stream. How to do that?

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
                    .map(this::createSpecificObject))
        .doOnNext(item-> createObjAndCallAnotherService(item));
  }



private void createObjAndCallAnotherService(Prot prot){
myRepository
        .findById(
            prot.getDomCred().stream()
                .filter(Objects::nonNull)
                .findFirst()
                .map(ConfDomCred::getCredId)
                .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
        .doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
            confCred-> {//from this point the code is unreachable!!! - why????
              Optional<ConfDomCred> confDomCred=
                  prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();

              confDomCred.ifPresent(
                  domCred -> {
                    ProtComDto com=
                        ProtComDto.builder()
                            .userName(confCred.getUsername())
                            .password(confCred.getPassword())                          
                            .build();
                    clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign that invokes method in another service
                  });
            });
}

UPDATE

When I invoke

    Flux<MyObj> myFlux =  myRepository
            .findById(
                prot.getDomCred().stream()
                    .filter(Objects::nonNull)
                    .findFirst()
                    .map(ConfDomCred::getCredId)
                    .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")));

myFlux.subscribe(e -> e.getPassword()) 

then the value is printed

UPDATE2

So as a recap - I think the code below is asynchronous/non-blocking - am I right? In my

ProtectionCommandService

I had to use subscribe() twice - only then I can call my other service and store them my object: commandControllerApi.createNewCommand

public Mono<Protection> saveProtection(Mono<Protection> newProtection) {
    return newProtection.flatMap(
        protection ->
            Mono.zip(
                    protectorRepository.save(//some code),
                    domainCredentialRepository
                        .saveAll(//some code)
                        .collectList(),
                    protectionSetRepository
                        .saveAll(//some code)
                        .collectList())
                .map(this::createNewObjectWrapper)
                .doOnNext(protectionCommandService::createProtectionCommand));
  }

ProtectionCommandService class:

public class ProtectionCommandService {

  private final ProtectionCommandStrategyFactory protectionCommandFactory;
  private final CommandControllerApi commandControllerApi;

  public Mono<ProtectionObjectsWrapper> createProtectionCommand(
      ProtectionObjectsWrapper protection) {
    ProductType productType = protection.getProtector().getProductType();

    Optional<ProtectionCommandFactory> commandFactory = protectionCommandFactory.get(productType);

    commandFactory
        .get()
        .createCommandFromProtection(protection)
        .subscribe(command -> commandControllerApi.createNewCommand(command).subscribe());
    return Mono.just(protection);
  }
}

And one of 2 factories:

@Component
@AllArgsConstructor
@Slf4j
public class VmWareProtectionCommandFactory implements ProtectionCommandFactory {

  private static final Map<ProductType, CommandTypeEnum> productTypeToCommandType =
      ImmutableMap.of(...//some values);

  private final ConfigurationCredentialRepository configurationCredentialRepository;

  @Override
  public Mono<CommandDetails> createCommandFromProtection(ProtectionObjectsWrapper protection) {
    Optional<DomainCredential> domainCredential =
        protection.getDomainCredentials().stream().findFirst();

    return configurationCredentialRepository
        .findByOwnerAndId(protection.getOwner(), domainCredential.get().getCredentialId())
        .map(credential -> createCommand(protection, credential, domainCredential.get()));
  }

and createCommand method returns Mono object as a result of this factory.

private Mono<CommandDetails> createCommand(Protection protection
     //other parameters) {

    CommandDto commandDto =
        buildCommandDto(protection, confCredential, domainCredentials);

    String commands = JsonUtils.toJson(commandDto);
    CommandDetails details = new CommandDetails();
    details.setAgentId(protection.getProtector().getAgentId().toString());
    details.setCommandType(///some value);
    details.setArguments(//some value);
    return Mono.just(details);

UPDATE3

My main method that calls everything has been changed a little bit:

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
.map(this::wrapIntoAnotherObject)
.flatMap(protectionCommandService::createProtectionCommand)
.map(this::createMyObj));
Mitchmitchael answered 27/4, 2020 at 19:24 Comment(2)
you have several issues in your code, for example in your flatMap, you are performing a Mono.zip without returning anything, this breaks the reactive chain. Also you are performing reactive actions in a Void function. Void functions also break the reactive chain and don't return anything to trigger the continuation of the reactive chain, you should instead return a Mono<Void> using a Mono.empty() or such. You should also not not subscribe in your application, the calling client is the subscriber, your application is the producer.Humfrid
So How can I get value from DB and then use it later on in my code?Mitchmitchael
H
6

Stop breaking the chain

This is a pure function it returns something, and always returns the same something whatever we give it. It has no side effect.

public Mono<Integer> fooBar(int number) {
    return Mono.just(number);
}

we can call it and chain on, because it returns something.

foobar(5).flatMap(number -> { ... }).subscribe();

This is a non pure function, we can't chain on, we are breaking the chain. We can't subscribe, and nothing happens until we subscribe.

public void fooBar(int number) {
    Mono.just(number)
}

fooBar(5).subscribe(); // compiler error

but i want a void function, i want, i want i want.... wuuaaa wuaaaa

We always need something to be returned so that we can trigger the next part in the chain. How else would the program know when to run the next section? But lets say we want to ignore the return value and just trigger the next part. Well we can then return a Mono<Void>.

public Mono<Void> fooBar(int number) {
    System.out.println("Number: " + number);
    return Mono.empty();
}

foobar(5).subscribe(); // Will work we have not broken the chain

your example:

private void createObjAndCallAnotherService(Prot prot){
    myRepository.findById( ... ) // breaking the chain, no return
}

And some other tips:

  • Name your objects correctly not MyObj and saveObj, myRepository
  • Avoid long names createObjAndCallAnotherService
  • Follow single responsibility createObjAndCallAnotherService this is doing 2 things, hence the name.
  • Create private functions, or helper functions to make your code more readable don't inline everything.

UPDATE

You are still making the same misstake.

commandFactory // Here you are breaking the chain because you are ignoring the return type
    .get()
    .createCommandFromProtection(protection)
    .subscribe(command -> commandControllerApi.createNewCommand(command)
.subscribe()); // DONT SUBSCRIBE you are not the consumer, the client that initiated the call is the subscriber
return Mono.just(protection);

What you want to do is:

return commandFactory.get()
    .createCommandFrom(protection)
    .flatMap(command -> commandControllerApi.createNewCommand(command))
    .thenReturn(protection);

Stop breaking the chain, and don't subscribe unless your service is the final consumer, or the one initiating a call.

Humfrid answered 1/5, 2020 at 11:11 Comment(5)
Thomas Andolf can you please take a look at UPDATE2? There are 2 importantissues: I hav to fecth data from database cassandra and at the end call method in another service to store my newly created object (commandControllerApi.createNewCommand). I'm new with Webflux so I beg your indulgence:)Mitchmitchael
Dont subscribe! Your application is a producer. The consumer subscribes. Who is initiating the call? Your client, the webpage, postman, curl. They are subscribing. If you are subscribing you are basically listening to your own application. The simple rule is whomever started the request is the subscriber I have update my answer, you are still making the same misstake!Humfrid
Thank you for your answer:) BUT - i tested your solution. Unfortunately commandControllerApi.createNewCommand is not invoked and therefore no command is stored in the external service:( WHY? I have to invoke commandControllerApi.createNewCommand - its just a url request to another service and this method returns Mono<CommandDetails> My consumer is just an Angular website... I spent so much time on Webflux - I understand really much more thanks to your help but still not everything is 100% clear for meMitchmitchael
Thomas Andolf please take a look at my UPDATE - I added implementation of private Mono<CommandDetails> createCommand - maybe that's important...Mitchmitchael
Thomas Andolf - WORKS!!! Your solution works!!! I'm so sorry it was my fault, I had some stupid bug in my code, YOUR SOLUTION WORKS:) Thank youMitchmitchael

© 2022 - 2024 — McMap. All rights reserved.