Affects: 5.3.8
Problem:
- Reactive transactions, when applied to coroutines, do not propagate the caller's `CoroutineContext`.
// Reproduction (caller side): start a coroutine with a custom context.
val user = User(role = "admin") // custom coroutine context, passed to launch() below
GlobalScope.launch(user) {
val hasUser1 = transaction1() // returns false: the User context is not visible inside the transaction
val hasUser2 = transaction2() // returns false as well, for the same reason
}
/** Declarative variant: reports whether a [User] element is present in the current coroutine context. */
@Transactional
suspend fun transaction1(): Boolean = coroutineContext[User] != null
/**
 * Programmatic variant: runs the same [User] presence check inside
 * `transactionalOperator.executeAndAwait`.
 */
suspend fun transaction2(): Boolean {
    return transactionalOperator.executeAndAwait {
        // Evaluated in the context of the coroutine that runs the transactional callback.
        coroutineContext[User] != null
    }
}
Cause:
- `CoroutinesUtils#invokeSuspendingFunction` and `TransactionalOperatorExtension#executeAndAwait` overwrite the caller's coroutine context while converting the coroutine into a `Publisher`, by doing:
mono(Dispatchers.Unconfined) { ... }
Fix:
- Passing the caller's `CoroutineContext` to `mono` may fix the problem.
// in `TransactionalOperatorExtension`
/**
 * Coroutines variant of `TransactionalOperator.execute`: runs [f] through `execute` and suspends
 * until the resulting publisher yields its value.
 *
 * The caller's coroutine context (minus its [Job]) is handed to `mono`, so context elements set by
 * the caller remain visible inside [f].
 *
 * @param f the suspending callback, invoked with the current [ReactiveTransaction]
 * @return the value produced by [f], or `null` when the publisher completes without a value
 */
suspend fun <T : Any?> TransactionalOperator.executeAndAwait(f: suspend (ReactiveTransaction) -> T): T {
    // Block body (not `= { ... }`): the latter would declare a lambda as the function's value and
    // make the `return` below illegal.
    //
    // Combine the caller's context with the Unconfined dispatcher. The Job is stripped because
    // `mono` is documented to reject a context that contains one.
    // NOTE(review): in `a + b` the right operand's elements replace same-key elements of the left,
    // so a dispatcher present in the caller's context takes precedence over Unconfined here --
    // confirm this is intended.
    val ctx = Dispatchers.Unconfined + coroutineContext.minusKey(Job)
    return execute { status -> mono(ctx) { f(status) } }
        // `!!` is needed because T may be a nullable type; presumably the publisher never
        // emits null, so it should not fire in practice.
        .map { value -> Optional.of(value!!) }
        // An empty publisher is mapped to an empty Optional so that awaitLast() always has a value.
        .defaultIfEmpty(Optional.empty())
        .awaitLast()
        .orElse(null)
}
// in `CoroutinesUtils`
/**
 * Invokes the Kotlin suspending function behind [method] on [target] and exposes the outcome as a
 * [Publisher].
 *
 * The last element of [args] is treated as the caller's [Continuation]; its context (minus the
 * [Job]) is combined with [Dispatchers.Unconfined] and used to run the function. The remaining
 * elements are passed to the function as its arguments.
 *
 * @param method the Java method whose Kotlin function is invoked
 * @param target the instance the function is called on
 * @param args the call arguments followed by the continuation (which may be `null`)
 * @return a `Flux` built from the returned [Flow] when the function's declared return type is
 * [Flow]; otherwise the `Mono` carrying the function's result (a `Unit` result is mapped to `null`)
 */
fun invokeSuspendingFunction(method: Method, target: Any, vararg args: Any?): Publisher<*> {
    val kFunction = method.kotlinFunction!!

    // Recover the caller's context from the trailing continuation argument, if there is one.
    val continuation = args[args.lastIndex] as Continuation<*>?
    val callerContext = continuation?.context?.minusKey(Job) ?: EmptyCoroutineContext
    val ctx = Dispatchers.Unconfined + callerContext // combine the caller's context

    val result = mono(ctx) {
        // Everything except the trailing continuation is a real argument of the function.
        val callArgs = args.sliceArray(0 until args.lastIndex)
        kFunction.callSuspend(target, *callArgs).takeUnless { it == Unit }
    }.onErrorMap(InvocationTargetException::class.java) { it.targetException }

    return when (kFunction.returnType.classifier) {
        Flow::class -> result.flatMapMany { (it as Flow<Any>).asFlux() }
        else -> result
    }
}
Comment From: sbrannen
- Superseded by #27308