Spring Boot 2.3.0.M4 Kotlin 1.3.72 postgres 9.5.21 / jdbc driver 42.2.9 / r2dbc 0.8.2
When I use several transactions sequently, I ran into visibility problems. Data inserted in previous transactions may be absent in the table.
@Test
fun invisibleData() = runBlocking {
databaseClient.execute("""create table test (id int);""".trimIndent()).await()
val id = transactionalOperator.executeAndAwait {
databaseClient.insert()
.into("test")
.value("id", 1)
.map { row -> row["id"]}
.awaitOne() as Int
}
logger.info("Inserted! executeAndAwait $id")
databaseClient.insert()
.into("test")
.value("id", 2)
.then()
.`as`(transactionalOperator::transactional)
.awaitFirstOrNull()
logger.info("Inserted! transactional")
transactionalOperator.execute {
databaseClient.insert()
.into("test")
.value("id", 3)
.then()
}.awaitFirstOrNull()
logger.info("Inserted! execute")
}
2020-05-03 08:57:45.078 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient : Response: CommandComplete{command=INSERT, rowId=0, rows=1}
2020-05-03 08:57:45.078 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.postgresql.util.FluxDiscardOnCancel : received cancel signal
2020-05-03 08:57:45.078 TRACE 12802 --- [tor-tcp-epoll-1] .s.t.r.TransactionSynchronizationManager : Retrieved value [org.springframework.data.r2dbc.connectionfactory.ConnectionHolder@3bae4c6d] for key [ConnectionPool[PostgreSQL]] bound to context [ad41659f-9635-4b70-8000-7b8658683f33: generic-transaction]
2020-05-03 08:57:45.080 TRACE 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager : Triggering beforeCommit synchronization
2020-05-03 08:57:45.080 TRACE 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager : Triggering beforeCompletion synchronization
2020-05-03 08:57:45.082 DEBUG 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager : Initiating transaction commit
2020-05-03 08:57:45.082 DEBUG 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager : Committing R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@2e3b4394, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@70ccb3bf}]]
2020-05-03 08:57:45.083 DEBUG 12802 --- [-1 @coroutine#2] io.r2dbc.postgresql.QUERY : Executing query: COMMIT
2020-05-03 08:57:45.083 DEBUG 12802 --- [-1 @coroutine#2] i.r.p.client.ReactorNettyClient : Request: Query{query='COMMIT'}
2020-05-03 08:57:45.083 TRACE 12802 --- [-1 @coroutine#2] i.r.p.client.ReactorNettyClient : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] WRITE: 12B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 51 00 00 00 0b 43 4f 4d 4d 49 54 00 |Q....COMMIT. |
+--------+-------------------------------------------------+----------------+
2020-05-03 08:57:45.083 TRACE 12802 --- [-1 @coroutine#2] i.r.p.client.ReactorNettyClient : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] FLUSH
!!! 2020-05-03 08:57:45.084 INFO 12802 --- [in @coroutine#1] r.i.d.j.JvmBackendApplicationTests : Inserted! executeAndAwait 1
2020-05-03 08:57:45.085 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient : Response: CloseComplete{}
2020-05-03 08:57:45.086 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient : Response: ReadyForQuery{transactionStatus=TRANSACTION}
2020-05-03 08:57:45.086 TRACE 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] READ COMPLETE
2020-05-03 08:57:45.086 TRACE 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] READ: 18B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 43 00 00 00 0b 43 4f 4d 4d 49 54 00 5a 00 00 00 |C....COMMIT.Z...|
|00000010| 05 49 |.I |
+--------+-------------------------------------------------+----------------+
2020-05-03 08:57:45.086 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient : Response: CommandComplete{command=COMMIT, rowId=null, rows=null}
You can see the logger message before commit actually happens, right after Netty FLUSH.
In cases with transactional
and execute
it works fine and awaits for database answer.
Is it intentional behaviour, or bug?
Comment From: jhoeller
@mp911de Could you please clarify the intended behavior here?
Comment From: mp911de
The log statement Inserted! executeAndAwait $id
should be printed after the CommandComplete{command=COMMIT …}
line as executeAndAwait
should await completion of the commit.
There's a bit more to this arrangement. awaitOne(…)
is method that awaits either completion of the subscription or it awaits the first element and then cancels the subscription. In this case, the upstream subscription was cancelled (see FluxDiscardOnCancel : received cancel signal
). I'm not fully sure about Coroutines semantics and the flow synchronization.
The R2DBC Postgres driver behaves correctly in the sense that the commit command properly awaits a response from the database. Maybe @sdeleuze can chime in.
Comment From: istarion
I found that awaitFirstOrNull()
call in executeAndAwait
function resumes continuation at onNext()
stage, when we need to await for onComplete()
. awaitSingle()
and awaitLast()
work fine, but they will throw NoSuchElementException
instead of nullable result. (It can break existing code)
Maybe we need to implement await()
extension function for Mono
, which should wait for completion with nullable result? Publisher extension functions are really confusing in this context.
Comment From: sdeleuze
Indeed @istarion, the source code of Reactive to Coroutines support confirms the behavior you described.
One possible solution could indeed to add a awaitSingleOrNull()
variant to kotlinx.coroutines
that would have the behavior we need. We could then use it in TransactionalOperator.executeAndAwait
. This need to be discussed with @elizarov and @qwwdfsad. The 2 questions I have in mind to help deciding if it is worth or not are:
- Would awaitSingleOrNull()
versus awaitFirstOrNull()
be confusing for users?
- Are there other use cases than Spring Reactive transaction support where this would be useful.
The alternative would be to leverage awaitSingle()
with a try
/catch
block that would turn NoSuchElementException
to null
return value.
Worth to fix in 5.2.7 in any case.
Comment From: elizarov
👍 for awaitSingleOrNull
. It is totally consistent with Flow.singleOrNull
operator and with Kotlin standard library. In general, we have there:
firstXxx
family of operators that look at the first element and ignore the rest of them.singleXxx
family of operators that expect the underlying collection/stream/flow to have just the single element and consider it an error if there are more.
I've filed: https://github.com/Kotlin/kotlinx.coroutines/issues/1993
Comment From: sdeleuze
Awesome thanks @elizarov, I will send you a PR.
Comment From: sdeleuze
Side note: we may want to replace awaitFirstOrNull()
by awaitSingleOrNull()
in other Spring Kotlin extensions as well.
Comment From: sdeleuze
Pull request submitted on Coroutines side.
Comment From: sdeleuze
Waiting kotlinx.coroutines 1.4.0-RC to be able to fix that one. It should be released next week but not sure when, I pushed for the release to happen in time for our RC2 so let's see if that make it or not.