I noticed my CPU was pegged for over a minute reading a 675K text resource when subscribing to a flux for a response in a test:

        val events = ClientResponse.create(HttpStatus.OK)
            .body(eventsBody)
            .build()
            .bodyToFlow<ServerSentEvent<JsonNode>>()

When I hooked up YourKit, all the time was being spent in org.springframework.core.io.buffer.DataBufferUtils.KnuthMorrisPrattMatcher#match:

[Screenshot: YourKit profiler results, 2020-10-14]

After stepping through it in the debugger, I found the problem. The default delimiters are DOS line endings ("\r\n") and newlines ("\n"), and DataBufferUtils.CompositeMatcher doesn't keep any state recording that a matcher previously failed to find a match. So each time the newline delimiter is found, the entire input has to be scanned again for the DOS delimiter.
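To make that cost concrete, here is a small, self-contained Kotlin model. It is not Spring's DataBufferUtils code: it uses a naive byte search rather than Knuth-Morris-Pratt, and all names (findFrom, statelessSplit, cachingSplit, sseLines) are hypothetical. It counts how many input positions are examined while splitting on "\r\n" or "\n". The stateless version searches for every delimiter again from the current position for each frame, so with Unix line endings the never-matching "\r\n" search runs to the end of the input once per line, which makes the total work roughly quadratic. The caching version remembers where each delimiter next matches (or that it can no longer match), so each position is examined about once per delimiter.

```kotlin
// Hypothetical model of splitting input on "\r\n" or "\n" (not Spring's code).
// It counts how many input positions are examined, to compare search strategies.

val DELIMITERS: List<ByteArray> = listOf("\r\n".toByteArray(), "\n".toByteArray())

// Naive search: start index of the first match of [delim] at or after [from], or -1.
fun findFrom(input: ByteArray, delim: ByteArray, from: Int, counter: IntArray): Int {
    var i = from
    while (i + delim.size <= input.size) {
        counter[0]++ // one candidate position examined
        var j = 0
        while (j < delim.size && input[i + j] == delim[j]) j++
        if (j == delim.size) return i
        i++
    }
    return -1
}

// Stateless: every delimiter is searched again from the current position for each
// frame, even one that already failed to match anywhere in the remaining input.
fun statelessSplit(input: ByteArray, counter: IntArray): Int {
    var pos = 0
    var frames = 0
    while (true) {
        var bestEnd = -1
        for (delim in DELIMITERS) {
            val start = findFrom(input, delim, pos, counter)
            if (start >= 0 && (bestEnd < 0 || start + delim.size < bestEnd)) bestEnd = start + delim.size
        }
        if (bestEnd < 0) return frames
        pos = bestEnd
        frames++
    }
}

// Caching: remembers where each delimiter next matches, or -1 once it can never match.
fun cachingSplit(input: ByteArray, counter: IntArray): Int {
    val next = IntArray(DELIMITERS.size) { Int.MIN_VALUE } // MIN_VALUE = not searched yet
    var pos = 0
    var frames = 0
    while (true) {
        var bestEnd = -1
        for ((k, delim) in DELIMITERS.withIndex()) {
            // Re-search only if the cached match now lies behind the current position.
            if (next[k] != -1 && next[k] < pos) next[k] = findFrom(input, delim, pos, counter)
            if (next[k] >= 0 && (bestEnd < 0 || next[k] + delim.size < bestEnd)) bestEnd = next[k] + delim.size
        }
        if (bestEnd < 0) return frames
        pos = bestEnd
        frames++
    }
}

// Hypothetical SSE-like input: n "data:" lines, each terminated by [eol].
fun sseLines(n: Int, eol: String): ByteArray =
    buildString { repeat(n) { append("data: line ").append(it).append(eol) } }.toByteArray()
```

With 1,000 LF-terminated lines, the stateless count runs into the millions while the caching count stays around twice the input size. With CRLF input, even the stateless version stays roughly linear, because both delimiters match on every line.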

The StringDecoder is created here:

        "scheduling-1@17393" prio=5 tid=0xcd nid=NA runnable
          java.lang.Thread.State: RUNNABLE
              at org.springframework.core.codec.StringDecoder.<init>(StringDecoder.java:79)
              at org.springframework.core.codec.StringDecoder.textPlainOnly(StringDecoder.java:239)
              at org.springframework.core.codec.StringDecoder.textPlainOnly(StringDecoder.java:229)
              at org.springframework.http.codec.ServerSentEventHttpMessageReader.<init>(ServerSentEventHttpMessageReader.java:58)
              at org.springframework.http.codec.support.ClientDefaultCodecsImpl.extendObjectReaders(ClientDefaultCodecsImpl.java:103)
              at org.springframework.http.codec.support.BaseDefaultCodecs.getObjectReaders(BaseDefaultCodecs.java:354)
              at org.springframework.http.codec.support.BaseCodecConfigurer.getReaders(BaseCodecConfigurer.java:100)
              at org.springframework.http.codec.support.DefaultClientCodecConfigurer.getReaders(DefaultClientCodecConfigurer.java:31)
              at org.springframework.web.reactive.function.client.DefaultExchangeStrategiesBuilder$DefaultExchangeStrategies.<init>(DefaultExchangeStrategiesBuilder.java:86)
              at org.springframework.web.reactive.function.client.DefaultExchangeStrategiesBuilder.build(DefaultExchangeStrategiesBuilder.java:71)
              at org.springframework.web.reactive.function.client.DefaultExchangeStrategiesBuilder.<clinit>(DefaultExchangeStrategiesBuilder.java:42)
              at org.springframework.web.reactive.function.client.ExchangeStrategies.withDefaults(ExchangeStrategies.java:67)
              at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.initExchangeStrategies(DefaultWebClientBuilder.java:287)
              at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.build(DefaultWebClientBuilder.java:260)
              at org.springframework.web.reactive.function.client.WebClient.create(WebClient.java:150)

I've worked around it for now by running the file through unix2dos.
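For a test that builds its body from a String, the same workaround could be applied in-process. This is a minimal sketch; toDosLineEndings is a hypothetical helper, not part of Spring. It first collapses any existing "\r\n" to "\n" so that lines that already end in CRLF are not doubled, then turns every "\n" into "\r\n".

```kotlin
// Hypothetical test helper (not part of Spring): rewrites every line ending as
// "\r\n", roughly what running the file through unix2dos does.
fun toDosLineEndings(text: String): String =
    text.replace("\r\n", "\n").replace("\n", "\r\n")
```

The result could then be passed to ClientResponse.create(HttpStatus.OK).body(...) in place of the converted file.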

Comment From: DanielThomas

I didn't expect this to contribute so much in production code (I assumed it was this expensive only because of the large, non-streaming input in my test), but it looks like it's just as bad there. All of these hot spots are delimiter matching:

[Screenshot: production profiler hot spots, 2020-10-15]

This is with just 8 concurrent event streams. It was a little tricky to work out a workaround: if I register a fork of ServerSentEventHttpMessageReader as a custom codec, the configurer adds it to the typed codecs, so it appears in the wrong place in the reader chain. Instead I went with:

        val strategies = ExchangeStrategies.builder().codecs { configurer ->
            // Re-register every default reader, in order, as a custom codec,
            // swapping the default SSE reader for my newline-only fork
            // (NewlineOnlyServerSentEventHttpMessageReader is my class, not Spring's)
            configurer.readers
                .forEach { reader ->
                    val replacedReader = if (reader is ServerSentEventHttpMessageReader) {
                        val jsonDecoder = Jackson2JsonDecoder()
                        NewlineOnlyServerSentEventHttpMessageReader(jsonDecoder)
                    } else reader
                    configurer.customCodecs().register(replacedReader)
                }
            // Turn off the defaults so the original SSE reader is not registered too
            configurer.registerDefaults(false)
        }.build()
        val exchangeFunctions = ExchangeFunctions.create(ReactorClientHttpConnector(), strategies)
        return WebClient.builder()
            .exchangeFunction(exchangeFunctions)
            .baseUrl("...")
            .build()

Comment From: rstoyanchev

Thanks for reporting your findings. I've tentatively scheduled this to explore some improvements. It is a little surprising that it takes a minute for 675K, and that's with just 8 concurrent streams? Any chance of providing a reproducer?

Comment From: rstoyanchev

Or some sample (representative) input, perhaps broken down into chunks, would be quite helpful.

Comment From: DanielThomas

Happy to help - https://github.com/DanielThomas/spring-framework-issue-25915

        20:50:49.926 [main] INFO  Issue25915 - Reading unix SSE
        20:51:09.555 [main] INFO  Issue25915 - Unix line ending duration 19627ms
        20:51:09.556 [main] INFO  Issue25915 - Reading DOS SSE
        20:51:09.603 [main] INFO  Issue25915 - Dos line ending duration 47ms

I must have had a stream with more line endings in the first example, but this is still super slow.

Comment From: rstoyanchev

Super, thanks!

Comment From: rstoyanchev

This should be fixed now. I've confirmed it with the sample as well. Thanks again for the helpful report!

Comment From: DanielThomas

Awesome, appreciate it. Elegant fix too!