Right now RSocketRequester.Builder uses the connect method as a terminal operation which builds a Mono<RSocketRequester>. This method wraps doConnect in an extra Mono.defer, which seems to be a legacy solution for deferring the allocation of some heavy internals.
As of now, this wrapping seems to be redundant, and it prevents the use of one of the key features of vanilla RSocket, reconnect, which allows the same Mono<RSocket> instance to be reused in order to access the same cached connection.
Unfortunately, because of the Mono.defer(() -> doConnect(...)), the underlying Mono is not directly propagated to the user, hence every subsequent subscription creates a new connection instead of using the cached one (assuming the reconnect feature is enabled).
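To make the effect concrete, here is a minimal Reactor-free sketch of the behaviour described above. It is only a model, not the actual Spring or RSocket code: CachedSource stands in for the reconnecting Mono<RSocket>, the defer helper stands in for Mono.defer, and every name in it is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java model of the issue; all names are hypothetical, nothing here is Spring or RSocket API. */
final class DeferredConnectModel {

    /** Counts how many "connections" have been opened so far. */
    static final AtomicInteger OPENED = new AtomicInteger();

    /** Stands in for the reconnecting Mono<RSocket>: opens a connection once, then reuses it. */
    static final class CachedSource implements Supplier<String> {
        private String connection; // null until the first "subscription"

        @Override
        public synchronized String get() {
            if (connection == null) {
                connection = "connection-" + OPENED.incrementAndGet();
            }
            return connection;
        }
    }

    /** Stands in for Mono.defer: asks the factory for a fresh source on every "subscription". */
    static Supplier<String> defer(Supplier<Supplier<String>> factory) {
        return () -> factory.get().get();
    }

    /** "Subscribes" twice to the given source and returns how many connections that opened. */
    static int connectionsOpenedByTwoSubscriptions(Supplier<String> source) {
        int before = OPENED.get();
        source.get();
        source.get();
        return OPENED.get() - before;
    }

    public static void main(String[] args) {
        // The cached source is handed to the caller directly: the second subscription reuses the connection.
        int direct = connectionsOpenedByTwoSubscriptions(new CachedSource());
        // The cached source is hidden behind defer: each subscription builds a new source and a new connection.
        int deferred = connectionsOpenedByTwoSubscriptions(defer(CachedSource::new));
        System.out.println("direct: " + direct + ", deferred: " + deferred); // prints "direct: 1, deferred: 2"
    }
}
```

In this model the cache lives inside the source instance, so it only helps if the caller keeps subscribing to that same instance, which is exactly what the extra defer layer prevents.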
Expected
rsocketRequesterBuilder
    .rsocketConnector(rsocketConnector ->
        rsocketConnector
            .lease(() -> Leases.create().receiver(leaseReceiver))
            .reconnect(
                Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
                    .maxBackoff(Duration.ofSeconds(5))
            )
    )
    .connectWebSocket(URI.create(adjustmentProperties.getBaseUrl()));
to behave approximately identically to
RSocketConnector
    .create()
    .lease(() -> Leases.create().receiver(leaseReceiver))
    .dataMimeType(dataMimeType.toString())
    .metadataMimeType(metadataMimeType.toString())
    .reconnect(
        Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
            .maxBackoff(Duration.ofSeconds(5))
    )
    .connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))
    .map(rsocket -> RSocketRequester.wrap(
        rsocket,
        dataMimeType,
        metadataMimeType,
        rSocketStrategies
    ));
Actual
The connect method wraps
RSocketConnector
    .create()
    .lease(() -> Leases.create().receiver(leaseReceiver))
    .dataMimeType(dataMimeType.toString())
    .metadataMimeType(metadataMimeType.toString())
    .reconnect(
        Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
            .maxBackoff(Duration.ofSeconds(5))
    )
    .connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))
into
Mono.defer(() -> RSocketConnector
    .create()
    .lease(() -> Leases.create().receiver(leaseReceiver))
    .dataMimeType(dataMimeType.toString())
    .metadataMimeType(metadataMimeType.toString())
    .reconnect(
        Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
            .maxBackoff(Duration.ofSeconds(5))
    )
    .connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket"))))
so every subscription creates a new connection and reconnect does not work as expected.