Make R2DBC use the same connection for all interactions with the database in a @Transactional method (Java, Sprint)
Asked Answered
L

1

2

Motivation

I have a service which I want to make @Transactional. My service is storing complex data in multiple tables, there is a referential integrity between the tables.

public class MyData { // simplified
    MasterData masterData;
    List<Detail> details;

    public static class MasterData {
        UUID id;
        String field1;
        String field2;
        String field3;
    }

    public static class Detail {
        UUID masterId;
        String fieldA;
        String fieldB;
    }
}
  1. First I save the master data into one table using R2dbcRepository<MasterData, UUID>. The INSERT command is simple and I may use the R2dbcRepository.
  2. Then a list of details into another table using DatabaseClient. Each detail has a foreign key constraint to the master table. I want to use batch INSERT and I complete the SQL using more complex approach in DatabaseClient.

Problem

The problem is that I cannot save the detail data - I get the error

insert or update on table "detail" violates foreign key constraint

I suspect that the reason is that each SQL command is executed in a different connection so the master data are not yet visible when the details are stored.

Question

Is it really the root cause? How to make R2DBC always use the same connection across all the calls to the database inside one @Transactional service call, even if it goes via various instances of R2dbcRepository and DatabaseClient?

If the solution is completely wrong, how to correctly implement @Transactional in R2DBC?

I prefer calling all the INSERTs into the detail table in a batch.

Code

My (simplified) code looks like this:

@Service
public class MyService {
    private final MasterRepository masterRepository;
    private final DbConnector dbConnector;

    @Transactional
    public Mono<Void> saveMasterAndDetails(MyData data) {
        return Mono.just(data)
                .map(MyData::getMaster)
                .flatMap(masterRepository::insertMasterData)
                .thenReturn(data)

                .map(MyData::getDetails)
                .flatMap(dbConnector::insertDetails)
                .then()
                ;
    }
}

The code of MasterRepository is something like

import org.springframework.data.r2dbc.repository.R2dbcRepository;

public interface MasterRepository extends R2dbcRepository<MasterData, UUID> {
    @Query("""
            INSERT INTO master(id, col_1, col_2, col_3)
                VALUES (
                    :#{#masterData.id},
                    :#{#masterData.field1},
                    :#{#masterData.field2},
                    :#{#masterData.field3})
            """)
    Mono<Void> insertMasterData(MasterData masterData);
}

And the code of DbConnector is more complex - but maybe overly complex? There is still missing direct support for batches and prepared statements in DatabaseClient: spring-data-r2dbc-259, spring-framework-27229

import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;

public class DbConnector {
    private final DatabaseClient databaseClient;

    public Mono<Integer> insertDetails(List<Detail> details) {
        // usingWhen() is the reactive analogy of "use-with-resources"
        return Flux.usingWhen(
                // "try(Create the resource)"
                databaseClient.getConnectionFactory().create(),

                // "{ the body }"
                connection -> {
                    final Statement statement = connection.createStatement("""
                            insert into detail (masterId, col_A, col_B)
                            values ($1, $2, $3)
                            """);
                    details.forEach(detail ->
                            statement
                                    .bind("$1", detail.getMasterId())
                                    .bind("$2", detail.getColA())
                                    .bind("$3", detail.getColB())
                                    .add()
                    );
                    return statement.execute();
                },

                // "finally close()"
                Connection::close)

        .flatMap(Result::getRowsUpdated)
        .reduce(0, Integer::sum);
    }
}
Lanciform answered 13/5, 2022 at 19:37 Comment(0)
L
0

TL;DR:

I replaced

return Flux.usingWhen(
    databaseClient.getConnectionFactory().create(),

    connection -> {
        statement = ... // prepare the statement
        return statement.execute();
    },

    Connection::close
)

with

return databaseClient.inConnection(connection -> {
    statement = ... // prepare the statement
    return statement.execute();
);

Detailed answer

I have discovered a method which I was not aware of: DatabaseConnection.inConnection(). The DatabaseConnection interface inherits it from ConnectionAccessor:

Execute a callback Function within a Connection scope. The function is responsible for creating a Mono. The connection is released after the Mono terminates (or the subscription is cancelled). Connection resources must not be passed outside of the Function closure, otherwise resources may get defunct.

I changed my code using the DatabaseClient and it seems that the SQL commands are executed in the same connection.

However, I would still like to understand it better. I am not sure, if I am just lucky now and if it can change with the next implementation. I still do not know how to have the full control over the connections and hence over the transactional code.

import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;

public class DbConnector {
    private final DatabaseClient databaseClient;

    public Mono<Integer> insertDetails(List<Detail> details) {
        return databaseClient.inConnection(connection -> {
            final Statement statement = connection.createStatement("""
                insert into detail (masterId, col_A, col_B)
                values ($1, $2, $3)
                """);
            details.forEach(detail ->
                statement
                    .bind("$1", detail.getMasterId())
                    .bind("$2", detail.getColA())
                    .bind("$3", detail.getColB())
                    .add()
            );
            return Flux.from(statement.execute())
                .flatMap(Result::getRowsUpdated)
                .reduce(0, Integer::sum)
                ;
    });
}
Lanciform answered 16/5, 2022 at 6:57 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.