The attached integration test leads to the following problem when run against the attached Demo App.

java.lang.UnsupportedOperationException: null
    at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1041) ~[na:na]
    at com.example.demo.DemoApplication.lambda$handle$0(DemoApplication.java:36) ~[classes/:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.1.RELEASE.jar:
...

The Demo App is a very basic echo server written in Spring Webflux. Every binary WebSocket message sent to it will be echoed. The interesting part looks like this:

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
                session.receive()
                        .map(WebSocketMessage::getPayload)
                        .map(dataBuffer -> DataBufferUtils.retain(dataBuffer).asByteBuffer().array())
                        .map(bytes -> {
                            byte[] newArray = new byte[bytes.length + 1];
                            newArray[0] = 1;
                            System.arraycopy(bytes, 0, newArray, 1, bytes.length);
                            return newArray;
                        })
                        .map(bytes -> session.binaryMessage(dataBufferFactory -> dataBufferFactory.wrap(bytes)))
                        .doOnError(throwable -> LOGGER.error("WebSocket error", throwable))
        );
    }

The integration test connects to the server and tries to send a message like this:

    @Test
    void sendATextMessage() {
        client.execute(
                URI.create("ws://localhost:8080/demo"),
                session -> session.send(Mono.just(session.binaryMessage(dataBufferFactory -> dataBufferFactory.wrap(new byte[]{1, 2, 3}))))
                        .thenMany(session.receive().doOnNext(webSocketMessage -> {
                            LOGGER.info("Received: " + webSocketMessage.getPayload().asByteBuffer().array());
                        }))
                        .then()
        ).block(Duration.ofSeconds(10L));
    }

When I change the program just slightly to support text messages instead of binary messages, it runs perfectly.

Please take a look! I am sure you guys are much more deeply involved in Java NIO than I am.

Tested platforms:

  • OpenJDK 11.0.4 and Oracle JDK 1.8.0_212.
  • spring-boot-starter-webflux:2.2.2.RELEASE

demo.zip

Comment From: DrStefanFriedrich

I wrote a client in Dart which leads exactly to the same result. So I assume the problem is on the server side. Maybe a problem in one of the implementations of interface DataBuffer in Spring or in Reactor, or in Netty?

Comment From: rstoyanchev

Thanks for getting in touch, but it feels like this is a question that would be better suited to Stack Overflow. As mentioned in the guidelines for contributing, we prefer to use the issue tracker only for bugs and enhancements. Feel free to update this issue with a link to the re-posted question (so that other people can find it) or add some more details if you feel this is a genuine bug.

Comment From: rstoyanchev

As an additional comment, please do a little more investigation before assuming where the problem is. The Javadoc of ByteBuffer#array says:

     * <p> Invoke the {@link #hasArray hasArray} method before invoking this
     * method in order to ensure that this buffer has an accessible backing
     * array.  </p>

Here you have a direct memory buffer which cannot give you an array. You can only copy to an array. See for example the source for ByteArrayDecoder.

In addition, you should not retain the DataBuffer if copying its contents immediately. Retaining and never releasing creates a leak on Netty where direct memory is used with pooled buffers. Please review the docs for more background on data buffers.