Affects: 3.2.2
It appears that when a custom coroutine context element is applied before entering a WebClient filter, that element is not available inside the filter.
In the application below, the controller adds a custom context element, which is then found in the coroutine context. However, when we attempt to fetch the same element in the filter, it comes back null. You can see this by running the app and hitting
GET http://localhost:8080/test
package com.target.test
import io.netty.channel.ChannelOption
import io.netty.handler.timeout.WriteTimeoutHandler
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.withContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.http.ResponseEntity
import org.springframework.http.client.reactive.ReactorClientHttpConnector
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.client.*
import reactor.netty.http.client.HttpClient
import java.net.URI
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

@SpringBootApplication
@ComponentScan(basePackages = ["com.target"])
class TestApplication

fun main(args: Array<String>) {
    runApplication<TestApplication>(*args)
}

@Configuration
class WebClientConfig {
    @Bean
    @Suppress("unused")
    fun webClient(
        filterMissingCustomCoroutineContext: FilterMissingCustomCoroutineContext
    ): WebClient {
        val client = WebClient.builder()
            .clientConnector(ReactorClientHttpConnector(httpClient()))
            .filter(filterMissingCustomCoroutineContext)
            .build()
        return client
    }

    private fun httpClient(): HttpClient {
        return HttpClient.create()
            .responseTimeout(Duration.ofMillis(10000))
            .doOnConnected {
                it.addHandlerFirst(WriteTimeoutHandler(10000, TimeUnit.MILLISECONDS))
            }
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
    }
}

@Component
class FilterMissingCustomCoroutineContext : CoExchangeFilterFunction() {
    override suspend fun filter(request: ClientRequest, next: CoExchangeFunction): ClientResponse {
        val customContext = currentCoroutineContext()[CustomCoroutineContext]
        println("In client filter, custom context is $customContext")
        try {
            assert(customContext != null)
        } catch (t: Throwable) {
            t.printStackTrace()
            throw t
        }
        return next.exchange(request)
    }
}

@RestController
class TestController @Autowired constructor(
    private val webClient: WebClient
) {
    @GetMapping("/test")
    suspend fun test(): ResponseEntity<String>? {
        return withContext(CustomCoroutineContext("test")) {
            val customContext = currentCoroutineContext()[CustomCoroutineContext]
            assert(customContext != null)
            println("In controller, custom context is $customContext")
            webClient.get()
                .uri(URI("https://github.com/spring-projects/spring-framework/issues/26977"))
                .retrieve()
                .toEntity(String::class.java)
                .awaitFirstOrNull()
        }
    }
}

data class CustomCoroutineContext(val value: String) :
    AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<CustomCoroutineContext>
}
Comment From: sdeleuze
I confirm the issue, likely caused by the CoroutineContext being overridden at CoExchangeFilterFunction level, but I am not sure yet how we can properly pass the CoroutineContext to that point.
Maybe if we provide a specialized awaitEntityOrNull<String>() to replace toEntity(String::class.java).awaitFirstOrNull(), where we could store the CoroutineContext in the request attribute and set it explicitly in CoExchangeFilterFunction#filter (and adapt other coroutines extensions in a similar way)?
@poutsma Related question: is it possible at DefaultResponseSpec level to get the ClientRequest from the ClientResponse in order to be able to set an attribute? It looks like I can only get the HttpRequest.
Or can we reuse the CoroutineContext injected in the Reactor context when awaitFirstOrNull() is invoked? Not sure.
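The behaviour described in this diagnosis can be reproduced without Spring. The following is a minimal sketch, assuming that CoExchangeFilterFunction bridges to Reactor through the mono builder from kotlinx-coroutines-reactor (the thread says the context is replaced at that level but does not show the implementation). RequestTag and lookupAcrossMonoBoundary are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical element standing in for CustomCoroutineContext from the report.
data class RequestTag(val value: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<RequestTag>
}

// The mono builder starts a new coroutine that is not a child of the caller,
// so elements added with withContext are not visible inside its block.
suspend fun lookupAcrossMonoBoundary(): String =
    withContext(RequestTag("test")) {
        mono { currentCoroutineContext()[RequestTag]?.value ?: "missing" }.awaitSingle()
    }

fun main(): Unit = runBlocking {
    // The element set by the caller is not found on the other side of the boundary.
    println(lookupAcrossMonoBoundary())
}
```

If the real filter adapter works the same way, the null seen in the report is the expected result of crossing a coroutine-to-Reactor-to-coroutine boundary rather than a mistake in the application code.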
Comment From: poutsma
Yeah, it looks like HttpRequest is all that's available. I think that's because ClientResponse instances are not necessarily created by the connector, but can also be built using the builder for use in interceptors.
Comment From: guggens
We experienced the same issue. Our use case is:
1. We receive some custom HTTP headers and store them in the CoroutineContext using the CoWebFilter, which works fine.
2. We can properly access them later to write some cache entries.
3. We try to call a downstream system and forward the HTTP headers by reading them from the CoroutineContext in a CoExchangeFilterFunction, but we cannot read any of the values previously set in the CoWebFilter.
It does indeed seem that the CoExchangeFilterFunction has a different CoroutineContext from the code that calls it, but it should have the same context, or a child of the context that is created when a REST endpoint is called from outside.
Comment From: dmitrysulman
@sdeleuze
> Or can we reuse the CoroutineContext injected in the Reactor context when awaitFirstOrNull() is invoked?
Unfortunately, we can't use this approach directly because awaitFirstOrNull() injects only the ReactorContext element of the CoroutineContext, not the entire CoroutineContext itself. See ReactorContextInjector.kt:
internal class ReactorContextInjector : ContextInjector {
    /**
     * Injects all values from the [ReactorContext] entry of the given coroutine context
     * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux].
     */
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher
        return when(publisher) {
            is Mono -> publisher.contextWrite(reactorContext)
            is Flux -> publisher.contextWrite(reactorContext)
            else -> publisher
        }
    }
}
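In practical terms, only values placed in the ReactorContext element cross the await boundary. A minimal sketch of that behaviour, assuming kotlinx-coroutines-reactor is on the classpath; the key "tag" and the function name lookupViaReactorContext are hypothetical:

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.util.context.Context

// A value stored in the ReactorContext element is injected into the Reactor
// context on await, and the mono builder exposes it again as a ReactorContext
// element inside the new coroutine.
suspend fun lookupViaReactorContext(): String =
    withContext(ReactorContext(Context.of("tag", "test"))) {
        mono {
            val reactorContext = currentCoroutineContext()[ReactorContext]?.context
            reactorContext?.getOrDefault("tag", "missing") ?: "missing"
        }.awaitSingle()
    }

fun main(): Unit = runBlocking {
    println(lookupViaReactorContext())
}
```

Whether the same Reactor context reaches a CoExchangeFilterFunction through the full WebClient pipeline is not confirmed in this thread, so treat this as an illustration of the injector's behaviour only.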
Theoretically, we could provide our own implementation of ContextInjector that also injects the entire CoroutineContext. Kotlin uses ServiceLoader to load all implementations and then folds over them, see ReactiveFlow.kt:
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
private val contextInjectors: Array<ContextInjector> =
    ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader)
        .iterator().asSequence()
        .toList().toTypedArray() // R8 opto

internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
    contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
However, this approach is not ideal as it would affect awaitSingleOrNull() globally. Additionally, ContextInjector is annotated with @InternalCoroutinesApi, meaning it is not intended for external use:
@InternalCoroutinesApi
public interface ContextInjector {
    /**
     * Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
     * This API used as an indirection layer between `reactive` and `reactor` modules.
     */
    public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
}
> Maybe if we provide a specialized awaitEntityOrNull<String>() to replace toEntity(String::class.java).awaitFirstOrNull() where we could store the CoroutineContext in the request attribute and set it explicitly in CoExchangeFilterFunction#filter (and adapt other coroutines extension in a similar way)?
I took this idea and implemented it by storing the entire CoroutineContext under a specific key within the ReactorContext element. Then, in DefaultWebClient, I extract the value stored under this key and add it to the ClientRequest attributes.
Let me know if you have any feedback or suggestions. I'm happy to discuss and refine the approach if needed. I can also adapt other coroutine extensions in a similar way if we decide to proceed with this approach.
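The idea of carrying the whole CoroutineContext under a dedicated Reactor context key can be sketched outside Spring as follows. This is an illustration under stated assumptions, not the code from the proposed change: COROUTINE_CONTEXT_KEY, awaitWithCoroutineContext, monoWithCallerContext and RequestTag are hypothetical names, and the sketch reads the stashed context directly from the Reactor context instead of going through ClientRequest attributes.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.core.publisher.Mono
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical key; not the name used by Spring Framework.
const val COROUTINE_CONTEXT_KEY = "example.coroutineContext"

// Hypothetical element standing in for CustomCoroutineContext from the report.
data class RequestTag(val value: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<RequestTag>
}

// Caller side: stash the whole CoroutineContext in the Reactor context before awaiting.
suspend fun <T : Any> Mono<T>.awaitWithCoroutineContext(): T {
    val callerContext = currentCoroutineContext()
    return contextWrite { it.put(COROUTINE_CONTEXT_KEY, callerContext) }.awaitSingle()
}

// Filter side: start the coroutine with the stashed context when one is present.
fun <T : Any> monoWithCallerContext(block: suspend () -> T): Mono<T> =
    Mono.deferContextual { view ->
        val stashed = view.getOrDefault<CoroutineContext>(COROUTINE_CONTEXT_KEY, EmptyCoroutineContext)
            ?: EmptyCoroutineContext
        // The mono builder rejects a context that contains a Job, so drop it,
        // and replace the caller's dispatcher with Dispatchers.Unconfined.
        mono(stashed.minusKey(Job) + Dispatchers.Unconfined) { block() }
    }

suspend fun lookupWithStashedContext(): String =
    withContext(RequestTag("test")) {
        monoWithCallerContext { currentCoroutineContext()[RequestTag]?.value ?: "missing" }
            .awaitWithCoroutineContext()
    }

fun main(): Unit = runBlocking {
    println(lookupWithStashedContext())
}
```

The Job is removed because the filter-side coroutine has its own lifecycle tied to the Reactor subscription. Keeping or replacing the caller's dispatcher is a design choice that the thread does not settle.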