Spring Boot 2.3.0.M4, Kotlin 1.3.72, PostgreSQL 9.5.21, JDBC driver 42.2.9, R2DBC 0.8.2

When I run several transactions sequentially, I run into visibility problems: data inserted in a previous transaction may be absent from the table.

    @Test
    fun invisibleData() = runBlocking {
        databaseClient.execute("""create table test (id int);""".trimIndent()).await()
        val id = transactionalOperator.executeAndAwait {
            databaseClient.insert()
                .into("test")
                .value("id", 1)
                .map { row -> row["id"]}
                .awaitOne() as Int
        }
        logger.info("Inserted! executeAndAwait $id")

        databaseClient.insert()
            .into("test")
            .value("id", 2)
            .then()
            .`as`(transactionalOperator::transactional)
            .awaitFirstOrNull()
        logger.info("Inserted! transactional")

        transactionalOperator.execute {
            databaseClient.insert()
                .into("test")
                .value("id", 3)
                .then()
        }.awaitFirstOrNull()
        logger.info("Inserted! execute")
    }
2020-05-03 08:57:45.078 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient          : Response: CommandComplete{command=INSERT, rowId=0, rows=1}
2020-05-03 08:57:45.078 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.postgresql.util.FluxDiscardOnCancel  : received cancel signal
2020-05-03 08:57:45.078 TRACE 12802 --- [tor-tcp-epoll-1] .s.t.r.TransactionSynchronizationManager : Retrieved value [org.springframework.data.r2dbc.connectionfactory.ConnectionHolder@3bae4c6d] for key [ConnectionPool[PostgreSQL]] bound to context [ad41659f-9635-4b70-8000-7b8658683f33: generic-transaction]
2020-05-03 08:57:45.080 TRACE 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager        : Triggering beforeCommit synchronization
2020-05-03 08:57:45.080 TRACE 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager        : Triggering beforeCompletion synchronization
2020-05-03 08:57:45.082 DEBUG 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager        : Initiating transaction commit
2020-05-03 08:57:45.082 DEBUG 12802 --- [-1 @coroutine#2] o.s.d.r.c.R2dbcTransactionManager        : Committing R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@2e3b4394, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@70ccb3bf}]]
2020-05-03 08:57:45.083 DEBUG 12802 --- [-1 @coroutine#2] io.r2dbc.postgresql.QUERY                : Executing query: COMMIT
2020-05-03 08:57:45.083 DEBUG 12802 --- [-1 @coroutine#2] i.r.p.client.ReactorNettyClient          : Request:  Query{query='COMMIT'}
2020-05-03 08:57:45.083 TRACE 12802 --- [-1 @coroutine#2] i.r.p.client.ReactorNettyClient          : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] WRITE: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 51 00 00 00 0b 43 4f 4d 4d 49 54 00             |Q....COMMIT.    |
+--------+-------------------------------------------------+----------------+
2020-05-03 08:57:45.083 TRACE 12802 --- [-1 @coroutine#2] i.r.p.client.ReactorNettyClient          : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] FLUSH
!!! 2020-05-03 08:57:45.084  INFO 12802 --- [in @coroutine#1] r.i.d.j.JvmBackendApplicationTests       : Inserted! executeAndAwait 1
2020-05-03 08:57:45.085 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient          : Response: CloseComplete{}
2020-05-03 08:57:45.086 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient          : Response: ReadyForQuery{transactionStatus=TRANSACTION}
2020-05-03 08:57:45.086 TRACE 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient          : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] READ COMPLETE
2020-05-03 08:57:45.086 TRACE 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient          : [id: 0x36ce44a1, L:/127.0.0.1:38216 - R:localhost/127.0.0.1:32258] READ: 18B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 43 00 00 00 0b 43 4f 4d 4d 49 54 00 5a 00 00 00 |C....COMMIT.Z...|
|00000010| 05 49                                           |.I              |
+--------+-------------------------------------------------+----------------+
2020-05-03 08:57:45.086 DEBUG 12802 --- [tor-tcp-epoll-1] i.r.p.client.ReactorNettyClient          : Response: CommandComplete{command=COMMIT, rowId=null, rows=null}

You can see that the logger message (the line marked !!!) appears before the commit actually completes, right after the Netty FLUSH.

In the cases with transactional and execute, it works fine and waits for the database response. Is this intentional behaviour, or a bug?

Comment From: jhoeller

@mp911de Could you please clarify the intended behavior here?

Comment From: mp911de

The log statement Inserted! executeAndAwait $id should be printed after the CommandComplete{command=COMMIT …} line as executeAndAwait should await completion of the commit.

There's a bit more to this arrangement. awaitOne(…) is a method that either awaits completion of the subscription or awaits the first element and then cancels the subscription. In this case, the upstream subscription was cancelled (see FluxDiscardOnCancel : received cancel signal). I'm not fully sure about Coroutines semantics and the flow synchronization.

The R2DBC Postgres driver behaves correctly in the sense that the commit command properly awaits a response from the database. Maybe @sdeleuze can chime in.

Comment From: istarion

I found that the awaitFirstOrNull() call in the executeAndAwait function resumes the continuation at the onNext() stage, whereas we need to wait for onComplete(). awaitSingle() and awaitLast() work fine, but they throw NoSuchElementException instead of returning a nullable result, which could break existing code.
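
To make the two resumption points concrete, here is a minimal library-free simulation. It is only a sketch: Sink, runTransaction, firstStyle and singleStyle are hypothetical names invented for illustration, and the code does not use or reproduce the real kotlinx.coroutines or Reactor implementations. It only models a source that emits one element, then commits, then completes.

```kotlin
// Hypothetical stand-in for a Reactive Streams subscriber; not the real API.
interface Sink<T> {
    fun onNext(value: T)
    fun onComplete()
}

// Simulated transactional source: emits the row, then commits, then completes.
fun runTransaction(events: MutableList<String>, sink: Sink<Int>) {
    sink.onNext(1)
    events += "COMMIT acknowledged"
    sink.onComplete()
}

// Resumes the caller as soon as the first element arrives
// (modelled on the awaitFirstOrNull() behaviour described above).
fun firstStyle(events: MutableList<String>): Sink<Int> = object : Sink<Int> {
    private var resumed = false
    override fun onNext(value: Int) {
        if (!resumed) { resumed = true; events += "caller resumed with $value" }
    }
    override fun onComplete() {
        if (!resumed) { resumed = true; events += "caller resumed with null" }
    }
}

// Buffers the element and resumes the caller only on completion.
fun singleStyle(events: MutableList<String>): Sink<Int> = object : Sink<Int> {
    private var buffered: Int? = null
    override fun onNext(value: Int) { buffered = value }
    override fun onComplete() { events += "caller resumed with $buffered" }
}

fun main() {
    val first = mutableListOf<String>()
    runTransaction(first, firstStyle(first))
    println(first)   // caller resumes before the commit is acknowledged

    val single = mutableListOf<String>()
    runTransaction(single, singleStyle(single))
    println(single)  // caller resumes after the commit is acknowledged
}
```

In the first run the caller continues while the commit is still in flight, which matches the ordering seen in the log above; in the second run the caller only continues once the source has completed.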

Maybe we need to implement an await() extension function for Mono that waits for completion and returns a nullable result? The Publisher extension functions are really confusing in this context.

Comment From: sdeleuze

Indeed @istarion, the source code of Reactive to Coroutines support confirms the behavior you described.

One possible solution could indeed be to add an awaitSingleOrNull() variant to kotlinx.coroutines that would have the behavior we need. We could then use it in TransactionalOperator.executeAndAwait. This needs to be discussed with @elizarov and @qwwdfsad. The two questions I have in mind to help decide whether it is worth it are:

  • Would awaitSingleOrNull() versus awaitFirstOrNull() be confusing for users?
  • Are there other use cases than Spring Reactive transaction support where this would be useful?

The alternative would be to leverage awaitSingle() with a try/catch block that would turn NoSuchElementException into a null return value.
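
A minimal sketch of that try/catch alternative might look like the following. The helper name awaitSingleOrNullFallback is hypothetical (it is not part of Spring or kotlinx.coroutines), and the example assumes kotlinx-coroutines-core and kotlinx-coroutines-reactive are on the classpath; the Flow-based publishers in main exist only to exercise the helper.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: waits for completion like awaitSingle(),
// but maps "no element was emitted" to null instead of an exception.
suspend fun <T : Any> Publisher<T>.awaitSingleOrNullFallback(): T? =
    try {
        awaitSingle()
    } catch (e: NoSuchElementException) {
        // Caveat: this also swallows a NoSuchElementException raised by
        // the upstream publisher itself, not only the "empty" signal.
        null
    }

fun main(): Unit = runBlocking {
    // One element: the value is returned after the publisher completes.
    println(flowOf(1).asPublisher().awaitSingleOrNullFallback())
    // No elements: null is returned instead of an exception.
    println(emptyFlow<Int>().asPublisher().awaitSingleOrNullFallback())
}
```

The caveat in the catch block is the main design drawback of this approach: an exception thrown inside the transactional callback would be indistinguishable from an empty result.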

Worth fixing in 5.2.7 in any case.

Comment From: elizarov

👍 for awaitSingleOrNull. It is totally consistent with Flow.singleOrNull operator and with Kotlin standard library. In general, we have there:

  • firstXxx family of operators that look at the first element and ignore the rest of them.
  • singleXxx family of operators that expect the underlying collection/stream/flow to have just the single element and consider it an error if there are more.
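
As a small illustration of the two families, here is how the corresponding Kotlin standard library functions behave on plain lists. This sketch covers only the collection functions, not Flow or the reactive extensions; note that on collections, singleOrNull() reports "more than one element" as null, while single() throws.

```kotlin
fun main() {
    val none = emptyList<Int>()
    val one = listOf(1)
    val many = listOf(1, 2)

    // firstXxx: look at the first element and ignore the rest.
    println(many.first())         // 1
    println(none.firstOrNull())   // null

    // singleXxx: expect exactly one element.
    println(one.single())         // 1
    println(many.singleOrNull())  // null
    println(none.singleOrNull())  // null

    // single() treats both "empty" and "more than one" as errors.
    println(runCatching { none.single() }.exceptionOrNull() is NoSuchElementException)     // true
    println(runCatching { many.single() }.exceptionOrNull() is IllegalArgumentException)   // true
}
```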

I've filed: https://github.com/Kotlin/kotlinx.coroutines/issues/1993

Comment From: sdeleuze

Awesome, thanks @elizarov, I will send you a PR.

Comment From: sdeleuze

Side note: we may want to replace awaitFirstOrNull() with awaitSingleOrNull() in other Spring Kotlin extensions as well.

Comment From: sdeleuze

Pull request submitted on Coroutines side.

Comment From: sdeleuze

Waiting for kotlinx.coroutines 1.4.0-RC to be able to fix this one. It should be released next week, but I'm not sure exactly when. I pushed for the release to happen in time for our RC2, so let's see whether it makes it or not.