Hi all,

I noticed some odd behaviour with consumer groups when using AOF for persistence. This issue is non-existent when using RDB, that is how I work around it for now.

I simplified it down to this model scenario: - AOF is being used for persistence - One producer is pushing data to a stream with XADD samples MAXLEN ~ 100000 * imei 1234 seqno n sample some_json - One consumer is doing blocking reads with XREADGROUP GROUP client1 client1 COUNT 1 BLOCK 0 STREAMS samples >, and XACK samples client1 $id after processing - I stop my producer and consumer in that order (all samples processed) - redis-clisays:

127.0.0.1:6379> xinfo groups samples
1) 1) "name"
   2) "client1"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1586992142527-0"
  • After I do a clean shutdown and restart, redis-cli says:
127.0.0.1:6379> xinfo groups samples
1) 1) "name"
   2) "client1"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1586992142406-0"

What you see is that we actually go back in time here with last-delivered-id. I noticed this issue initially when after a restart a massive amount of samples were re-processed. How far back we go seems random.

I inspected the AOF file, and what I see is that when it works as expected, XREADGROUP generates an XCLAIM in the AOF file to record that a sample is assigned to a consumer, and XACK just shows as XACK.

For long periods however, I see only the XACKs appearing with missing XCLAIMs. The last-delivered-id after a reload coincides with the last correct XCLAIM+XACK pair. I have a feeling (not certain here), that if XREADGROUP didn't block and delivers a message previously added to the stream, it works correctly, but if we block and we need to wait for an XADD the issue appears.

Some info on my environment/config: - Running in the latest redis:alpine 64-bit image (v5.0.8) - No conf file, i just pass --appendonly yes as flag, all the rest is default. - My consumer and producer are using the redis-go library for Go (v7.2.0) - redis-cli monitor shows that the libraries do what they are supposed to do

Let me know if there is any more info I can supply to help resolve this issue.

Regards, Wilfried

Comment From: antirez

I have a feeling (not certain here), that if XREADGROUP didn't block and delivers a message previously added to the stream, it works correctly, but if we block and we need to wait for an XADD the issue appears.

Yes, you are correct, the bug is exactly in such terms. Fixing.

Comment From: antirez

@wilfrieddaniels Redis 5.0.9 is out with the fix. Thank you.

Comment From: wilfrieddaniels

Thanks for the super fast fix!

Comment From: guybe7

@antirez pleae note that using propagate() may cause consistency issues (if XREADGROUP returns several IDs, it will replicate several XCALIM commands, which must be inside MULTI/EXEC) i will PR a different fix ASAP

(regardless, if you do go with propagate() - you should also "fix" streamPropagateGroupID)

Comment From: guybe7

@antirez please review https://github.com/antirez/redis/pull/7111

Comment From: antirez

@guybe7 do you think we have any contract of atomicity of the claiming here? Consider that the message needs to be acknowledged, so I believe handing this like if it was multiple commands executed in a row is fine.

Comment From: guybe7

if the master crashes during the replication of the XCLAIMs, the replica will have a half-baked state in which some of the read entries are considered "read" (i.e. in some PEL) while the rest aren't

Comment From: guybe7

the other option i though of is adding a int from_unblock flag to streamPropagateGroupID and streamPropagateXCLAIM which will make them use propagate instead of alsoPropagate

Comment From: antirez

An alternative could be to use also propagate only in the context of the command execution but don't use it in the unblocking case.

Comment From: guybe7

@antirez yes, that's what i meant... but TBH i feel it's nicer to allow alsoPropagate from unblocking context... that way we can also do a small rfactoring, sharing code between the unblocking of zsets/lists (e.g. rpoplpushCommandcan share code with serveClientBlockedOnList)

Comment From: antirez

@guybe7 I'm not sure the semantics of alsoPropagate() could be super clear if called from outside a client context. When a context starts / ends? Because the point of alsoPropagate() is MULTI/EXEC wrapping, and this only makes sense if what is the start and end of a given transactional moment is clear.

Btw there is another issue, currently it's hard to detect if we are or not in the context of a command execution. There is no server.in_call or alike that we can exploit so far, so it is also hard to just make alsoPropagate() to fall back to propagate() if called outside a command context (and that could avoid certain bugs like the one here).

So perhaps the simplest thing to do right now, is to just modify t_stream.c to also have an "external" argument, true when called from external contexts, and use propgate() instead of alsoPropagate().

Comment From: antirez

@guybe7 what about c479eace4?

Comment From: antirez

@guybe7 sorry I mean 5aa5e3af9

Comment From: guybe7

@antirez looks goods except that i don't understand the changes in server.c

Comment From: antirez

@guybe7 sorry this were just tests. They are totally useless and I'll remove them from the final commit.

Comment From: guybe7

