I have used spring-tx for a while. I just upgraded to version 6.0.9, and some of my transaction tests that passed before now fail. So I tried to find the reason.

In the code below, the test fails as expected, but the exception type is not "NoSuchElementException". It is "NullPointerException".

This is not my real test; it is just sample code that reproduces the problem.

@DataR2dbcTest
class TestTransaction {
    @Autowired
    private lateinit var txManager: ReactiveTransactionManager

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun test1() = runTest {
        TransactionalOperator.create(txManager).executeAndAwait {
            Mono.error<NoSuchElementException>(NoSuchElementException())
                .awaitSingle()
        }
    }
}
Cannot invoke "String.startsWith(String)" because the return value of "java.lang.Throwable.getMessage()" is null
java.lang.NullPointerException: Cannot invoke "String.startsWith(String)" because the return value of "java.lang.Throwable.getMessage()" is null
    at org.springframework.transaction.reactive.TransactionalOperatorImpl.unwrapIfResourceCleanupFailure(TransactionalOperatorImpl.java:117)
    at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7099)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398)
    at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:250)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:324)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onComplete(FluxOnErrorReturn.java:169)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:250)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:324)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:531)

I found that the relevant code was changed two months ago. Below is what was added or modified.

// In "TransactionalOperatorImpl"
    @Override
    public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
        return TransactionContextManager.currentContext().flatMapMany(context ->
            Flux.usingWhen(
                this.transactionManager.getReactiveTransaction(this.transactionDefinition),
                action::doInTransaction,
                this.transactionManager::commit,
                this::rollbackOnException,
                this.transactionManager::rollback)
            .onErrorMap(this::unwrapIfResourceCleanupFailure))
        .contextWrite(TransactionContextManager.getOrCreateContext())
        .contextWrite(TransactionContextManager.getOrCreateContextHolder());
    }

    private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
        if (ex instanceof RuntimeException &&
                ex.getCause() != null &&
                ex.getMessage().startsWith("Async resource cleanup failed")) {
            return ex.getCause();
        }
        return ex;
    }
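
The stack trace points at the `ex.getMessage().startsWith(...)` call in the snippet above, which dereferences a message that can be null. The following is a minimal sketch in plain Java, independent of Spring, that reproduces the failure for a runtime exception that has a cause but no message, next to a null-safe variant. The class and method names (`UnwrapSketch`, `unwrapAsPosted`, `unwrapNullSafe`) are hypothetical, and the null-safe variant is only an assumption about how such a guard could look, not necessarily the change that shipped.

```java
import java.util.NoSuchElementException;

public class UnwrapSketch {

    // Same condition as the snippet above: getMessage() may return null,
    // so startsWith(...) can throw a NullPointerException.
    public static Throwable unwrapAsPosted(Throwable ex) {
        if (ex instanceof RuntimeException &&
                ex.getCause() != null &&
                ex.getMessage().startsWith("Async resource cleanup failed")) {
            return ex.getCause();
        }
        return ex;
    }

    // Hypothetical null-safe variant: read the message once and check it for null.
    public static Throwable unwrapNullSafe(Throwable ex) {
        if (ex instanceof RuntimeException && ex.getCause() != null) {
            String msg = ex.getMessage();
            if (msg != null && msg.startsWith("Async resource cleanup failed")) {
                return ex.getCause();
            }
        }
        return ex;
    }

    public static void main(String[] args) {
        Throwable original = new NoSuchElementException();
        // A runtime exception with a cause but without a message.
        Throwable wrapped = new NoSuchElementException().initCause(original);

        try {
            unwrapAsPosted(wrapped);
            System.out.println("as posted: no exception");
        } catch (NullPointerException npe) {
            System.out.println("as posted: NullPointerException");
        }

        // The null-safe variant leaves the exception untouched.
        System.out.println("null-safe returns input: " + (unwrapNullSafe(wrapped) == wrapped));
    }
}
```

An exception without a cause never reaches `getMessage()`, because the `ex.getCause() != null` check short-circuits first. That would be consistent with the directly thrown exception not triggering the problem, assuming `awaitSingle` rethrows an exception that carries a cause but no message; that assumption is not confirmed here.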

But if I do not use awaitSingle, as in the code below, the test behaves as I expected.

@DataR2dbcTest
class TestTransaction {
    @Autowired
    private lateinit var txManager: ReactiveTransactionManager

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun test1() = runTest {
        TransactionalOperator.create(txManager).executeAndAwait {
            f1()
        }
    }

    suspend fun f1(){
        throw NoSuchElementException()
    }
}

Should I avoid using Mono.awaitSingle with a ReactiveTransactionManager?

Comment From: 21lva

Duplicate of https://github.com/spring-projects/spring-framework/issues/30597

Comment From: 21lva

#30597 is about ReactiveTransactionSupport, but TransactionalOperatorImpl should also be fixed.

Comment From: sbrannen

Hi @21lva,

Congratulations on reporting your first issue for the Spring Framework! 👍

This has been fixed in 6.0.x and main and will be available in 6.0.11.