Canceling a Subscription (in reactive transactions) leads to a state where transaction cleanup happens asynchronously and detached from completion signals. Consider the following code:

TransactionalOperator transactionalOperator = …;
DatabaseClient databaseClient = …;
JdbcTemplate jdbcTemplate = …;

Flux<Integer> integerFlux = transactionalOperator.execute(status -> {
    return databaseClient
        .execute("INSERT INTO legoset (id) VALUES(42055)").fetch().rowsUpdated();
});

Mono<Integer> next = integerFlux.next();

next.as(StepVerifier::create).expectNext(1).verifyComplete();

assertThat(jdbcTemplate.queryForMap("SELECT id, name, manual FROM legoset")).containsEntry("id", 42055);

Initially, the table is empty, and both DatabaseClient and JdbcTemplate are configured to point to the same database.

The assertion with queryForMap typically fails with EmptyResultDataAccessException. This is because integerFlux.next() cancels the upstream subscription and emits completion as soon as the first element has been emitted.

When the subscription is canceled, TransactionalOperator and its backing ReactiveTransactionManager implementations issue a commit to clean up the transaction. That commit happens asynchronously, and there is no way to await its completion.
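
To make the ordering easier to reason about, here is a minimal JDK-only sketch of the race. It is a model, not Spring or Reactor code: `CancelRaceSketch`, `TransactionalSource`, `commitGate` and the local `next` helper are hypothetical names, and the latch only exists to hold the simulated commit open so the outcome is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;

/** JDK-only model of the race; none of these types are Spring or Reactor APIs. */
public class CancelRaceSketch {

    /** Hypothetical transactional publisher: emits one row count, commits asynchronously on cancel. */
    static final class TransactionalSource implements Flow.Publisher<Integer> {
        final CountDownLatch commitGate = new CountDownLatch(1); // held closed to model a slow commit
        final CompletableFuture<Void> commitDone = new CompletableFuture<>();

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    subscriber.onNext(1); // the INSERT reported one updated row
                }

                @Override
                public void cancel() {
                    // Cleanup is fire-and-forget: the subscriber gets no handle to await it.
                    CompletableFuture.runAsync(() -> {
                        try {
                            commitGate.await();
                            commitDone.complete(null);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
            });
        }
    }

    /** Mimics the behaviour described for next(): cancel upstream on the first element, then complete. */
    public static CompletableFuture<Integer> next(Flow.Publisher<Integer> source) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                subscription.cancel();   // triggers the asynchronous commit
                result.complete(item);   // completion is signalled without waiting for it
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(null);
            }
        });
        return result;
    }

    /** Returns true when the result was observable before the commit had finished. */
    public static boolean resultVisibleBeforeCommit() {
        TransactionalSource source = new TransactionalSource();
        CompletableFuture<Integer> result = next(source);
        boolean raced = result.isDone() && !source.commitDone.isDone();
        source.commitGate.countDown(); // let the simulated commit finish
        source.commitDone.join();
        return raced;
    }

    public static void main(String[] args) {
        // prints "result visible before commit: true"
        System.out.println("result visible before commit: " + resultVisibleBeforeCommit());
    }
}
```

In the real setup there is no latch, so whether a follow-up query sees the row depends on timing.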

I am not sure whether we can fix the problem at all or only mitigate it. One approach could be for TransactionalOperator.transactional(Mono) to cancel the innermost Publisher and hand out a Mono. This change would return a properly bounded Mono: cancellation would happen on the innermost Publisher, which would prevent cancellation of the Publisher that is returned from TransactionalOperator.
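
As one possible reading of that idea, the following JDK-only sketch models a bounded result whose completion signal is chained behind the commit. It is an illustration of the ordering only, not a proposed implementation: `BoundedResultSketch`, `FakeTransaction`, `commitGate` and the local `transactional` method are hypothetical, and `CompletableFuture` stands in for `Mono`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

/** JDK-only model; CompletableFuture stands in for Mono, nothing here is a Spring API. */
public class BoundedResultSketch {

    /** Hypothetical stand-in for a transaction whose commit is asynchronous. */
    static final class FakeTransaction {
        final CountDownLatch commitGate = new CountDownLatch(1); // held closed to model a slow commit
        final CompletableFuture<Void> commitDone = new CompletableFuture<>();

        CompletableFuture<Void> commit() {
            CompletableFuture.runAsync(() -> {
                try {
                    commitGate.await();
                    commitDone.complete(null);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    commitDone.completeExceptionally(e);
                }
            });
            return commitDone;
        }
    }

    /**
     * Models a bounded, Mono-like result: the single value is taken inside the
     * transaction boundary, and completion is signalled only after the commit.
     */
    public static CompletableFuture<Integer> transactional(FakeTransaction tx, CompletableFuture<Integer> inner) {
        return inner.thenCompose(value -> tx.commit().thenApply(ignored -> value));
    }

    /** Returns true when the result stayed pending until the commit had finished. */
    public static boolean completionWaitsForCommit() {
        FakeTransaction tx = new FakeTransaction();
        CompletableFuture<Integer> result = transactional(tx, CompletableFuture.completedFuture(1));
        boolean pendingWhileCommitting = !result.isDone();
        tx.commitGate.countDown();     // let the simulated commit finish
        Integer rows = result.join();  // returns only after commitDone has completed
        return pendingWhileCommitting && rows == 1 && tx.commitDone.isDone();
    }

    public static void main(String[] args) {
        // prints "completion waits for commit: true"
        System.out.println("completion waits for commit: " + completionWaitsForCommit());
    }
}
```

With this ordering, a caller that reacts to completion can only run after the commit has finished, which is the property the JdbcTemplate assertion relies on.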

This ticket is an opportunity to discuss that effect and its potential impact on cancellation of Publishers which are enhanced for transactions.

/cc @smaldini @simonbasle

Comment From: michael-simons

Thanks Mark for raising this ticket.

We have the exact same problem in SDN-RX: https://github.com/neo4j/sdn-rx/blob/19a0bd407da0aa632be921d1cb529532c9649ae0/spring-data-neo4j-rx/src/test/java/org/neo4j/springframework/data/integration/reactive/ReactiveRepositoryIT.java#L408-L434

We solved this by wrapping the implicit transactions on the repositories in another transactional operator (https://github.com/neo4j/sdn-rx/commit/44f593780ed1f745aac1c829a8c0343d0dde75c5) but apparently, this fix works just by coincidence (changing the timing of things).

The implementation of our reactive transaction manager (see here https://github.com/neo4j/sdn-rx/blob/master/spring-data-neo4j-rx/src/main/java/org/neo4j/springframework/data/core/transaction/ReactiveNeo4jTransactionManager.java#L185-L200) uses the new Neo4j driver. The driver provides publishers for both commit and rollback, which eventually issue asynchronous calls to the database (for reference https://github.com/neo4j/neo4j-java-driver/blob/2.0/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java#L67-L91 and https://github.com/neo4j/neo4j-java-driver/blob/2.0/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java#L220-L228).

We observed this on the wire as well:

Our step verifiers finished, and our separate verification of the data (much like Mark's call with the JDBC template) started and began a new transaction. Only then, depending on the load of the machine, was the acknowledgement of the previous transaction received on the client.