Affects: 5.3.8


Problem:

  • A reactive transaction applied to a coroutine does not propagate the caller's CoroutineContext into the transactional code.
// Caller side (anywhere): launch a coroutine whose context carries a custom element.
val adminUser = User(role = "admin") // custom coroutine context
GlobalScope.launch(adminUser) {
    // Both calls are expected to see the User element, yet neither does.
    val seenByAnnotation = transaction1() // returns false
    val seenByOperator = transaction2() // returns false as well
}

/**
 * Annotation-driven variant of the reproduction: reports whether a [User]
 * element is present in the coroutine context this function runs in.
 *
 * @return `true` when `coroutineContext[User]` is non-null, `false` otherwise
 */
@Transactional
suspend fun transaction1(): Boolean = coroutineContext[User] != null

/**
 * Programmatic variant of the reproduction: performs the [User] lookup inside
 * the block handed to `transactionalOperator.executeAndAwait`.
 *
 * @return `true` when `coroutineContext[User]` is non-null inside the block, `false` otherwise
 */
suspend fun transaction2(): Boolean {
    return transactionalOperator.executeAndAwait {
        val currentUser = coroutineContext[User]
        currentUser != null
    }
}

Cause:

mono(Dispatchers.Unconfined) { ... }

Fix:

  • Passing the CoroutineContext of the caller may fix the problem.
// in `TransactionalOperatorExtensions`
/**
 * Coroutine-friendly variant of [TransactionalOperator.execute] that runs [f]
 * inside the transaction while keeping the caller's coroutine context elements.
 *
 * @param f the suspending block to run; it receives the current [ReactiveTransaction]
 * @return the value produced by [f], or `null` when the underlying publisher completes empty
 */
suspend fun <T : Any?> TransactionalOperator.executeAndAwait(f: suspend (ReactiveTransaction) -> T): T {
    // Block body, not `= { ... }`: an expression body here would be a lambda literal,
    // inside which the `return` below is not allowed.
    //
    // The caller's Job is removed before the context is handed to `mono`
    // (the reactor `mono` builder is documented to reject a context containing a Job).
    // NOTE(review): elements on the right-hand side of `+` win, so a dispatcher present
    // in the caller's context replaces Dispatchers.Unconfined — confirm this is intended.
    val ctx = Dispatchers.Unconfined + coroutineContext.minusKey(Job) // combine the caller's context
    // The value is wrapped in an Optional so that an empty publisher can be told apart
    // and turned into `null` after awaiting the last element.
    return execute { status -> mono(ctx) { f(status) } }.map { value -> Optional.of(value!!) }
        .defaultIfEmpty(Optional.empty()).awaitLast().orElse(null)
}

// in `CoroutinesUtils`
/**
 * Invokes the Kotlin suspending function behind [method] on [target] and
 * exposes the outcome as a [Publisher].
 *
 * The last entry of [args] is cast to the [Continuation] of the suspending call:
 * its context (minus the [Job]) is combined with [Dispatchers.Unconfined], and the
 * entry itself is dropped from the arguments passed to the reflective call.
 *
 * @param method the Java method backing the suspending function
 * @param target the instance the function is invoked on
 * @param args the call arguments, with the continuation in last position
 * @return the `Mono` built around the call, or that `Mono` flattened with
 * `asFlux()` when the function's declared return type is [Flow]
 */
fun invokeSuspendingFunction(method: Method, target: Any, vararg args: Any?): Publisher<*> {
    val function = method.kotlinFunction!!
    val continuation = args[args.lastIndex] as Continuation<*>?
    val callerContext = continuation?.context?.minusKey(Job) ?: EmptyCoroutineContext
    val ctx = Dispatchers.Unconfined + callerContext // combine the caller's context
    val result = mono(ctx) {
        // Slice off the trailing continuation; a Unit result is mapped to null.
        val value = function.callSuspend(target, *args.sliceArray(0 until args.lastIndex))
        value.takeUnless { it == Unit }
    }.onErrorMap(InvocationTargetException::class.java) { it.targetException } // unwrap the reflective wrapper
    if (function.returnType.classifier != Flow::class) {
        return result
    }
    return result.flatMapMany { (it as Flow<Any>).asFlux() }
}

Comment From: sbrannen

  • Superseded by #27308