Expected behavior
Using reactive transaction management, when a commit step fails in the TransactionalOperator
, then the commit failure exception should be propagated. This can be for example a ConcurrencyFailureException
.
Actual behavior (Spring 5.3.9)
When a commit fails, the commit exception is logged & dropped in TransactionalOperatorImpl
, and instead an IllegalTransactionStateException
is propagated.
Practical case
When under high concurrency, our application is observing uninformative IllegalTransactionStateException
like this:
org.springframework.transaction.IllegalTransactionStateException: Transaction is already completed - do not call commit or rollback more than once per transaction
at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoLift] :
reactor.core.publisher.Mono.error
org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
Error has been observed at the following site(s):
|_ Mono.error ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
|_ Mono.onErrorMap ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.rollbackOnException(TransactionalOperatorImpl.java:119)
|_ Mono.then ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$2(TransactionalOperatorImpl.java:83)
|_ Mono.onErrorResume ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$3(TransactionalOperatorImpl.java:83)
|_ Mono.flatMap ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$transactional$4(TransactionalOperatorImpl.java:81)
|_ Mono.flatMap ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.transactional(TransactionalOperatorImpl.java:75)
|_ Mono.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.transactional(TransactionalOperatorImpl.java:85)
|_ Mono.contextWrite ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.transactional(TransactionalOperatorImpl.java:86)
The above illegal transaction state seems to be overriding the propagation of this exception:
ERROR --- o.s.t.r.TransactionalOperatorImpl : Application exception overridden by rollback exception
java.lang.RuntimeException: Async resource cleanup failed after onComplete
at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:533)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoLift] :
reactor.core.publisher.Mono.usingWhen
org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$3(TransactionalOperatorImpl.java:81)
Error has been observed at the following site(s):
|_ Mono.usingWhen ⇢ at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$3(TransactionalOperatorImpl.java:81)
Stack trace:
at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:533)
...
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
...
Caused by: org.springframework.dao.ConcurrencyFailureException: R2DBC commit; could not serialize access due to read/write dependencies among transactions;
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:218)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoLift] :
reactor.core.publisher.Mono.onErrorMap
org.springframework.r2dbc.connection.R2dbcTransactionManager.doCommit(R2dbcTransactionManager.java:279)
Error has been observed at the following site(s):
|_ Mono.onErrorMap ⇢ at org.springframework.r2dbc.connection.R2dbcTransactionManager.doCommit(R2dbcTransactionManager.java:279)
|_ ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$21(AbstractReactiveTransactionManager.java:445)
|_ Mono.defer ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:439)
|_ Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:439)
|_ Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:448)
|_ Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:474)
|_ Mono.error ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$25(AbstractReactiveTransactionManager.java:480)
|_ Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$25(AbstractReactiveTransactionManager.java:480)
|_ Mono.onErrorResume ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:479)
|_ Mono.then ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:480)
|_ ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$commit$20(AbstractReactiveTransactionManager.java:420)
|_ Mono.flatMap ⇢ at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.commit(AbstractReactiveTransactionManager.java:412)
Stack trace:
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:218)
at org.springframework.r2dbc.connection.R2dbcTransactionManager.translateException(R2dbcTransactionManager.java:439)
at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCommit$9(R2dbcTransactionManager.java:279)
...
Investigation of the cause
The problem described above matches this warning on ReactiveTransactionManager#rollback
:
https://github.com/spring-projects/spring-framework/blob/fb7eea9757222ed52d679a8a93a61a6d9e0b5240/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java#L99-L102
Looking at the current implementation of TransactionalOperatorImpl
, it looks like indeed this can happen; if transactionManager::commit
emits an error then a rollback will be attempted by the onErrorResume
:
https://github.com/spring-projects/spring-framework/blob/fb7eea9757222ed52d679a8a93a61a6d9e0b5240/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java#L74-L84
Looks related to the discussion in https://github.com/spring-projects/spring-framework/pull/23562
Possible solution
The operators in TransactionalOperatorImpl#transactional
could be restructured in such a way that rollbacks are not attempted after a failure to commit. Maybe this could be accomplished by moving the rollbackOnException
to the asyncError
parameter in the usingWhen
. A similar approach should be applied in TransactionalOperatorImpl#execute
.
Additionally, would it make sense to add the application exception as suppressed when the rollback in rollbackOnException
emits an exception? This could make it easier to debug this and similar issues because currently the application exception is only linked when the rollback emits an TransactionSystemException
, but not in other cases like IllegalTransactionStateException
.
Comment From: jhoeller
@mp911de any advice on this?
Comment From: mp911de
The main difference between the imperative and the reactive transaction handling is that TransactionTemplate
handles exceptions first and then falls back to commit
when no exception has happened. TransactionalOperatorImpl
handles the commit
before rollbackOnException
.
Reactor's usingWhen
suggests this type of design. Moving onErrorResume
to inside the usingWhen
action closure should address the issue. asyncError
doesn't consider errors from asyncComplete
so moving error handling to inside of resourceClosure
looks like an appropriate fix.
Comment From: AntonioMorales97
What is the status on this? This is a very standard use case and our current solution is basically a copy-paste of the TransactionalOperatorImpl
where we avoid rollbacks for ConcurrencyFailureException
.
Comment From: sdeleuze
I have asked a feedback on the latest version of the PR here and will have a look after Mark feedback.
Comment From: mp911de
This issue is related to #28968, I'll have another look as this PR contains already fixes that I started preparing for #28968
Comment From: AntonioMorales97
We had to implement our custom reactive transaction manager for this but when we upgraded to Spring Boot 3 and updated r2dbc it stopped working. For some reason the isolation level cannot be set/passed to r2dbc. This can be reproduced with the following test (the second exception is thrown):
@Transactional(isolation = Isolation.SERIALIZABLE)
public Mono<Void> runAndEnsureSerializable() {
return Mono.deferContextual(ctx -> {
var transactionContext = ctx.get(TransactionContext.class);
if (transactionContext.getCurrentTransactionIsolationLevel() != Isolation.SERIALIZABLE.value()) {
throw new RuntimeException("Expected isolation level to be SERIALIZABLE");
}
return db.inConnection(connection -> {
if (connection.getTransactionIsolationLevel() != IsolationLevel.SERIALIZABLE) {
throw new RuntimeException("Expected isolation level to be SERIALIZABLE");
}
return Mono.empty();
});
});
}
and the following configuration:
@Bean
public ConnectionFactory connectionFactory(PostgreSQLContainer postgres) {
return new ConnectionPool(ConnectionPoolConfiguration.builder()
.initialSize(0)
.validationDepth(ValidationDepth.REMOTE)
.connectionFactory(ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, postgres.getHost())
.option(PORT, postgres.getFirstMappedPort())
.option(USER, postgres.getUsername())
.option(PASSWORD, postgres.getPassword())
.option(DATABASE, postgres.getDatabaseName())
.build()))
.customizer(builder -> builder.releaseHandler(Connection::rollbackTransaction))
.build());
}
We are using:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.2</version>
</parent>
...
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
The problem seems to be that autocommit
is switched in spring-r2dbc:6.0.4 but not in spring-r2dbc:5.3.24:
spring-r2dbc:6.0.4:
connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
.then(Mono.from(doBegin(definition, con)))
.then(prepareTransactionalConnection(con, definition))
spring-r2dbc:5.3.24:
connectionMono.flatMap(con -> prepareTransactionalConnection(con, definition, transaction)
.then(Mono.from(con.beginTransaction()))
.then(prepareTransactionalConnection(con, definition))
Any ideas?
Comment From: sdeleuze
@AntonioMorales97 Please create a dedicated issue if that's another bug.
This issue will be handle via #27572 so I close it as a duplicate.