Spring Boot + Spring WebFlux 2.7.4, Spring R2DBC 5.3.23, r2dbc-pool 0.9.2.RELEASE, JDK 17
r2dbc pool settings:
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.initial-size=1
spring.r2dbc.pool.max-size=10
spring.r2dbc.pool.maxAcquireTime=PT3S
I have a simple controller invoking a bean method annotated with @Transactional.
Scenario: the transaction-managed part performs long-running logic (e.g. an external HTTP call).
During a stress test I ran into problems obtaining a connection from the pool. This always happens for @Transactional-annotated methods, even if they do not run any changes or queries against the database. The only workaround for now is to remove the maxAcquireTime setting and let requests wait to acquire a connection, and/or to increase the connection pool size.
I know that this limitation exists with JDBC and Hikari (a similar scenario would end up with "Connection is not available"), but I was expecting it to be a little bit smarter with r2dbc. So I assume one transaction occupies a connection as long as the transaction is active, and it can't be shared.
I created a sample project to reproduce the issue: https://github.com/pmaslany/spring-r2dbc
org.springframework.transaction.CannotCreateTransactionException: Could not open R2DBC Connection for transaction; nested exception is io.r2dbc.spi.R2dbcTimeoutException: Connection acquisition timed out after 3000ms
at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$null$5(R2dbcTransactionManager.java:227) ~[spring-r2dbc-5.3.23.jar:5.3.23]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler acquiry.AcquiryApplication$SomeController#transactional() [DispatcherHandler]
*__checkpoint ⇢ HTTP GET "/transactional" [ExceptionHandlingWebHandler]
Original Stack Trace:
at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$null$5(R2dbcTransactionManager.java:227) ~[spring-r2dbc-5.3.23.jar:5.3.23]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:95) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.Mono.subscribe(Mono.java:4455) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.23.jar:3.4.23]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: io.r2dbc.spi.R2dbcTimeoutException: Connection acquisition timed out after 3000ms
at io.r2dbc.pool.ConnectionPool.lambda$null$11(ConnectionPool.java:168) ~[r2dbc-pool-0.9.2.RELEASE.jar:0.9.2.RELEASE]
at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3762) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.Mono.lambda$onErrorResume$32(Mono.java:3852) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.23.jar:3.4.23]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.23.jar:3.4.23]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'Connection acquisition from [org.springframework.boot.r2dbc.OptionsCapableConnectionFactory@6ffa56fa]' (and no fallback has been configured)
... 13 common frames omitted
Part of the code (linked above):
@Service
public static class SomeService {

    @Autowired
    private SlowServiceClient slowServiceClient;

    @Autowired
    private SomeRepository repository;

    @Transactional
    public Mono<Long> fetchAndStoreTransactional() {
        return process();
    }

    public Mono<Long> fetchAndStoreNonTransactional() {
        return process();
    }

    private Mono<Long> process() {
        return slowServiceClient.fetchData()
                .flatMap(data -> repository.save(new TestEntity(data)))
                .flatMap(entity -> repository.count());
    }
}

@Component
public static class SlowServiceClient {

    public Mono<String> fetchData() {
        return Mono.delay(Duration.ofSeconds(3)).then(Mono.just("res"));
    }
}
Comment From: mp911de
I'm not quite sure what you're asking for with "expecting it's a little bit smarter with r2dbc. So I assume one transaction occupy connection as long as transaction is active and it can't be shared."
Connections are acquired upon transaction start and released once the transaction finishes. If the acquisition backlog grows to the point where a connection cannot be obtained within 3 seconds, you start seeing exceptions that terminate the request in a timely manner.
Removing maxAcquireTime lets the application wait indefinitely and I'm not sure that is what you want in production.
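The backlog effect described above can be sketched with a toy model in plain Java. This is not how r2dbc-pool is implemented; the class and method names, the burst of 30 simultaneous requests, and the 50 ms of database work are assumptions chosen for illustration. The pool size of 10 and the 3-second acquisition limit are taken from the settings in the report.

```java
import java.util.PriorityQueue;

// Toy model (assumption, not r2dbc-pool code): all callers arrive at t = 0 and
// each caller that obtains a connection keeps it for holdMillis.
class AcquireBacklogSketch {

    /**
     * Counts how many of {@code requests} simultaneous callers cannot obtain a
     * connection within {@code maxAcquireMillis}. Requires poolSize > 0.
     */
    static int countTimeouts(int requests, int poolSize, long holdMillis, long maxAcquireMillis) {
        // Each entry is the time at which one pooled connection becomes free again.
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int i = 0; i < poolSize; i++) {
            freeAt.add(0L);
        }
        int timeouts = 0;
        for (int i = 0; i < requests; i++) {
            long wait = freeAt.peek(); // earliest moment any connection is free
            if (wait > maxAcquireMillis) {
                timeouts++;            // caller gives up; no connection is taken
            } else {
                freeAt.poll();
                freeAt.add(wait + holdMillis); // connection is busy until then
            }
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // Connection held for the 3 s slow call plus an assumed 50 ms of DB work.
        System.out.println(countTimeouts(30, 10, 3050, 3000)); // prints 20
        // Connection held only for the assumed 50 ms of DB work.
        System.out.println(countTimeouts(30, 10, 50, 3000));   // prints 0
    }
}
```

In this model, holding a connection across the 3-second call lets only the first 10 callers through, while holding it just for the database work lets all 30 through. That roughly mirrors the difference between fetchAndStoreTransactional, where the connection is held for the whole transaction, and fetchAndStoreNonTransactional, where a connection is needed only while the repository calls run.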
Comment From: snicoll
I am with Mark here and I don't see how this issue is actionable. Closing as working as designed.