Affects: 5.2.4 (currently what we use); 5.3.0-SNAPSHOT
https://github.com/spring-projects/spring-framework/blob/c972d861edee89c0ca97ddfe455d803f3d046ab9/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java#L500
When testing code that uses explicit retain/release, it is useful to have unit tests that exercise the code and check that the buffers are released correctly.
With the check on isAllocated before releasing, your code can release as many times as it wants or forget to retain, and the tests still pass. In longer-running tests or in production, the error is only deferred until some later point, for example when the data is read.
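The difference between a defensive release and a fail-fast one can be modeled without Spring at all. The sketch below is a minimal, single-threaded illustration under stated assumptions: RefCountedBuffer, lenientRelease and strictRelease are hypothetical names, and lenientRelease only mirrors the idea of the check discussed here (release only when isAllocated() is true). It is not Spring's or Netty's code.

```java
// Hypothetical names throughout; this models the idea, not Spring's or Netty's classes.
class ReleaseDemo {
    // Models the defensive check: skip buffers that are already deallocated.
    static boolean lenientRelease(RefCountedBuffer buf) {
        return buf.isAllocated() && buf.release();
    }

    // Fail-fast alternative: an over-release throws at the faulty call site.
    static boolean strictRelease(RefCountedBuffer buf) {
        return buf.release();
    }

    public static void main(String[] args) {
        RefCountedBuffer a = new RefCountedBuffer(new byte[] {1, 2, 3});
        lenientRelease(a);
        lenientRelease(a); // bug: the extra release is silently ignored
        try {
            a.read(0); // the bug only shows up here, away from the faulty release
        } catch (IllegalStateException e) {
            System.out.println("lenient: " + e.getMessage());
        }

        RefCountedBuffer b = new RefCountedBuffer(new byte[] {1, 2, 3});
        strictRelease(b);
        try {
            strictRelease(b); // same bug, reported at the second release
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }
    }
}

// Hypothetical single-threaded stand-in for a pooled, reference-counted buffer.
class RefCountedBuffer {
    private int refCnt = 1; // every buffer starts with one reference
    private final byte[] data;

    RefCountedBuffer(byte[] data) { this.data = data; }

    boolean isAllocated() { return refCnt > 0; }

    // Returns true when this call dropped the count to zero (deallocated).
    boolean release() {
        if (refCnt <= 0) throw new IllegalStateException("release on deallocated buffer");
        return --refCnt == 0;
    }

    byte read(int index) {
        if (refCnt <= 0) throw new IllegalStateException("read from deallocated buffer");
        return data[index];
    }
}
```

Running main prints the lenient failure from read(0) and the strict failure from the second release: the same bug is reported late in one case and at the faulty call in the other.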
Flux<DataBuffer> transform(Flux<DataBuffer> inputs) { // do some work that joins or filters the input.
return inputs.filter(somePredicate)
.map(DataBufferUtils::retain) // retain just the bits that we need downstream
.doOnComplete(() -> inputs.subscribe(DataBufferUtils::release)); // causes every input to be released; the buffers that are not used should be deallocated.
}
void testReferenceCounts() {
Flux<DataBuffer> inputs = createSomeSuitableData(); // every DataBuffer has a reference count of 1 from its initial creation.
result = transform(inputs);
result.forEach(DataBufferUtils::release); // release the output. This should release & deallocate every input buffer that was not filtered
assertThat(inputs, everyItem(isUnallocated()));
assertThat(result, everyItem(isUnallocated()));
}
If you forget the retain in transform, you would expect a fail-fast(ish) exception in the test when releasing the result. With the check on isAllocated in DataBufferUtils, the test passes and the error surfaces later in an unrelated bit of code.
If you forget the doOnComplete, you have a memory leak, and the test correctly fails when asserting that every item is unallocated.
If you have too many release calls or release the wrong buffers, the test still passes and the error surfaces somewhere else.
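The "every item is unallocated" assertion needs access to the same buffer instances, which is awkward when the input is a Flux that may produce fresh buffers on each subscription. One option is to have the test allocate every buffer through a factory that remembers what it handed out. This is a pure-JDK sketch; TrackingFactory and Tracked are hypothetical helpers, not Spring classes. Because Tracked.release() throws on an over-release, the same helper also catches the "too many release calls" case at the faulty call.

```java
import java.util.ArrayList;
import java.util.List;

class LeakCheckDemo {
    public static void main(String[] args) {
        TrackingFactory factory = new TrackingFactory();
        Tracked kept = factory.allocate();
        Tracked dropped = factory.allocate();
        kept.release();
        // "forgot" to release `dropped`: the leak check reports it
        System.out.println("leaks: " + factory.leakCount()); // leaks: 1
        dropped.release();
        System.out.println("leaks: " + factory.leakCount()); // leaks: 0
    }
}

// Hypothetical test helper: remembers every buffer it hands out.
class TrackingFactory {
    private final List<Tracked> allocated = new ArrayList<>();

    Tracked allocate() {
        Tracked t = new Tracked();
        allocated.add(t);
        return t;
    }

    // Buffers that still hold at least one reference, i.e. leaked so far.
    long leakCount() {
        return allocated.stream().filter(Tracked::isAllocated).count();
    }
}

// Hypothetical single-threaded reference-counted buffer with a strict release.
class Tracked {
    private int refCnt = 1;

    boolean isAllocated() { return refCnt > 0; }

    void release() {
        if (refCnt <= 0) throw new IllegalStateException("over-release");
        refCnt--;
    }
}
```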
I currently think that the check on isAllocated is overly defensive and makes errors much harder to diagnose.
Comment From: rstoyanchev
transform returns Flux<DataBuffer>, but Flux doesn't have forEach, so how can this be?
result = transform(inputs);
result.forEach(DataBufferUtils::release)
Perhaps that is meant to show consuming the Flux again? If so, this seems to consume the buffers once, retaining some of them and filtering out others, then consume the Flux again via doOnComplete at the end to release every buffer, and then consume it yet again via result.forEach?
I don't think code managing buffers should be written like this, with multiple passes over the Flux, which also assumes the Flux produces the same buffers every time, which is not typically the case. Buffers should be released immediately at the point where they are consumed or dropped (e.g. filtered). Allowing them to linger leaves you exposed to cancellation and error signals and invites more issues.
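The "release at the point of being dropped" advice can be sketched in plain Java, with a list standing in for the stream of buffers. Buf and filterReleasingDropped are hypothetical names; the point is that each buffer is either handed downstream (ownership transfers, so no extra retain is needed) or released right where it is filtered out, in a single pass. In Reactor itself, I believe the usual way to get this effect is to register doOnDiscard(DataBuffer.class, DataBufferUtils::release) downstream of filter, since filter passes dropped elements to the discard hook; that part is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FilterAndRelease {
    // Keeps buffers matching `keep` and releases every dropped buffer immediately,
    // so each input's ownership is settled in a single pass.
    static List<Buf> filterReleasingDropped(List<Buf> inputs, Predicate<Buf> keep) {
        List<Buf> kept = new ArrayList<>();
        for (Buf b : inputs) {
            if (keep.test(b)) {
                kept.add(b);   // ownership moves downstream, no retain needed
            } else {
                b.release();   // dropped here, so released here
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Buf> inputs = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            inputs.add(new Buf(i));
        }
        List<Buf> kept = filterReleasingDropped(inputs, b -> b.value % 2 == 0);
        kept.forEach(Buf::release); // the consumer releases what it received
        System.out.println(inputs.stream().allMatch(b -> b.refCnt == 0)); // true
    }
}

// Hypothetical single-threaded reference-counted buffer.
class Buf {
    int refCnt = 1;
    final int value;

    Buf(int value) { this.value = value; }

    void release() {
        if (refCnt <= 0) throw new IllegalStateException("over-release");
        refCnt--;
    }
}
```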
Another point is that allowing the release to fail easily leads to more leaks. If the exception is allowed to propagate, code in higher-level parts of the stack will likely fail to release other buffers or clear caches unless it is written carefully to account for unexpected exceptions. This is why for 5.3 and 5.2.8 we have taken further steps to prevent IllegalReferenceCountException (which can still occur) from bubbling up; see #22594.
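The concern about a failing release skipping other cleanup can be illustrated with a loop that releases a batch of resources. This is a hypothetical sketch (Res and releaseAll are made-up names) and not the change made for #22594: it catches the failure for each resource, keeps going, and hands the failures back so they can be logged rather than aborting the remaining releases.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReleaseAll {
    // Attempts to release every resource even if some releases throw, and returns
    // the failures so the caller can log them instead of skipping the remaining cleanup.
    static List<IllegalStateException> releaseAll(List<Res> resources) {
        List<IllegalStateException> failures = new ArrayList<>();
        for (Res r : resources) {
            try {
                r.release();
            } catch (IllegalStateException e) {
                failures.add(e); // record and keep going
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Res a = new Res();
        Res b = new Res();
        Res c = new Res();
        b.release(); // simulate a buffer that other code already released
        List<IllegalStateException> failures = releaseAll(Arrays.asList(a, b, c));
        System.out.println(failures.size() + " " + a.isReleased() + " " + c.isReleased()); // 1 true true
    }
}

// Hypothetical single-threaded reference-counted resource.
class Res {
    private int refCnt = 1;

    void release() {
        if (refCnt <= 0) throw new IllegalStateException("already released");
        refCnt--;
    }

    boolean isReleased() { return refCnt == 0; }
}
```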
Comment From: CandleCandle
I was trying to keep the example relatively short and simple, and probably introduced some confusion. Fixing the obvious flaws in the previous example gives us:
Flux<DataBuffer> transform(Flux<DataBuffer> inputData) { // do some work that joins or filters the input.
Flux<DataBuffer> inputs = inputData.cache(); // cache so that the doFinally has the same sequence of DataBuffers
return inputs.filter(somePredicate)
.map(DataBufferUtils::retain) // retain just the bits that we need downstream
.doFinally(s -> inputs.subscribe(DataBufferUtils::release)); // causes every input to be released; the buffers that are not used should be deallocated.
}
void testReferenceCounts() {
Flux<DataBuffer> inputs = createSomeSuitableData(); // every DataBuffer has a reference count of 1 from its initial creation.
Flux<DataBuffer> transformed = transform(inputs);
List<DataBuffer> result = transformed.collectList().block();
result.forEach(DataBufferUtils::release); // release the output. This should release & deallocate every input buffer that was not filtered
assertThat(inputs, everyItem(isUnallocated()));
assertThat(result, everyItem(isUnallocated()));
}
Changes to the example:
I don't think code managing buffers should be written like this with multiple passes over the Flux which also assumes the Flux produces the same buffers every time which is not typically the case
Added a .cache() so that the doFinally (doOnComplete in the original example) sees references to the same buffers. Our code does need to make multiple passes over the same data.
but Flux doesn't have forEach
Made the test use collectList().block() to convert the output to a List for the assertions.
With a fair bit more experimentation and thinking, I now think that the current defensive implementation is correct for production code. For testing, where you want to make sure your code is correct before it reaches production, it is the wrong choice. And since release and retain are static methods, that behavior is much harder to replace.
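One way around the "static methods are hard to replace" problem is to have the code receive its release behavior as a dependency instead of calling a static utility directly. In this sketch ReleasePolicy, Transformer and PBuf are hypothetical names, not Spring APIs: production wiring uses a lenient policy that mirrors the isAllocated() check, while tests use a strict one that lets an over-release throw at the faulty call.

```java
class PolicyDemo {
    public static void main(String[] args) {
        // Production wiring: the double release goes unnoticed.
        new Transformer(ReleasePolicy.LENIENT).consume(new PBuf());
        // Test wiring: the same bug fails at the second release.
        try {
            new Transformer(ReleasePolicy.STRICT).consume(new PBuf());
        } catch (IllegalStateException e) {
            System.out.println("caught in test: " + e.getMessage());
        }
    }
}

// Hypothetical release strategy; not a Spring API.
@FunctionalInterface
interface ReleasePolicy {
    void release(PBuf buf);

    // Mirrors the defensive check: ignore buffers that are already deallocated.
    ReleasePolicy LENIENT = buf -> {
        if (buf.isAllocated()) {
            buf.release();
        }
    };

    // Fail fast: let an over-release throw where it happens.
    ReleasePolicy STRICT = PBuf::release;
}

// Code under test receives the policy instead of calling a static helper.
class Transformer {
    private final ReleasePolicy releasePolicy;

    Transformer(ReleasePolicy releasePolicy) { this.releasePolicy = releasePolicy; }

    // Deliberately buggy: releases the same buffer twice.
    void consume(PBuf buf) {
        releasePolicy.release(buf);
        releasePolicy.release(buf);
    }
}

// Hypothetical single-threaded reference-counted buffer.
class PBuf {
    private int refCnt = 1;

    boolean isAllocated() { return refCnt > 0; }

    void release() {
        if (refCnt <= 0) throw new IllegalStateException("refCnt already 0");
        refCnt--;
    }
}
```

The cost is an extra constructor parameter threaded through the code that manages buffers; the benefit is that the same code can run under the strict policy in tests without touching the static DataBufferUtils methods.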