Expected behavior

Using reactive transaction management, when a commit step fails in the TransactionalOperator, then the commit failure exception should be propagated. This can be for example a ConcurrencyFailureException.

Actual behavior (Spring 5.3.9)

When a commit fails, the commit exception is logged & dropped in TransactionalOperatorImpl, and instead an IllegalTransactionStateException is propagated.

Practical case

When under high concurrency, our application is observing uninformative IllegalTransactionStateException like this:

org.springframework.transaction.IllegalTransactionStateException: Transaction is already completed - do not call commit or rollback more than once per transaction
    at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
    reactor.core.publisher.Mono.error
    org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
Error has been observed at the following site(s):
    |_          Mono.error ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
    |_     Mono.onErrorMap ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.rollbackOnException(TransactionalOperatorImpl.java:119)
    |_           Mono.then ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$2(TransactionalOperatorImpl.java:83)
    |_  Mono.onErrorResume ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$3(TransactionalOperatorImpl.java:83)
    |_        Mono.flatMap ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$transactional$4(TransactionalOperatorImpl.java:81)
    |_        Mono.flatMap ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.transactional(TransactionalOperatorImpl.java:75)
    |_   Mono.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.transactional(TransactionalOperatorImpl.java:85)
    |_   Mono.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.transactional(TransactionalOperatorImpl.java:86)

The above illegal transaction state seems to be overriding the propagation of this exception:

ERROR --- o.s.t.r.TransactionalOperatorImpl : Application exception overridden by rollback exception

java.lang.RuntimeException: Async resource cleanup failed after onComplete
    at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:533)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
    reactor.core.publisher.Mono.usingWhen
    org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$3(TransactionalOperatorImpl.java:81)
Error has been observed at the following site(s):
    |_ Mono.usingWhen ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$3(TransactionalOperatorImpl.java:81)
Stack trace:
        at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:533)
        ...
        at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99)
        at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
        ...
Caused by: org.springframework.dao.ConcurrencyFailureException: R2DBC commit; could not serialize access due to read/write dependencies among transactions;
    at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:218)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
    reactor.core.publisher.Mono.onErrorMap
    org.springframework.r2dbc.connection.R2dbcTransactionManager.doCommit(R2dbcTransactionManager.java:279)
Error has been observed at the following site(s):
    |_    Mono.onErrorMap ⇢ at org.springframework.r2dbc.connection.R2dbcTransactionManager.doCommit(R2dbcTransactionManager.java:279)
    |_                    ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$21(AbstractReactiveTransactionManager.java:445)
    |_         Mono.defer ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:439)
    |_          Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:439)
    |_          Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:448)
    |_          Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:474)
    |_         Mono.error ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$25(AbstractReactiveTransactionManager.java:480)
    |_          Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$25(AbstractReactiveTransactionManager.java:480)
    |_ Mono.onErrorResume ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:479)
    |_          Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:480)
    |_                    ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$commit$20(AbstractReactiveTransactionManager.java:420)
    |_       Mono.flatMap ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.commit(AbstractReactiveTransactionManager.java:412)
Stack trace:
        at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:218)
        at org.springframework.r2dbc.connection.R2dbcTransactionManager.translateException(R2dbcTransactionManager.java:439)
        at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCommit$9(R2dbcTransactionManager.java:279)
        ...

Investigation of the cause

The problem described above matches this warning on ReactiveTransactionManager#rollback: https://github.com/spring-projects/spring-framework/blob/fb7eea9757222ed52d679a8a93a61a6d9e0b5240/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java#L99-L102

Looking at the current implementation of TransactionalOperatorImpl, it looks like indeed this can happen; if transactionManager::commit emits an error then a rollback will be attempted by the onErrorResume: https://github.com/spring-projects/spring-framework/blob/fb7eea9757222ed52d679a8a93a61a6d9e0b5240/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java#L74-L84

Looks related to the discussion in https://github.com/spring-projects/spring-framework/pull/23562

Possible solution

