Spring Boot: 3.1.0 and 3.1.1 (SNAPSHOT)
Database: PostgreSQL

@Service
public class TxTester implements ApplicationRunner {
    private final Logger logger = LoggerFactory.getLogger(TxTester.class);

    private final DatabaseClient databaseClient;
    private final TransactionalOperator rrOperator;
    private final TransactionalOperator nOperator;

    @Autowired
    public TxTester(DatabaseClient databaseClient, ReactiveTransactionManager transactionManager) {
        this.databaseClient = databaseClient;
        this.rrOperator = TransactionalOperator.create(transactionManager, new TransactionDefinition() {
            @Override
            public int getIsolationLevel() {
                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
            }
        });
        this.nOperator = TransactionalOperator.create(transactionManager, new TransactionDefinition() {
            @Override
            public int getPropagationBehavior() {
                return TransactionDefinition.PROPAGATION_REQUIRES_NEW;
            }
        });
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        logger.info("Creating table");
        databaseClient.sql("CREATE TABLE IF NOT EXISTS person(id INT PRIMARY KEY, name VARCHAR)").then().block();
        logger.info("Adding sample record");
        databaseClient.sql("INSERT INTO person(id, name) VALUES (1, 'John') ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name").then().block();

        logger.info("Getting row with select for update");
        var name = databaseClient.sql("SELECT 1")
                .then()
                .doOnSuccess(x -> logger.info("Now updating row in different transaction"))
                .then(databaseClient.sql("UPDATE person SET name = 'Mark' WHERE id = 1").then().as(nOperator::transactional))
                .doOnSuccess(x -> logger.info("Fetching row with FOR UPDATE. Expecting to fail"))
                .then(databaseClient.sql("SELECT * FROM person WHERE id = 1 FOR UPDATE")
                        .map((row, metadata) -> row.get("name", String.class))
                        .one()
                )
                .as(rrOperator::transactional)
                .block();
        logger.info("Got name: {}. This should not happen!", name);
    }
}

The expected result is something like the following (this works in spring-boot 2.4.13):

Caused by: org.springframework.dao.ConcurrencyFailureException: executeMany; SQL [SELECT * FROM person WHERE id = 1 FOR UPDATE]; could not serialize access due to concurrent update; nested exception is io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: [40001] could not serialize access due to concurrent update

I also inspected the traffic between PostgreSQL and my demo project with Wireshark: no isolation level information is being sent.

Comment From: sasavilic

After deeper investigation, I believe the problem is the following:

In R2dbcTransactionManager we have:

@Override
    protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction,
            TransactionDefinition definition) throws TransactionException {

                        ....

            return connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
                    .then(Mono.from(doBegin(definition, con)))
                    .then(prepareTransactionalConnection(con, definition))
                    .doOnSuccess(v -> {
                        txObject.getConnectionHolder().setTransactionActive(true);
                        Duration timeout = determineTimeout(definition);
                        if (!timeout.isNegative() && !timeout.isZero()) {
                            txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
                        }
                        // Bind the connection holder to the thread.
                        if (txObject.isNewConnectionHolder()) {
                            synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
                        }
                    }).thenReturn(con).onErrorResume(e -> {
                        if (txObject.isNewConnectionHolder()) {
                            return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
                                    .doOnTerminate(() -> txObject.setConnectionHolder(null, false))
                                    .then(Mono.error(e));
                        }
                        return Mono.error(e);
                    })).onErrorResume(e -> {
                        CannotCreateTransactionException ex = new CannotCreateTransactionException(
                                "Could not open R2DBC Connection for transaction", e);
                        return Mono.error(ex);
                    });

switchAutoCommitIfNecessary calls setAutoCommit(false) before doBegin(definition, con) runs. setAutoCommit(false) already starts a transaction, so the con.beginTransaction(transactionDefinition) call made by doBegin(definition, con) is effectively a no-op and the requested isolation level is never applied.

I believe that calling setAutoCommit is no longer necessary, because according to the Connection interface, auto-commit is disabled by the beginTransaction(definition) method itself:

    /**
     * Begins a new transaction.  Calling this method disables {@link #isAutoCommit() auto-commit} mode.  Beginning the transaction may fail if the {@link TransactionDefinition} conflicts with the
     * connection configuration.
     *
     * @param definition attributes for the transaction.
     * @return a {@link Publisher} that indicates that the transaction is open
     * @since 0.9
     */
    Publisher<Void> beginTransaction(TransactionDefinition definition);
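
The ordering problem described above can be modeled in plain Java without a database. This is only a sketch under stated assumptions: FakeConnection is a hypothetical stand-in, not io.r2dbc.spi.Connection, and the recorded "wire" strings are illustrative rather than the statements the PostgreSQL driver actually sends. It assumes that disabling auto-commit opens a transaction right away and that beginning a transaction while one is already open does nothing.

```java
import java.util.ArrayList;
import java.util.List;

class AutoCommitOrderingDemo {

    /** Hypothetical stand-in for a driver connection; NOT the real R2DBC Connection. */
    static final class FakeConnection {
        private final List<String> wire = new ArrayList<>();
        private boolean inTransaction = false;

        // Assumption: turning auto-commit off opens a transaction immediately,
        // using the server's default isolation level.
        void setAutoCommit(boolean autoCommit) {
            if (!autoCommit && !inTransaction) {
                wire.add("BEGIN");
                inTransaction = true;
            }
        }

        // Assumption: if a transaction is already open, this call does nothing,
        // so the requested isolation level is silently dropped.
        void beginTransaction(String isolationLevel) {
            if (inTransaction) {
                return;
            }
            wire.add("BEGIN ISOLATION LEVEL " + isolationLevel);
            inTransaction = true;
        }

        List<String> wire() {
            return wire;
        }
    }

    // Mirrors the order described above: switch auto-commit first, then begin.
    static List<String> beginWithAutoCommitSwitch(String isolationLevel) {
        FakeConnection con = new FakeConnection();
        con.setAutoCommit(false);
        con.beginTransaction(isolationLevel);
        return con.wire();
    }

    // The proposed order: rely on beginTransaction alone.
    static List<String> beginDirectly(String isolationLevel) {
        FakeConnection con = new FakeConnection();
        con.beginTransaction(isolationLevel);
        return con.wire();
    }

    public static void main(String[] args) {
        System.out.println(beginWithAutoCommitSwitch("REPEATABLE READ"));
        System.out.println(beginDirectly("REPEATABLE READ"));
    }
}
```

In this model the first call records only a plain BEGIN, while the second records the isolation level, which matches the symptom of no isolation level appearing in the captured traffic.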

To test my theory, I created a wrapper around ConnectionFactory whose connections simply ignore the setAutoCommit call:

@Component
@Aspect
public class FactoryWrapper {
    @Around("execution(* io.r2dbc.spi.ConnectionFactory.create())")
    public Object wrapConnection(ProceedingJoinPoint pjp) throws Throwable {
        Publisher<? extends Connection> publisher = (Publisher<? extends Connection>)pjp.proceed();
        return Mono.from(publisher)
                .map(GuardAutoCommit::new);
    }

    private static class GuardAutoCommit implements Connection, Wrapped<Connection> {
        private final Connection delegate;

        private GuardAutoCommit(Connection delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean isAutoCommit() {
            return delegate.isAutoCommit();
        }

        @Override
        public Publisher<Void> setAutoCommit(boolean autoCommit) {
            return Mono.empty();
        }

        ....

        @Override
        public Connection unwrap() {
            return delegate;
        }
    }
}

and with that, it works.

Comment From: jhoeller

Thanks for the report! We seem to have missed the implicit auto-commit handling in R2DBC 0.9/1.0 when doing the upgrade.

@mp911de I hope my corresponding changes make sense there. Since there is only a cleanup step left, I've folded that logging/completion code into the release connection step itself.