Spring Boot: 3.1.0 and 3.1.1 (SNAPSHOT)
Database: PostgreSQL
@Service
public class TxTester implements ApplicationRunner {
    private final Logger logger = LoggerFactory.getLogger(TxTester.class);
    private final DatabaseClient databaseClient;
    private final TransactionalOperator rrOperator;
    private final TransactionalOperator nOperator;

    @Autowired
    public TxTester(DatabaseClient databaseClient, ReactiveTransactionManager transactionManager) {
        this.databaseClient = databaseClient;
        this.rrOperator = TransactionalOperator.create(transactionManager, new TransactionDefinition() {
            @Override
            public int getIsolationLevel() {
                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
            }
        });
        this.nOperator = TransactionalOperator.create(transactionManager, new TransactionDefinition() {
            @Override
            public int getPropagationBehavior() {
                return TransactionDefinition.PROPAGATION_REQUIRES_NEW;
            }
        });
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        logger.info("Creating table");
        databaseClient.sql("CREATE TABLE IF NOT EXISTS person(id INT PRIMARY KEY, name VARCHAR)").then().block();
        logger.info("Adding sample record");
        databaseClient.sql("INSERT INTO person(id, name) VALUES (1, 'John') ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name").then().block();
        logger.info("Getting row with select for update");
        var name = databaseClient.sql("SELECT 1")
                .then()
                .doOnSuccess(x -> logger.info("Now updating row in different transaction"))
                .then(databaseClient.sql("UPDATE person SET name = 'Mark' WHERE id = 1").then().as(nOperator::transactional))
                .doOnSuccess(x -> logger.info("Fetching row with FOR UPDATE. Expecting to fail"))
                .then(databaseClient.sql("SELECT * FROM person WHERE id = 1 FOR UPDATE")
                        .map((row, metadata) -> row.get("name", String.class))
                        .one()
                )
                .as(rrOperator::transactional)
                .block();
        logger.info("Got name: {}. This should not happen!", name);
    }
}
The expected result is something like the following (this works in Spring Boot 2.4.13):
Caused by: org.springframework.dao.ConcurrencyFailureException: executeMany; SQL [SELECT * FROM person WHERE id = 1 FOR UPDATE]; could not serialize access due to concurrent update; nested exception is io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: [40001] could not serialize access due to concurrent update
I also inspected the traffic between PostgreSQL and my demo project with Wireshark: no isolation level information is being sent.
Comment From: sasavilic
After deeper investigation, I believe the problem is the following. In R2dbcTransactionManager we have:
@Override
protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction,
        TransactionDefinition definition) throws TransactionException {
    ....
    return connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
            .then(Mono.from(doBegin(definition, con)))
            .then(prepareTransactionalConnection(con, definition))
            .doOnSuccess(v -> {
                txObject.getConnectionHolder().setTransactionActive(true);
                Duration timeout = determineTimeout(definition);
                if (!timeout.isNegative() && !timeout.isZero()) {
                    txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
                }
                // Bind the connection holder to the thread.
                if (txObject.isNewConnectionHolder()) {
                    synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
                }
            }).thenReturn(con).onErrorResume(e -> {
                if (txObject.isNewConnectionHolder()) {
                    return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
                            .doOnTerminate(() -> txObject.setConnectionHolder(null, false))
                            .then(Mono.error(e));
                }
                return Mono.error(e);
            })).onErrorResume(e -> {
                CannotCreateTransactionException ex = new CannotCreateTransactionException(
                        "Could not open R2DBC Connection for transaction", e);
                return Mono.error(ex);
            });
switchAutoCommitIfNecessary calls setAutoCommit(false) before doBegin(definition, con) runs. setAutoCommit(false) already starts a transaction, so the con.beginTransaction(transactionDefinition) call made by doBegin(definition, con) is effectively a no-op, and the isolation level from the definition never reaches the database.
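The ordering problem can be reproduced in a toy model that needs neither a database nor the R2DBC SPI. This is only a sketch of the behaviour described above, not the driver's code: ToyConnection, AutoCommitOrderingDemo and the recorded statement strings are hypothetical, and the model simply assumes that disabling auto-commit opens a transaction and that a later begin request is skipped while one is open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a driver connection; NOT the real io.r2dbc.spi.Connection.
class ToyConnection {
    // Statements this toy would send to the server, recorded so the order can be inspected.
    final List<String> sent = new ArrayList<>();
    private boolean inTransaction = false;

    // Assumption from the analysis above: switching auto-commit off already opens a transaction.
    void setAutoCommit(boolean autoCommit) {
        if (!autoCommit && !inTransaction) {
            sent.add("BEGIN");
            inTransaction = true;
        }
    }

    // Assumption: a begin request is skipped when a transaction is already open,
    // so the requested isolation level is silently dropped.
    void beginTransaction(String isolationLevel) {
        if (inTransaction) {
            return;
        }
        sent.add("BEGIN ISOLATION LEVEL " + isolationLevel);
        inTransaction = true;
    }
}

class AutoCommitOrderingDemo {
    // Order used by the transaction manager code quoted above: auto-commit off first, then begin.
    static List<String> autoCommitFirst() {
        ToyConnection con = new ToyConnection();
        con.setAutoCommit(false);
        con.beginTransaction("REPEATABLE READ");
        return con.sent;
    }

    // Proposed order: rely on beginTransaction alone.
    static List<String> beginOnly() {
        ToyConnection con = new ToyConnection();
        con.beginTransaction("REPEATABLE READ");
        return con.sent;
    }

    public static void main(String[] args) {
        System.out.println(autoCommitFirst()); // prints [BEGIN]
        System.out.println(beginOnly());       // prints [BEGIN ISOLATION LEVEL REPEATABLE READ]
    }
}
```

In the first case the isolation level never shows up in the recorded statements, which matches what the Wireshark capture showed.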
I believe that calling setAutoCommit is no longer necessary, because according to the Connection interface, auto-commit is already disabled by the beginTransaction(definition) method:
/**
 * Begins a new transaction. Calling this method disables {@link #isAutoCommit() auto-commit} mode. Beginning the transaction may fail if the {@link TransactionDefinition} conflicts with the
 * connection configuration.
 *
 * @param definition attributes for the transaction.
 * @return a {@link Publisher} that indicates that the transaction is open
 * @since 0.9
 */
Publisher<Void> beginTransaction(TransactionDefinition definition);
To test my theory, I created a wrapper around ConnectionFactory that simply ignores the setAutoCommit call:
@Component
@Aspect
public class FactoryWrapper {
    @Around("execution(* io.r2dbc.spi.ConnectionFactory.create())")
    public Object wrapConnection(ProceedingJoinPoint pjp) throws Throwable {
        Publisher<? extends Connection> publisher = (Publisher<? extends Connection>)pjp.proceed();
        return Mono.from(publisher)
                .map(GuardAutoCommit::new);
    }

    private static class GuardAutoCommit implements Connection, Wrapped<Connection> {
        private final Connection delegate;

        private GuardAutoCommit(Connection delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean isAutoCommit() {
            return delegate.isAutoCommit();
        }

        @Override
        public Publisher<Void> setAutoCommit(boolean autoCommit) {
            return Mono.empty();
        }

        ....

        @Override
        public Connection unwrap() {
            return delegate;
        }
    }
}
With that wrapper in place, it works.
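The effect of such a guard can also be shown without Spring AOP or a real driver. The following is a minimal sketch under stated assumptions: SketchConnection, RecordingConnection, IgnoreAutoCommitGuard and GuardDemo are hypothetical names, and the recording connection only imitates the behaviour assumed in this thread (disabling auto-commit opens a transaction, and a later begin is then skipped).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-method contract; the real io.r2dbc.spi.Connection is reactive and much larger.
interface SketchConnection {
    void setAutoCommit(boolean autoCommit);
    void beginTransaction(String isolationLevel);
}

// Imitates the assumed driver behaviour and records what would be sent to the server.
class RecordingConnection implements SketchConnection {
    final List<String> sent = new ArrayList<>();
    private boolean inTransaction = false;

    @Override
    public void setAutoCommit(boolean autoCommit) {
        if (!autoCommit && !inTransaction) {
            sent.add("BEGIN");
            inTransaction = true;
        }
    }

    @Override
    public void beginTransaction(String isolationLevel) {
        if (inTransaction) {
            return; // already in a transaction: the isolation level is dropped
        }
        sent.add("BEGIN ISOLATION LEVEL " + isolationLevel);
        inTransaction = true;
    }
}

// Decorator that swallows setAutoCommit and forwards everything else to the delegate.
class IgnoreAutoCommitGuard implements SketchConnection {
    private final SketchConnection delegate;

    IgnoreAutoCommitGuard(SketchConnection delegate) {
        this.delegate = delegate;
    }

    @Override
    public void setAutoCommit(boolean autoCommit) {
        // intentionally a no-op
    }

    @Override
    public void beginTransaction(String isolationLevel) {
        delegate.beginTransaction(isolationLevel);
    }
}

class GuardDemo {
    // Runs the "auto-commit off, then begin" sequence with or without the guard.
    static List<String> run(boolean guarded) {
        RecordingConnection raw = new RecordingConnection();
        SketchConnection con = guarded ? new IgnoreAutoCommitGuard(raw) : raw;
        con.setAutoCommit(false);
        con.beginTransaction("REPEATABLE READ");
        return raw.sent;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [BEGIN]
        System.out.println(run(true));  // prints [BEGIN ISOLATION LEVEL REPEATABLE READ]
    }
}
```

Because the guard only suppresses one call, it is a diagnostic aid for confirming the theory rather than a fix; the actual fix belongs in the transaction manager.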
Comment From: jhoeller
Thanks for the report! We seem to have missed the implicit auto-commit handling in R2DBC 0.9/1.0 when doing the upgrade.
@mp911de I hope my corresponding changes make sense there. Since there is only a cleanup step left, I've folded that logging/completion code into the release connection step itself.