Canceling a Subscription (in reactive transactions) leads to a state where transaction cleanup happens asynchronously, detached from completion signals.
Consider the following code:
TransactionalOperator transactionalOperator = …;
DatabaseClient databaseClient = …;
JdbcTemplate jdbcTemplate = …;

// Insert a single row inside a reactive transaction.
Flux<Integer> integerFlux = transactionalOperator.execute(status -> {
    return databaseClient
            .execute("INSERT INTO legoset (id) VALUES(42055)").fetch().rowsUpdated();
});

// next() cancels the upstream subscription once the first element has arrived.
Mono<Integer> next = integerFlux.next();
next.as(StepVerifier::create).expectNext(1).verifyComplete();

// Verify through the blocking JdbcTemplate, which uses its own connection.
assertThat(jdbcTemplate.queryForMap("SELECT id, name, manual FROM legoset")).containsEntry("id", 42055);
Initially, the table is empty, and both DatabaseClient and JdbcTemplate are configured to point to the same database.
The assertion with queryForMap typically fails with EmptyResultDataAccessException. This is because calling integerFlux.next() cancels the upstream subscription and emits completion as soon as an element has been emitted.
TransactionalOperator and its backing ReactiveTransactionManager implementations issue a commit to clean up the transaction. That commit happens asynchronously, and there is no way to await its completion.
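To make the ordering problem concrete, here is a minimal plain-JDK model of it. It is not Spring or Reactor code: DetachedCommitModel, Result, committedRows and the CountDownLatch that stands in for database latency are all hypothetical names introduced for illustration. The sketch only shows that a completion signal can be observed while the commit is still in flight, assuming the commit is started on another thread and nothing awaits it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** Plain-JDK model with hypothetical names; it does not use Spring or Reactor. */
class DetachedCommitModel {

    /** Stands in for the table as a second connection sees it: committed rows only. */
    static final Map<Integer, String> committedRows = new ConcurrentHashMap<>();

    /** The two signals involved: what the subscriber sees, and the cleanup nobody awaits. */
    static final class Result {
        final CompletableFuture<Integer> completion;
        final CompletableFuture<Void> commit;

        Result(CompletableFuture<Integer> completion, CompletableFuture<Void> commit) {
            this.completion = completion;
            this.commit = commit;
        }
    }

    static Result insertAndCancelAfterFirstElement(int id, CountDownLatch commitGate) {
        // The element (rows updated = 1) is delivered and completion is signalled right away,
        // mirroring a downstream that completes after its first element.
        CompletableFuture<Integer> completion = CompletableFuture.completedFuture(1);

        // The commit runs on another thread and is not linked to the completion signal.
        CompletableFuture<Void> commit = CompletableFuture.runAsync(() -> {
            try {
                commitGate.await(); // models the round trip to the database
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            committedRows.put(id, "committed");
        });
        return new Result(completion, commit);
    }

    public static void main(String[] args) {
        CountDownLatch commitGate = new CountDownLatch(1);
        Result result = insertAndCancelAfterFirstElement(42055, commitGate);

        // Completion is already visible, but the row is not committed yet.
        System.out.println("completed: " + result.completion.isDone());
        System.out.println("row visible: " + committedRows.containsKey(42055));

        // Only after the commit has actually finished does the row become visible.
        commitGate.countDown();
        result.commit.join();
        System.out.println("row visible after commit: " + committedRows.containsKey(42055));
    }
}
```

In the model, a verification that runs between the completion signal and the release of the latch sees an empty table, which corresponds to the EmptyResultDataAccessException described above.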
I'm not sure whether we can fix the problem at all or whether we can only mitigate it. One approach could be for TransactionalOperator.transactional(Mono) to cancel the innermost Publisher and hand out a Mono. This change would return a properly bounded Mono: cancellation would happen on the innermost Publisher, which prevents cancellation of the Publisher that is returned from TransactionalOperator.
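The intended effect of such a bounded result can be sketched with plain JDK types. This models only the ordering guarantee (completion is signalled after the commit has finished), not the cancellation mechanics inside TransactionalOperator. BoundedCommitModel, insertBounded, committedRows and the latch that simulates database latency are hypothetical names, and CompletableFuture merely stands in for a Mono.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** Plain-JDK model with hypothetical names; it does not use Spring or Reactor. */
class BoundedCommitModel {

    /** Stands in for the table as a second connection sees it: committed rows only. */
    static final Map<Integer, String> committedRows = new ConcurrentHashMap<>();

    static CompletableFuture<Integer> insertBounded(int id, CountDownLatch commitGate) {
        final int rowsUpdated = 1; // the element produced inside the transaction

        CompletableFuture<Void> commit = CompletableFuture.runAsync(() -> {
            try {
                commitGate.await(); // models the round trip to the database
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            committedRows.put(id, "committed");
        });

        // The result is derived from the commit, so it cannot complete before the commit does.
        return commit.thenApply(ignored -> rowsUpdated);
    }

    public static void main(String[] args) {
        CountDownLatch commitGate = new CountDownLatch(1);
        CompletableFuture<Integer> result = insertBounded(42055, commitGate);

        // While the commit is pending, no completion is observable.
        System.out.println("completed before commit: " + result.isDone());

        commitGate.countDown();
        System.out.println("rows updated: " + result.join());
        System.out.println("row visible: " + committedRows.containsKey(42055));
    }
}
```

With this ordering, a verification that starts after the result has completed always sees the committed row, at the cost of delaying the element until the commit round trip has finished.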
This ticket is an opportunity to discuss that effect and its potential impact on cancellation of Publishers which are enhanced for transactions.
/cc @smaldini @simonbasle
Comment From: michael-simons
Thanks Mark for raising this ticket.
We have the exact same problem in SDN-RX: https://github.com/neo4j/sdn-rx/blob/19a0bd407da0aa632be921d1cb529532c9649ae0/spring-data-neo4j-rx/src/test/java/org/neo4j/springframework/data/integration/reactive/ReactiveRepositoryIT.java#L408-L434
We solved this by wrapping the implicit transactions on the repositories in another transactional operator (https://github.com/neo4j/sdn-rx/commit/44f593780ed1f745aac1c829a8c0343d0dde75c5) but apparently, this fix works just by coincidence (changing the timing of things).
The implementation of our reactive transaction manager (see here https://github.com/neo4j/sdn-rx/blob/master/spring-data-neo4j-rx/src/main/java/org/neo4j/springframework/data/core/transaction/ReactiveNeo4jTransactionManager.java#L185-L200) uses the new Neo4j driver. The driver provides publishers for both commit and rollback, which eventually issue asynchronous calls to the database (for reference https://github.com/neo4j/neo4j-java-driver/blob/2.0/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java#L67-L91 and https://github.com/neo4j/neo4j-java-driver/blob/2.0/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java#L220-L228).
We observed this on the wire as well:
Our step verifiers finished, and our separate verification of the data (much like Mark's call with the JDBC template) started and began a new transaction. Depending on the load of the machine, the acknowledgement of the previous transaction was received on the client only after that.