@antirez please consider taking the tests from my PR https://github.com/antirez/redis/pull/7111

Comment From: antirez

@guybe7 TLDR the problem is that, with the current interaction between processCommand() and call(), we have server.current_client cleared only after we processed the clients that are blocked. We did some change in the past in order to really serve such clients waiting for keys ASAP, but the effect of this is that it is "so close" to the call to call(), that processCommand() will not have the chance to clear the server.current_client. Now an obvious improvement would be to move the call that handles blocked clients in processCommandAndResetClient() instead. I'm looking if this is likely to cause any damage, cause it's a much better fix. Let me check.

Comment From: antirez

@guybe7 ok the alternative is in the "stream-propagation-fix-2" branch. I think I like it more. However there is a small disadvantage with this approach, that is IMHO a bit more correct. On crashes when processing operations that will awake other blocked clients, before in the crash log we used to see the operation that triggered the unblocking. Now there would be no current client, that is more correct, but has less details in the crash log.

Comment From: guybe7

@antirez TBH i prefer https://github.com/antirez/redis/commit/5aa5e3af99092a5ad34a2d24ce4eb91c3f0c5227 because it seems simpler to me (i don't really like alsoPropagate calling to propagate)

Comment From: antirez

@guybe7 yep, you have a point about simplicity. At the same time this exposes some low level detail in the data structure implementation. Maybe another possibility is to take propagate() and call it syncPropagate(), and then have propagate() that just calls the right function depending on the context, that is a small refactoring on top of "stream-propagation-fix-2". I'm not sure however.

Also pinging @oranagra, @yossigo and @soloestoy to check what they think about this, all having worked with this stuff in the past.

Comment From: guybe7

@antirez IMHO having to make such decisions depending on the context (some server struct global?) is potentially dangerous because it's hard to debug. maybe the best thing to do for now is to go with https://github.com/antirez/redis/commit/5aa5e3af99092a5ad34a2d24ce4eb91c3f0c5227 and open an issue for some propagate/alsoPropagate re-thinking? it has become quite a confusing issue already...

Comment From: guybe7

@antirez after discussing this with @oranagra he noticed that even if we do go with https://github.com/antirez/redis/commit/5aa5e3af99092a5ad34a2d24ce4eb91c3f0c5227 we still have to wrap the XCLAIMs/XSETIDs with MULTI/EXEC (when unblocking a client) he suggested that: 1. both streamPropagateGroupID and streamPropagateXCLAIM will always use propagate 2. streamReplyWithRange will be in charge of replicating the MULTI/EXEC - that way we solve the consistency problem for both unblocked and never-blocked clients i too feel like it's the best solution

Comment From: guybe7

@antirez ping about this issue

Comment From: antirez

@oranagra @guybe7 are you sure we need that? I don't think we have cross-message consistency guarantees to provide with streams.

Comment From: oranagra

@antirez i'm sorry, i'm not fully aware of the full details of this issue, what i have to contribute without a full dive into it is this:

  1. i suppose there is no other place in redis in which a single command that runs on the master propagates multiple commands to the replica without multi-exec. maybe for the case of stream you argue that it isn't so bad and willing to give it some slack. but if it were for free (happens implicitly by the infra), would you still consider not adding multi-exec?

  2. i don't think it's good that this scenario would sometimes use propagate, and other times it would use alsoPropagate, and i really don't like the idea that it will be implicitly based on some global flag (current_client) that's used for other purposes and may some day get shifted and things will break (and in this case no one would really notice it's broken).

for these reasons, i think it's better to just always use propagate (either with or without multi-exec), since there are times were we can't afford to use alsoPropagate.

Comment From: antirez

@guybe7 @oranagra yep I agree with Oran that we could just revert to always using propagate(), and that's it. The reason why there are no consistency problems, and actually MULTI/EXEC will make it worse semantically, is that the consumer group holds state for N messages, but is not a data structure.

So basically, as soon as a message is delivered, we want to mark it as delivered, so that after a restart or a failover, what was surely delivered is not delivered again. If we wrap it with MULTI/EXEC, the effect is that even if we have the information of something already being delivered, we lose it. For instance if Redis crashes immediately after delivering certain messages and there is a failover, we want the replica to be aware at least of the already delivered messages.

So do we agree we want just to use propagate() and that's it?

Comment From: antirez

(I reopened given that we still have to find a good solution we agree about).

Comment From: guybe7

@antirez yes, that makes sense

Comment From: oranagra

sounds good.. make sure to add a comment why you prefer not to use multi-exec.

Comment From: antirez

Ok I'll do for sure. Thanks.

Comment From: antirez

@oranagra @guybe7 done, closing.

Comment From: guybe7

@antirez i see the commit only in the 6.0 branch, don't we want it in unstable and 5.0?