Calling methods in two different ReactiveMongoRepository's in a transaction using Spring Data MongoDB?

When using the reactive programming model with Spring Data MongoDB it's possible to execute transactions like this:

Mono<DeleteResult> result = template.inTransaction()                                      
    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); 

But Spring Data MongoDB also has support for "reactive repositories", for example:

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  Flux<Person> findByLocationNear(Point location, Distance distance);
}

and

public interface CarRepository extends ReactiveMongoRepository<Car, String> {

  Flux<Car> findByYear(int year);
}

My question is: given ReactiveMongoRepository interfaces like these, can you somehow leverage MongoDB transactions and, for example, insert both a Person and a Car in the same transaction (using PersonRepository and CarRepository in this case)? If so, how do you do this?

Robbert answered 29/5, 2019 at 11:57 Comment(4)
Any help on this, mate? @Robbert (Horsy)
Sorry, but I don't recall finding a solution to it. (Robbert)
Luckily, after messaging you I figured it out. I will share it with you shortly. (Horsy)
See the answer below. (Horsy)

I had also been trying hard to find a solution for transactional support in the reactive style with MongoDB and Spring Boot.

Luckily, I figured it out myself. A few things I found through Google were helpful, but they covered the non-reactive style.

Important note: this works well with Spring Boot 2.2.x. With Spring Boot 2.3.x I ran into other issues, because the internals were rewritten and changed considerably.

  • You need to use ReactiveMongoTransactionManager along with ReactiveMongoDatabaseFactory. Most of the details are at the end, and I am also sharing the code repo.

  • For MongoDB to support transactions, the database must be running as a replica set.

    Why do we need that? Because otherwise you will get an error like this:

    Sessions are not supported by the MongoDB cluster to which this client is connected

The instructions for setting that up are below:

  1. Run a docker-compose based MongoDB server using the docker-compose.yml below:
version: "3"
services:
    mongo:
        hostname: mongo
        container_name: localmongo_docker
        image: mongo
        expose:
          - 27017
        ports:
          - 27017:27017
        restart: always
        entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
        volumes:
          - ./mongodata:/data/db # bind mount of the local ./mongodata directory (not a named Docker volume)

  2. After the container comes up, open a mongo shell inside it (localmongo_docker is the name of the container; images for MongoDB 6.0 and later ship mongosh instead of mongo):
docker exec -it localmongo_docker mongo
  3. Copy and paste the command below and execute it:
rs.initiate(
   {
     _id : 'rs0',
     members: [
       { _id : 0, host : "mongo:27017" }
     ]
   }
 )
  4. Then leave the shell by entering exit.
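
To confirm that the replica set from the steps above is actually initialized before starting the application, a small check along these lines may help. This is only a sketch: it assumes the synchronous MongoDB Java driver (mongodb-driver-sync) is on the classpath and that the container publishes port 27017 on localhost. The class name ReplicaSetCheck is hypothetical and not part of the original answer.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

// Hypothetical helper, not part of the original answer.
public class ReplicaSetCheck {

    public static void main(String[] args) {
        // No replicaSet option here, so the driver is expected to connect to this single node directly.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // replSetGetStatus fails with a command error if rs.initiate() has not been run yet.
            Document status = client.getDatabase("admin")
                    .runCommand(new Document("replSetGetStatus", 1));
            System.out.println("replica set: " + status.getString("set"));
            // A myState of 1 means this node is PRIMARY.
            System.out.println("member state: " + status.get("myState"));
        }
    }
}
```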

Important: the code repo can be found on my GitHub: https://github.com/krnbr/mongo-spring-boot-template

Important notes for the code:

  • The MongoConfiguration class in the config package is the important part that makes transactions work.

  • The main part is this bean:

     @Bean
     ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
         return new ReactiveMongoTransactionManager(dbFactory);
     }
    
  • To see how the transactional requirement is used in the code, go through the UserService class in the service package.

The code is shared below in case the links do not work for someone:

The configuration class and its beans

@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}

application.properties (related to mongo db)

spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
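
Because transactions depend on the replicaSet option in this URI, it can be worth failing fast when the option is missing. The following plain-Java sketch extracts the replica set name from a connection string. The class and method names are hypothetical, and the sketch assumes a single-host URI like the one above.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Optional;

// Hypothetical helper, not part of the original answer.
final class ReplicaSetUriCheck {

    // Returns the value of the replicaSet query option, if the URI has one.
    static Optional<String> replicaSetName(String mongoUri) {
        String query = URI.create(mongoUri).getQuery();
        if (query == null) {
            return Optional.empty();
        }
        return Arrays.stream(query.split("&"))
                .map(pair -> pair.split("=", 2))
                .filter(kv -> kv.length == 2 && kv[0].equalsIgnoreCase("replicaSet"))
                .map(kv -> kv[1])
                .findFirst();
    }

    public static void main(String[] args) {
        // prints Optional[rs0]
        System.out.println(replicaSetName("mongodb://localhost:27017/mongo?replicaSet=rs0"));
    }
}
```

An empty Optional means the URI does not name a replica set, which is the situation that leads to the "Sessions are not supported" error mentioned earlier when the server runs standalone.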

Document Classes

Role Class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        // Treated as new while the created timestamp is still unset.
        return getCreated() == null;
    }
}

User Class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList<>();

    @Override
    @JsonIgnore
    public boolean isNew() {
        // Treated as new while the created timestamp is still unset.
        return getCreated() == null;
    }
}

UserProfile Class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        // Treated as new while the created timestamp is still unset.
        return getCreated() == null;
    }

}

ReactiveMongoRepository Interface(s)

RoleRepository

public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);

}

UserRepository

public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);

}

UserProfileRepository

public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}

The UserService class. You need to create your own RuntimeException subclass here; the one I have been using is AppRuntimeException.

@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile '"+userRequest.getMobile()+"' and/or email '"+userRequest.getEmail()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;

    }

}
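
The @Transactional annotation above is one way to group both repository calls. As a hedged alternative, the same effect can be sketched programmatically with Spring's TransactionalOperator, driven by the ReactiveMongoTransactionManager bean declared earlier. The class name UserProfileTxService and the method saveBoth are hypothetical. User, UserProfile and the two repositories are the ones from this answer, and the sketch assumes Spring Framework 5.2 or later.

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;

// Hypothetical service: sketches the programmatic counterpart of @Transactional.
@Service
public class UserProfileTxService {

    private final UserRepository userRepository;
    private final UserProfileRepository userProfileRepository;
    private final TransactionalOperator txOperator;

    public UserProfileTxService(UserRepository userRepository,
                                UserProfileRepository userProfileRepository,
                                ReactiveTransactionManager txManager) {
        this.userRepository = userRepository;
        this.userProfileRepository = userProfileRepository;
        // Wraps reactive pipelines in a transaction driven by the given manager.
        this.txOperator = TransactionalOperator.create(txManager);
    }

    public Mono<UserProfile> saveBoth(User user, UserProfile profile) {
        return userRepository.save(user)
                // setUser returns the profile itself because of @Accessors(chain = true).
                .flatMap(saved -> userProfileRepository.save(profile.setUser(saved)))
                // Both saves commit together; an error signal rolls both back.
                .as(txOperator::transactional);
    }
}
```

In both variants, an error that is swallowed inside the pipeline (for example by an onErrorResume that returns a fallback value) does not trigger a rollback. The UserService above avoids this by re-emitting the failure with Mono.error.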

Controller and the Model Class

UserRequest Model Class

@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;

}

UserProfileApisController class

@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }

}
Horsy answered 8/5, 2020 at 9:39 Comment(2)
@Robbert I am still in the process of editing, but please go through the details I shared in the answer. (Horsy)
Thanks @kakabali, but could you please include the gist of your code examples here? I think this is customary for Stack Overflow answers, since remote links may cease to exist. (Robbert)

Just an addition to the accepted answer regarding MongoDB replica set initialization.

  1. If one needs a non-fixed-port, single-node replica set for testing, they might use the Testcontainers MongoDB module, which encapsulates such initialization:
final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.2.8");

We can start it via 'mongoDBContainer.start()' and stop it via try-with-resources or 'mongoDBContainer.stop()'. See the module's documentation for more details on using it with Spring Data MongoDB.

  2. If one needs a non-fixed-port, multi-node replica set for testing complex production issues, they might use this project, for example:
try (
  //create a PSA mongoDbReplicaSet and auto-close it afterwards
  final MongoDbReplicaSet mongoDbReplicaSet = MongoDbReplicaSet.builder()
    //with 2 working nodes
    .replicaSetNumber(2)
    //with an arbiter node
    .addArbiter(true)
    //create a proxy for each node to simulate network partitioning
    .addToxiproxy(true)
    .build()
) {
  //start it
  mongoDbReplicaSet.start();
  assertNotNull(mongoDbReplicaSet.getReplicaSetUrl());
  //do some testing
}
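
For the single-node Testcontainers option in the first item, the container's connection string still has to reach Spring Data MongoDB. One possible wiring is sketched below. It assumes JUnit 5, the Testcontainers junit-jupiter and mongodb modules, and a Spring Boot version that supports @DynamicPropertySource (Spring Framework 5.2.5 or later). The test class name is hypothetical.

```java
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import static org.junit.jupiter.api.Assertions.assertTrue;

// Hypothetical integration test, not part of the original answer.
@Testcontainers
@SpringBootTest
class MongoTransactionIT {

    // Static, so one container is shared by all tests in this class.
    @Container
    static final MongoDBContainer MONGO = new MongoDBContainer("mongo:4.2.8");

    // Points the application at the random port chosen for the container.
    @DynamicPropertySource
    static void mongoProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.mongodb.uri", MONGO::getReplicaSetUrl);
    }

    @Test
    void containerIsRunning() {
        assertTrue(MONGO.isRunning());
    }
}
```

The supplier passed to registry.add is evaluated lazily, so getReplicaSetUrl() is only called once the container has started.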
Adna answered 30/7, 2020 at 10:22 Comment(0)
