Affects: 2.3.1.RELEASE


I found that Spring WebFlux doesn't allow the GC to collect garbage. I simplified my code down to this pipeline:

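import com.fasterxml.jackson.dataformat.csv.CsvMapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.io.ByteArrayResource
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.router
import java.io.ByteArrayOutputStream
import java.time.LocalDateTime

@Configuration
class RouterConfig(
    // R2DBC repository exposing:
    // fun findOperationsRangeForCarrier(ids: Collection<Long>, start: LocalDateTime, end: LocalDateTime): Flux<TPPOperation>
    val operationRepository: OperationRepository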
) {
    val csvMapper = CsvMapper().apply { findAndRegisterModules() }
    val writer = csvMapper.writer(csvMapper.schemaFor(TPPOperation::class.java).withoutHeader())

    val carrierIds = listOf<Long>(1)
    val start = LocalDateTime.of(2021, 3, 10, 0, 0, 0)
    val end = LocalDateTime.of(2021, 4, 10, 0, 0, 0)

    @Bean
    fun routing(): RouterFunction<ServerResponse> = router {
        accept(MediaType.ALL).nest {
            GET("/test") {
                operationRepository.findOperationsRangeForCarrier(carrierIds, start, end) // returns about 800_000 items
                    .map { writer.writeValueAsBytes(it) }
                    .reduce(ByteArrayOutputStream()) { output, el ->
                        output.write(el)
                        output
                    }
                    .map { output -> output.toByteArray() } // 229Mb
                    .flatMap {
                        ServerResponse.ok()
                            .headers { httpHeaders ->
                                httpHeaders.contentType = MediaType("application", "force-download")
                                httpHeaders.set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=test.csv")
                            }
                            .bodyValue(ByteArrayResource(it))
                    }
            }
        }
    }
}
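To make the per-request memory cost of the reduce step concrete, here is a stdlib-only Kotlin sketch (no Reactor, no Jackson) that mimics the map/reduce/toByteArray chain with a fold over a Sequence. InspectableOutputStream and aggregate are hypothetical names introduced only for illustration. The sketch relies on the documented behavior of ByteArrayOutputStream.toByteArray(), which returns a newly allocated copy of the buffer contents.

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical subclass that exposes the protected backing array's capacity.
class InspectableOutputStream : ByteArrayOutputStream() {
    fun capacity(): Int = buf.size
}

// Stdlib analogue of:
//   .map { writer.writeValueAsBytes(it) }
//   .reduce(ByteArrayOutputStream()) { output, el -> output.write(el); output }
//   .map { output -> output.toByteArray() }
// Returns the copied result together with the capacity of the buffer it was copied from.
fun aggregate(rows: Sequence<ByteArray>): Pair<ByteArray, Int> {
    val output = rows.fold(InspectableOutputStream()) { out, el ->
        out.write(el) // grows the backing array as needed (roughly doubling)
        out
    }
    // toByteArray() allocates a NEW array of exactly size() bytes,
    // so while the copy is made both arrays are on the heap.
    return output.toByteArray() to output.capacity()
}
```

For a 229MB result this means a backing buffer of at least 229MB and a 229MB copy coexist while the copy is made. That accounts for a high transient peak per request, but on its own it does not explain memory being retained across requests.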

The resulting file downloads successfully and is about 229MB. However, after several sequential requests the memory was not released, and eventually I got an OutOfMemoryError.

A heap dump showed that the resulting byte arrays are still live objects and the GC can't collect them because their GC root is WindowsSelectorImpl.
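One way to take this kind of heap dump from inside the JVM is HotSpot's HotSpotDiagnosticMXBean. The sketch below assumes a HotSpot-based JDK (for example OpenJDK); dumpLiveHeap is a hypothetical helper name. Passing live = true dumps only reachable objects, so a byte array that still appears in such a dump really does have a GC root.

```kotlin
import com.sun.management.HotSpotDiagnosticMXBean
import java.lang.management.ManagementFactory
import java.nio.file.Files
import java.nio.file.Path

// Hypothetical helper: writes a heap dump of the current JVM to `target`.
// Assumes a HotSpot JVM, which provides HotSpotDiagnosticMXBean.
fun dumpLiveHeap(target: Path) {
    // HotSpot requires the .hprof suffix and refuses to overwrite an existing file.
    require(target.toString().endsWith(".hprof")) { "heap dump file must end with .hprof" }
    require(!Files.exists(target)) { "heap dump file already exists: $target" }
    val bean = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean::class.java)
    // live = true: only objects reachable from GC roots are written.
    bean.dumpHeap(target.toAbsolutePath().toString(), true)
}
```

The resulting .hprof file can be opened in a heap analyzer to inspect the path from each retained byte array to its GC root.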

Interestingly, if I modify my code like this:

@Bean
fun routing(): RouterFunction<ServerResponse> = router {
    accept(MediaType.ALL).nest {
        GET("/test") {
            operationRepository.findOperationsRangeForCarrier(carrierIds, start, end) // returns about 800_000 items
                .map { writer.writeValueAsBytes(it) }
                .reduce(ByteArrayOutputStream()) { output, el ->
                    output.write(el)
                    output
                }
                .map { output -> output.toByteArray() } // 229Mb
                .subscribe() // fire-and-forget: the result is not sent in the response
            ServerResponse.ok().bodyValue("OK")
        }
    }
}

I can see that the file is being created, but after creation finishes, the heap dump no longer contains the resulting byte array. I conclude that the main cause of the leak is Spring WebFlux and not Project Reactor. Maybe something must be configured differently.

Comment From: typik89

I found one more interesting effect.

If I add .doOnNext { println("report size ${it.size}") } to the second version of the code, I get the same memory leak:

GET("/test") {
    operationRepository.findOperationsRangeForCarrier(carrierIds, start, end) // returns about 800_000 items
        .map { writer.writeValueAsBytes(it) }
        .reduce(ByteArrayOutputStream()) { output, el ->
            output.write(el)
            output
        }
        .map { output -> output.toByteArray() }
        .doOnNext { println("report size ${it.size}") } // 229Mb
        .subscribe()
    ServerResponse.ok().bodyValue("OK")
}

Comment From: rstoyanchev

It's not immediately obvious how WindowsSelectorImpl relates to WebFlux or how this becomes a leak. The logic accumulates data in a byte[] and nothing in this chain would hold on to it. When you return a ByteArrayResource, it would be written via ResourceEncoder and turned into DataBuffers with DataBufferUtils#read but again it's not clear why this would result in a leak.

What is the underlying HTTP server for this? Any chance of a repro project?

Comment From: typik89

I created a simple project to reproduce the memory leak: https://github.com/typik89/webfluxMemoryRetroProject.

To run a database locally before starting the application, use https://github.com/typik89/webfluxMemoryRetroProject/blob/main/src/main/resources/docker-compose.yml. The local database will be created with a table containing 1 million rows.

After starting the application, send HTTP GET requests to http://localhost:8080/test to reproduce the issue. Each request should download a file of about 70MB. If you take a heap dump, you will see that all the byte arrays are live objects and that they accumulate in the heap until an OOM occurs.
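To send the sequential requests from code rather than by hand, here is a minimal sketch using the JDK 11+ java.net.http.HttpClient. downloadRepeatedly is a hypothetical helper name. Each response body is streamed into a null sink, so the client does not keep the downloaded 70MB bodies itself.

```kotlin
import java.io.OutputStream
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Hypothetical helper: sends `times` GET requests to `uri` one after another
// (never concurrently) and returns the number of body bytes each one delivered.
fun downloadRepeatedly(uri: URI, times: Int): List<Long> {
    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(uri).GET().build()
    return (1..times).map {
        val response = client.send(request, HttpResponse.BodyHandlers.ofInputStream())
        response.body().use { body ->
            check(response.statusCode() == 200) { "unexpected status ${response.statusCode()}" }
            // Discard the bytes but count them.
            body.transferTo(OutputStream.nullOutputStream())
        }
    }
}
```

For the sample project this would be downloadRepeatedly(URI("http://localhost:8080/test"), 10), followed by a heap dump of the server process.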

Comment From: rstoyanchev

Thanks for the sample. Docker Compose doesn't start; it complains about the image. For the sake of simplicity, can the issue be reproduced without a data layer? Maybe with a repository that simply returns fake data?

Comment From: typik89

Sorry, I've changed it to 'image: postgres:13-alpine'. I also tried a fake repository:

import org.springframework.stereotype.Service
import reactor.core.publisher.Flux

@Service
class FakeRepository {
    fun findAll(): Flux<CsvRow> =
        Flux.generate<Long, Long>(
            { 0L },
            { state, sink ->
                sink.next(state)
                state + 1
            }
        )
            .take(1_000_000)
            .map { CsvRow(it, "c1$it", "c2$it", "c3$it") }
}

With this fake repository, I don't see the memory leak.
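For a quick size estimate of the fake data without Reactor or Jackson, here is a stdlib-only Kotlin sketch. It is a stand-in under stated assumptions: the CsvRow field names are guessed, and toCsvLine is a hypothetical hand-rolled formatter that does not necessarily match what Jackson's CsvMapper writes (column order, quoting).

```kotlin
import java.io.ByteArrayOutputStream

// Mirrors the CsvRow used by FakeRepository (field names are assumed).
data class CsvRow(val id: Long, val c1: String, val c2: String, val c3: String)

// Stdlib stand-in for FakeRepository.findAll(): a lazy sequence instead of a Flux.
fun fakeRows(count: Int): Sequence<CsvRow> =
    generateSequence(0L) { it + 1 }
        .take(count)
        .map { CsvRow(it, "c1$it", "c2$it", "c3$it") }

// Hypothetical formatter standing in for Jackson's CSV writer.
// No quoting or escaping: the fake values never contain commas or quotes.
fun toCsvLine(row: CsvRow): ByteArray =
    "${row.id},${row.c1},${row.c2},${row.c3}\n".toByteArray(Charsets.UTF_8)

// Aggregates all rows into one array, like the reduce in the original pipeline.
fun fakeCsv(count: Int): ByteArray {
    val out = ByteArrayOutputStream()
    fakeRows(count).forEach { out.write(toCsvLine(it)) }
    return out.toByteArray()
}
```

In this format each row takes 4d + 10 bytes, where d is the number of digits in the id, so fakeCsv(1_000_000) comes to 33,555,560 bytes (about 34MB).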

Comment From: sdeleuze

@rstoyanchev I'm migrating the sample to Java so that you can validate it is not related to our Kotlin integration.

Comment From: typik89

I've tried it with Java and see the same behavior.

Comment From: typik89

It seems that it's not a WebFlux issue. I created a test that reproduces the behavior in which objects created while running a pipeline that builds a ByteArray aggregate are held in heap memory long after the pipeline has finished: https://github.com/typik89/webfluxMemoryRetroProject/blob/main/src/test/kotlin/ru/typik/reactor/MemoryLeakTest.kt When I run the test with -Xmx1000m, I see 6-7 log messages about a ByteArray being created, then an OutOfMemoryError, and the test fails. When I run the test with -Xmx2000m, it runs fine in an infinite loop. In a heap dump I see 9 ByteArrays of about 130MB each. It seems that Reactor holds 9 pipeline results in the heap while the other results are released successfully. I don't understand why this happens, what this magic number 9 is, or how I can configure it.
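As a rough consistency check, assume the heap held nothing but retained results of about 130MB each (ignoring the transient copy made by toByteArray and every other object). The two heap limits then bound how many results can be retained at once:

$$
\left\lfloor \frac{1000\,\text{MB}}{130\,\text{MB}} \right\rfloor = 7,
\qquad
9 \times 130\,\text{MB} = 1170\,\text{MB} < 2000\,\text{MB}
$$

So 6-7 results before the OOM at 1000MB, and 9 retained results fitting under 2000MB, are both compatible with these numbers; the arithmetic alone does not explain why exactly 9 results are retained.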

Comment From: rstoyanchev

It might make more sense for this issue to be in https://github.com/reactor/reactor-core/issues, but it cannot be transferred across organizations. I'm going to close this as it does not seem like anything we can fix in the Spring Framework, but further comments are welcome once you find out more about the root cause.

/cc @simonbasle