The operators in TransactionalOperatorImpl#transactional could be restructured in such a way that rollbacks are not attempted after a failure to commit. Maybe this could be accomplished by moving the rollbackOnException to the asyncError parameter in the usingWhen. A similar approach should be applied in TransactionalOperatorImpl#execute.

Additionally, would it make sense to add the application exception as suppressed when the rollback in rollbackOnException emits an exception? This could make it easier to debug this and similar issues because currently the application exception is only linked when the rollback emits an TransactionSystemException, but not in other cases like IllegalTransactionStateException.

Comment From: jhoeller

@mp911de any advice on this?

Comment From: mp911de

The main difference between the imperative and the reactive transaction handling is that TransactionTemplate handles exceptions first and then falls back to commit when no exception has happened. TransactionalOperatorImpl handles the commit before rollbackOnException.

Reactor's usingWhen suggests this type of design. Moving onErrorResume to inside the usingWhen action closure should address the issue. asyncError doesn't consider errors from asyncComplete so moving error handling to inside of resourceClosure looks like an appropriate fix.

Comment From: AntonioMorales97

What is the status on this? This is a very standard use case and our current solution is basically a copy-paste of the TransactionalOperatorImpl where we avoid rollbacks for ConcurrencyFailureException.

Comment From: sdeleuze

I have asked a feedback on the latest version of the PR here and will have a look after Mark feedback.

Comment From: mp911de

This issue is related to #28968, I'll have another look as this PR contains already fixes that I started preparing for #28968

Comment From: AntonioMorales97

We had to implement our custom reactive transaction manager for this but when we upgraded to Spring Boot 3 and updated r2dbc it stopped working. For some reason the isolation level cannot be set/passed to r2dbc. This can be reproduced with the following test (the second exception is thrown):

@Transactional(isolation = Isolation.SERIALIZABLE)
public Mono<Void> runAndEnsureSerializable() {
    return Mono.deferContextual(ctx -> {
        var transactionContext = ctx.get(TransactionContext.class);

        if (transactionContext.getCurrentTransactionIsolationLevel() != Isolation.SERIALIZABLE.value()) {
            throw new RuntimeException("Expected isolation level to be SERIALIZABLE");
        }

        return db.inConnection(connection -> {
            if (connection.getTransactionIsolationLevel() != IsolationLevel.SERIALIZABLE) {
                throw new RuntimeException("Expected isolation level to be SERIALIZABLE");
            }

            return Mono.empty();
        });
    });
}

and the following configuration:

@Bean
public ConnectionFactory connectionFactory(PostgreSQLContainer postgres) {
    return new ConnectionPool(ConnectionPoolConfiguration.builder()
            .initialSize(0)
            .validationDepth(ValidationDepth.REMOTE)
            .connectionFactory(ConnectionFactories.get(ConnectionFactoryOptions.builder()
                    .option(DRIVER, "postgresql")
                    .option(HOST, postgres.getHost())
                    .option(PORT, postgres.getFirstMappedPort())
                    .option(USER, postgres.getUsername())
                    .option(PASSWORD, postgres.getPassword())
                    .option(DATABASE, postgres.getDatabaseName())
                    .build()))
            .customizer(builder -> builder.releaseHandler(Connection::rollbackTransaction))
            .build());
}

We are using:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.0.2</version>
</parent>
...
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-spi</artifactId>
   <version>1.0.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
</dependency>

The problem seems to be that autocommit is switched in spring-r2dbc:6.0.4 but not in spring-r2dbc:5.3.24:

spring-r2dbc:6.0.4:

connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
      .then(Mono.from(doBegin(definition, con)))
      .then(prepareTransactionalConnection(con, definition))

spring-r2dbc:5.3.24:

connectionMono.flatMap(con -> prepareTransactionalConnection(con, definition, transaction)
      .then(Mono.from(con.beginTransaction()))
      .then(prepareTransactionalConnection(con, definition))

Any ideas?

Comment From: sdeleuze

@AntonioMorales97 Please create a dedicated issue if that's another bug.

This issue will be handle via #27572 so I close it as a duplicate.