Is there a way to set an idle time, so that after an event has been held by a consumer without being acked (for the idle-time duration), the event is put back into the consumer group to be claimed by another consumer?

Comment From: madolson

You could achieve this with a Lua script that first checks whether there is a pending item older than some amount of time, and otherwise does an XREADGROUP. This is likely slow and may not fit your use case.

I think having a first-class function for this type of behavior makes sense, though. Something like XREADGROUP [IDLE-TIMEOUT TIME] might be a useful feature. A question for you: were you looking to set the idle time on the key (all XREADGROUP calls on that key have a timeout), on the consumer group (all XREADGROUP calls for a consumer group), or only for a single XREADGROUP command? I think we could consider implementing something like this if you want to update this to a feature request.

Comment From: Kama0165

That would be fantastic :). I was thinking of setting the idle time on the consumer group. So once the idle time has been exceeded for an event, the event gets unassigned from its consumer and automatically reassigned to a consumer that is available. I don't know how feasible that is, but it would be great if this can work. This way we don't have to write separate code to handle pending messages left over when a system fails in the middle of event processing.

Comment From: guybe7

@Kama0165 just to verify: for your use case you need an optional IDLETIME arg to XREADGROUP that means the following: go over the group's PEL, and if there's an entry where (now - delivery_time > IDLETIME), remove it from the consumer that currently has it and add it to the consumer given in the current XREADGROUP (basically simulating an XCLAIM). Did I understand correctly?
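To make the proposed semantics concrete, here is a minimal in-memory sketch (pure Python; `reclaim_idle` and the toy PEL dict are hypothetical illustrations, not Redis internals):

```python
def reclaim_idle(pel, new_consumer, idletime_ms, now_ms):
    """Simulate the proposed XREADGROUP IDLETIME behaviour on a toy PEL.

    pel maps stream ID -> {"consumer": name, "delivery_time": ms}.
    Any entry idle for longer than idletime_ms is reassigned to
    new_consumer (an implicit XCLAIM) and its delivery time is reset.
    """
    claimed = []
    for entry_id, entry in pel.items():
        if now_ms - entry["delivery_time"] > idletime_ms:
            entry["consumer"] = new_consumer
            entry["delivery_time"] = now_ms
            claimed.append(entry_id)
    return claimed

pel = {
    "1-1": {"consumer": "worker-a", "delivery_time": 1_000},
    "1-2": {"consumer": "worker-a", "delivery_time": 9_500},
}
# At t=10s with a 5s idle timeout, only 1-1 has been idle long enough.
print(reclaim_idle(pel, "worker-b", 5_000, 10_000))  # ['1-1']
print(pel["1-1"]["consumer"])                        # worker-b
```

After the reclaim step, the real command would go on to read new entries from the stream as usual.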

Comment From: Kama0165

Yes, that is correct. Is this feasible?

Comment From: guybe7

Yes, but I'm a bit worried about the time complexity here. Since idleness is relative to now, XREADGROUP would have to start scanning the PEL from the beginning every time we execute it. I guess the best way to go is to make sure COUNT has some low value (unless explicitly set to 0), where COUNT would mean "the number of entries to scan" rather than "the maximum number of entries to return". @madolson any feedback?

Comment From: madolson

We could do it that way; we could also store the idle information in a separate rax data structure, so there would be no overhead if you don't use any timeouts. There might also be some happy middle ground where we try to optimize how often we scan the PEL structure.

This change will introduce an RDB change, since we will need to persist the idle timeout, so we should think it through well.

Comment From: guybe7

> we could also store the idle information in a separate rax data structure.

Do you mean a rax where the key is the stream ID and the value is the idle time? Storing the idle time as a key/value of a rax will require us to constantly update it.

> There should be no overhead if you don't use any timeouts.

What do you mean by "use timeouts"? The new "timeout" arg for XREADGROUP? And by "overhead" you mean CPU overhead, right? Because if we go with another rax, there will be memory overhead.

Comment From: guybe7

how about writing a new command:

XTRANSFER key group consumer min-idle-time COUNT count

(or modify the existing XCLAIM)

The user will have to run it from time to time (same as with the XREADGROUP TIMEOUT option). There are two possible outcomes:

  1. some entries were transferred to the dst consumer: "old tasks" got someone to handle them so the application is doing fine
  2. no entries were transferred: there are no "old tasks" so even in this case the application is doing fine

time complexity is O(count)
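A toy model of the proposed XTRANSFER, in pure Python (the `xtransfer` function and its toy PEL are hypothetical sketches of the semantics described above, where COUNT caps the number of PEL entries scanned, keeping the cost at O(count)):

```python
def xtransfer(pel, dst_consumer, min_idle_ms, now_ms, count):
    """Scan at most `count` PEL entries; any entry idle for at least
    min_idle_ms is moved to dst_consumer, and its delivery time reset."""
    transferred = []
    for entry_id, entry in list(pel.items())[:count]:
        if now_ms - entry["delivery_time"] >= min_idle_ms:
            entry["consumer"] = dst_consumer
            entry["delivery_time"] = now_ms
            transferred.append(entry_id)
    return transferred

pel = {"1-1": {"consumer": "dead-worker", "delivery_time": 0}}
# Outcome 1: an old task exists, so it gets transferred.
print(xtransfer(pel, "live-worker", 5_000, 60_000, 100))  # ['1-1']
# Outcome 2: nothing is idle anymore, so nothing is transferred.
print(xtransfer(pel, "live-worker", 5_000, 60_000, 100))  # []
```

In both outcomes the application is healthy, which is what makes the command safe to run periodically.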

Comment From: Kama0165

I am sorry, I am a little confused here, and I want to make sure I understand this correctly. The purpose of the XREADGROUP TIMEOUT option was to give a time constraint between reading messages, right?

I am trying to containerize my service and have consumers auto-generated. The issue with XCLAIM is that you need to keep track of consumers: you need to check a specific consumer's pending list and reassign its entries to another consumer. That is why I would like to have this feature where you set an idle time, and once an event reaches that time it is automatically un-claimed and put back into the consumer group's pending list to be claimed by another consumer. The solution proposed by @guybe7 wouldn't really work, because you still need a consumer to transfer to. Also, what happens if the consumer you are transferring everything to dies? There will still be events lost to that consumer.

Comment From: madolson

Let me rephrase; I think there are two separate approaches:

1. We update XCLAIM (or implement XTRANSFER) so that it checks the pending list and claims items that have exceeded a specific time, just like @guybe7 mentioned. I think this should work in the architecture that @Kama0165 described, since it requires no knowledge of which consumer has which items. There are some optimizations we could do here to limit impact.
2. We update XREADGROUP so that it sets how long an item is claimed by the consumer before it goes back to the general pool. Something like XREADGROUP [IDLE-TIMEOUT TIME].

The idle-timeout is optional and is the amount of time until the read should be considered timed out and the item can be picked up by another consumer. If it's provided, the absolute idle-timeout will be stored on the consumer group. On every XREADGROUP we will check that internal structure to see if anything has exceeded the lowest idle timeout, and if so, implicitly run an XCLAIM to move the ownership to the new consumer.

Internally we will maintain a second radix tree with KEY: idle-timeout and VALUE: stream ID. On every read with a timeout, we populate this timeout radix tree as well as the PEL. Every XACK will remove the item from the PEL, retrieve the timeout, and then remove it from the timeout radix tree. For a user that wants a timeout, this will consume memory and CPU. A user who doesn't use timeouts will pay no extra penalty in either memory or CPU.
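A small sketch of that second structure in pure Python, using a sorted list in place of a radix tree (the `TimeoutIndex` class and its method names are hypothetical illustrations, not Redis internals):

```python
import bisect

class TimeoutIndex:
    """Toy model of the proposed per-group timeout structure: a sorted
    index keyed by absolute deadline (read time + idle-timeout), with
    the stream ID as the value."""

    def __init__(self):
        self._deadlines = []   # sorted list of (deadline_ms, stream_id)
        self._by_id = {}       # stream_id -> deadline_ms, for XACK removal

    def on_read(self, stream_id, now_ms, idle_timeout_ms):
        deadline = now_ms + idle_timeout_ms
        bisect.insort(self._deadlines, (deadline, stream_id))
        self._by_id[stream_id] = deadline

    def on_ack(self, stream_id):
        # XACK: look up the stored deadline, then drop the index entry.
        deadline = self._by_id.pop(stream_id, None)
        if deadline is not None:
            self._deadlines.remove((deadline, stream_id))

    def expired(self, now_ms):
        # Entries whose deadline has passed are eligible for implicit XCLAIM.
        return [sid for d, sid in self._deadlines if d <= now_ms]

idx = TimeoutIndex()
idx.on_read("1-1", now_ms=0, idle_timeout_ms=5_000)
idx.on_read("1-2", now_ms=0, idle_timeout_ms=30_000)
idx.on_ack("1-2")
print(idx.expired(now_ms=10_000))  # ['1-1']
```

Because the index is ordered by deadline, a read only needs to peek at the smallest deadline to know whether anything has timed out, rather than rescanning the whole PEL.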

Option 1 is simpler to implement since there is no change to the underlying data structure. Option 2 is simpler for users, since the client just needs to keep calling the same API, and there is no performance penalty for users who don't want to use it. I also think option 2 is more in line with what Redis typically does, but it is complex, so maybe option 1 makes more sense.

@Kama0165 Thoughts?

Comment From: Kama0165

Would it be possible to do something like this? XGROUP CREATE mystream consumer-group-name 0 [IDLE-TIMEOUT ]

I like option 1 since it is simpler to implement.

Comment From: guybe7

@oranagra @madolson @yossigo @soloestoy we need to pick an option and design/implement

Comment From: guybe7

@madolson could you explain why we need another rax? I mean, why shouldn't XREADGROUP TIMEOUT do the following: 1. go over the PEL (capped by COUNT) and "claim" all entries that are "too idle"; and then 2. read new entries from the stream?

also,

> the absolute idle-timeout will be stored on the consumer group

What do you mean by "on the consumer group"? Will two XREADGROUP calls with different idle-timeouts change something in the group itself?

Comment From: madolson

@guybe7 I was trying to highlight two different types of approaches. The first option is just as you suggested: enhance a read so that it can also scan the PEL. The second option was to actually specify the timeout when you do the read, which would store new data in the consumer group (and I suggested a rax). The only reason I brought up the second idea is that it was closer to what @Kama0165 was asking for, but I don't think the complexity justifies it since it would involve replication changes.

So does XREADGROUP [MIN-IDLE-TIME TIME], which does as you suggested, seem like a good approach then? It seems like the ideal case is if clients don't have to call a special command periodically and can just keep calling XREADGROUP and idle items will eventually get claimed.

Comment From: guybe7

I'm leaning a bit towards changing XCLAIM / adding XTRANSFER, just because xreadCommand and streamReplyWithRange are already quite complex and I'm afraid that at some point they're going to be too packed with logic. Not sure if this argument is enough justification, though.

I mean, the user has to execute XREADGROUP every once in a while anyway (or BLOCK; by the way, how will that work with MIN-IDLE-TIME?), so does it really matter if they also execute an XCLAIM/XTRANSFER? Having another command might also help the user distinguish between "new" entries and "transferred" entries more easily.

maybe we need one of the @redis/core-team to have a look and decide

Comment From: madolson

I agree about the increased complexity. Maybe I'll take a stab at the implementation to see how ugly it actually is, and if it's really overly complex we can fall back to the XTRANSFER approach?

@redis/core-team TL;DR: the problem is that stream items currently have to be manually claimed with XPENDING + XCLAIM when a client dies, which is complex for applications.

The two proposals to fix it:

- Implement XTRANSFER as a new command
  - Effectively XPENDING + XCLAIM in one command, based on a timeout.
  - Easier to implement since it's just those two commands.
  - Replicated as an XCLAIM.
- Extend XREADGROUP so that it can also scan the PEL for timed-out items
  - Simplifies API usage: no new API calls from the client, just a modification of an existing one.
  - More complex to implement since it needs to handle all of the weird XREAD variants.
  - Blocking needs special handling. Since timed-out items should be the edge case, only checking up front might be okay.
  - Replicated as an XCLAIM.

Comment From: madolson

I also just found a related item: https://github.com/redis/redis/issues/5212

Not very clear what antirez's position was in the end, though.

Comment From: oranagra

I only skimmed through the 3 linked issues, so I may be missing many things; let me see if I got it right:

- XTRANSFER (or XAUTOCLAIM, which I prefer) is easier to implement, but harder to use, than XREADGROUP IDLE.
- One of the reasons the XREADGROUP IDLE approach is harder to implement is that it needs to handle a lot of other cases that XREADGROUP handles (like blocking, and possibly others).
- Additionally, since XREADGROUP could now return both transferred entries and normal ones, we may want to return some metadata, which is hard to do (it requires changing the response format). For XAUTOCLAIM that's not a problem, since the user knows what was the result of each operation; on the other hand, some use cases don't care about that info.

So a possible solution is to implement both, and make that feature of XREADGROUP limited. I.e. in XREADGROUP IDLE, if there are entries to be claimed, it returns just those and doesn't mix the two sources in the same response; this way we don't need to support the blocking mode. It would still be simple to use, and can make the implementation simpler. It's suitable for users that just want to get some entries without needing to implement a background reclaiming task, and don't care where entries came from. On the other hand, we'll also add XAUTOCLAIM, and an IDLE argument filter for XPENDING, so that users who need to know where entries came from can use either XAUTOCLAIM (easy and efficient) or XPENDING IDLE if they need more control (paying the extra round trip).

That makes the XREADGROUP IDLE feature an optional one that just simplifies some use cases; we can make it as limited as we want, depending on the complexity of supporting the various features of XREADGROUP, and document the limitations (possibly improving some in the future).
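A toy sketch of that limited behavior in pure Python (the `read_group_idle` function and its arguments are hypothetical illustrations of the proposal, not the real command): entries come from exactly one source per call, never both mixed.

```python
def read_group_idle(pel, stream, last_delivered, consumer, idle_ms, now_ms, count):
    """If any PEL entries have been idle longer than idle_ms, claim and
    return up to `count` of those; only otherwise return new entries."""
    idle = []
    for entry_id, entry in pel.items():
        if now_ms - entry["delivery_time"] > idle_ms:
            entry["consumer"] = consumer
            entry["delivery_time"] = now_ms
            idle.append(entry_id)
            if len(idle) == count:
                break
    if idle:
        return ("claimed", idle)
    new = [eid for eid in stream if eid > last_delivered][:count]
    return ("new", new)

pel = {"1-1": {"consumer": "a", "delivery_time": 0}}
stream = ["1-1", "1-2", "1-3"]
# First call finds an idle PEL entry, so it returns only claimed items.
print(read_group_idle(pel, stream, "1-1", "b", 5_000, 10_000, 10))
# Second call finds nothing idle, so it returns only new items.
print(read_group_idle(pel, stream, "1-1", "b", 5_000, 10_000, 10))
```

The caller always uses the same command and arguments; which source the reply comes from is an implementation detail.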

Comment From: guybe7

For the record, IMHO XREADGROUP already has too many "modes", and adding another will make it even harder to maintain (not to mention the already-too-confusing docs).

I think we should go with XAUTOCLAIM and XPENDING IDLE only.

Comment From: madolson

I like the idea of making XREADGROUP limited; it's optimizing for the common use case, which is someone who just wants to also pull items periodically and not worry about failures.

I'm somewhat stuck on the point that it seems to be in line with what most people have been looking for, while XPENDING IDLE and XAUTOCLAIM are just the features that are easy to implement.

Comment From: valentinogeron

#5934 may be related (another way to unregister an event from a consumer's PEL).

Comment From: johncantrell97

Just got pointed here from #5212. I've implemented a slightly modified version of the Lua script found in #5212 to do XPENDING + XCLAIM for items with idle time greater than some threshold, but it's extremely slow, and I would love to see a native way to do this (assuming it would be faster). I would also love thoughts on how to improve the performance of the Lua script in the meantime. (My only thought was that the XCLAIM happens once for each entry instead of passing all the IDs to be retried in a single call, but I can't figure out how to convert a table/array of IDs into function call arguments in Lua.)

My use case is to implement a retry mechanism for entries that are not ACK'd in time. My Lua script has an exponential back-off based on the retry count, something like (2 ^ retryCount) * baseRetryMilliseconds. With baseRetryMilliseconds set to 60000, it retries after 1, 4, 8, etc. minutes, with a max of X retries before moving the entry to a dead-letter queue.
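That back-off policy can be sketched in a few lines (pure Python; the function name, the max-retries value, and the dead-letter signalling via `None` are all hypothetical choices, not part of the original script):

```python
def retry_delay_ms(retry_count, base_ms=60_000, max_retries=5):
    """Exponential back-off: (2 ** retry_count) * base_ms, or None once
    max_retries is exhausted and the entry should go to a dead-letter queue."""
    if retry_count > max_retries:
        return None
    return (2 ** retry_count) * base_ms

print(retry_delay_ms(1))  # 120000 (2 minutes)
print(retry_delay_ms(3))  # 480000 (8 minutes)
print(retry_delay_ms(6))  # None -> dead-letter queue
```

The retry count would come from the delivery counter that XPENDING reports for each pending entry.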

From what I can gather, it sounds like the XTRANSFER proposal would basically replace my Lua script and would function almost identically (hopefully significantly faster, though). Really, though, I think what I am ultimately seeking is the XREADGROUP proposal: just have timed-out entries re-enter and be returned like new entries (perhaps with a retry flag + entry date, so I could 'skip' the entry if I wanted to still implement the back-off logic).

It sounds like the XTRANSFER solution is cleaner and much simpler, and it also works for me, given I've already implemented a Lua version of it. I would be happy to test (or even help directly with) an XTRANSFER implementation.

Comment From: guybe7

I think we should solve this issue in the following order of implementation:

1. XPENDING IDLE: very easy to implement
2. XAUTOCLAIM: also very easy
3. XREADGROUP IDLE: can only return entries from the PEL, no new entries; will not have a blocking option
4. (for https://github.com/redis/redis/issues/5934) XREADGROUP REJECTED, XREJECT: similar conditions to XREADGROUP IDLE

@redis/core-team please approve

Comment From: oranagra

Sounds good to me (I do think it's nice to have all these options). And indeed we should first implement the ones that are easy to implement (and harder to use), and then implement the one that's convenient to use but keep it a bit limited, so the implementation isn't too complex and fragile.

Timing-wise, I think this effort, and many other non-trivial streams improvements, would be a good fit for Redis 7 (so not merged to unstable before 6.2 is released). I want to avoid adding some API and then, when doing more changes to streams, regretting that it was already released and can't be changed.

Maybe, if we are sure that the simpler ones (the first two) are trivial to implement and we are very unlikely to want to change them later, we can ease some of the pain for people and add them to 6.2 anyway.

Comment From: madolson

I'm fine with options 1 and 2. I haven't looked at 4.

I think option #3 is missing the point. The purpose of adding idle to XREADGROUP is so that you can call a single command that will eventually pull everything out of the stream; it simplifies the client-side code. I think it should be able to return items both from the PEL and new items; otherwise you still have to periodically switch between them.

Comment From: oranagra

@madolson the idea of having XREADGROUP IDLE return items from either the stream or the PEL didn't mean that the user should switch between them; the user will always use the same command and arguments. It's just that, to simplify the implementation (this can be changed later), we will not return items from both sources in the same call. So one call will return the PEL items, and the next call will return items from the stream. Do you still think it's missing the point?

Comment From: madolson

@oranagra Your suggestion sounds good. If that is what guy intended, then I see no other concerns. Thanks for clarifying.

Comment From: marcbachmann

For people coming here looking for a solution, the easiest one for now would probably be to implement a health-check loop on the consumer side.

e.g. run ZADD periodically to set the consumer status with a timestamp

ZADD activeconsumers 1604312654870 stream@group@consumer

And then you can retrieve inactive consumers using ZREVRANGEBYSCORE:

ZREVRANGEBYSCORE activeconsumers 1604312654870 -inf
A Lua script cleaning up everything:
      local consumerskey = "activeconsumers" -- or use KEYS[1]
      local timestamp = tonumber(ARGV[1]) -- a timestamp since epoch in milliseconds
      local ttl = tonumber(ARGV[2] or "120000") -- ttl in milliseconds

      -- SET ... NX returns false (not nil) to Lua when the key already
      -- exists, i.e. when another client already ran the cleanup recently
      local exp = redis.call("SET", consumerskey .. ":cleaned", timestamp, "PX", ttl, "NX")
      if not exp then return end

      local inactive = redis.call("ZREVRANGEBYSCORE", consumerskey, timestamp - ttl, "-inf")
      local cachedbystream = {}
      local cleaned = {}
      for c = 1, #inactive do
        local match = string.gmatch(inactive[c], "[^@]+")
        local stream = match()
        local group = match()
        local consumer = match()

        local source
        local destination
        local consumers = cachedbystream[stream .. "@" .. group] or redis.call("XINFO", "CONSUMERS", stream, group)
        cachedbystream[stream .. "@" .. group] = consumers

        local i = 1
        while consumers[i] do
          if consumers[i][2] == consumer then source = consumers[i]
          elseif destination == nil and consumers[i][6] < ttl then destination = consumers[i] end
          if destination and source then break end
          i = i + 1
        end

        -- the consumer might already be removed in extreme edge cases (if it got deleted manually)
        if source == nil then
          redis.call("ZREM", consumerskey, stream .. "@" .. group .. "@" .. consumer)

        -- otherwise reassign all messages to a healthy consumer
        elseif destination and source[6] > ttl then
          local count = 0
          if source[4] > 0 then
            local finished = false
            while finished == false do
              local pending = redis.call("XPENDING", stream, group, "-", "+", 1000, source[2])
              count = count + #pending
              if #pending > 0 then
                local ops = {"XCLAIM", stream, group, destination[2], 0}
                for t = 1, #pending do
                  ops[t + 5] = pending[t][1]
                end
                ops[#ops + 1] = 'JUSTID'
                redis.call(unpack(ops))
              else
                finished = true
              end
            end
          end

          redis.call("ZREM", consumerskey, stream .. "@" .. group .. "@" .. source[2])
          redis.call("XGROUP", "DELCONSUMER", stream, group, source[2])
          if count > 0 then
            cleaned[#cleaned + 1] = {"stream", stream, "group", group, "destination", destination[2], "source", source[2], "count", count}
          end
        end
      end
      return cleaned

Comment From: adriangb

One use case that I don't see mentioned is pulling and processing messages with large, variable execution times. For example, a message might be a pointer (URL) to a file in S3 that gets downloaded and fed into an ML model for inference. The execution time is going to be very different for 1 data point vs. 1,000,000 data points, not to mention very hardware-dependent. There's no way to know ahead of time what a reasonable "idle" timeout is: if you set something too large and your consumer crashes, it's going to take a really long time to re-process that message.

The way this is usually handled with SQS or Pub/Sub is by setting a relatively short (say, 30s) visibility timeout and having something (a thread, etc.) "extend" that timeout in the background while each message is being processed. The result is that the consumer can take as long as needed, but if it crashes the message will be back on the queue within 30 seconds. As far as I can tell, this use case would not be covered by any of the solutions proposed thus far, right?
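The heartbeat-extension pattern described above can be modeled without any real queue (pure Python with a simulated clock; the `VisibilityQueue` class and its method names are hypothetical, and in Redis terms `extend` would correspond to something like a periodic XCLAIM that resets the entry's idle time):

```python
class VisibilityQueue:
    """Toy model of the SQS-style pattern: each pull sets a short
    deadline, and the consumer keeps extending it while it works; if
    the consumer crashes, the message becomes visible again once the
    deadline passes."""

    def __init__(self, visibility_ms=30_000):
        self.visibility_ms = visibility_ms
        self.deadlines = {}  # message_id -> deadline_ms

    def pull(self, message_id, now_ms):
        self.deadlines[message_id] = now_ms + self.visibility_ms

    def extend(self, message_id, now_ms):
        # Heartbeat: the worker calls this periodically while processing.
        self.deadlines[message_id] = now_ms + self.visibility_ms

    def redeliverable(self, now_ms):
        return [m for m, d in self.deadlines.items() if d <= now_ms]

q = VisibilityQueue()
q.pull("1-1", now_ms=0)
q.extend("1-1", now_ms=20_000)   # healthy worker pushes the deadline to 50s
print(q.redeliverable(30_000))   # [] -- still being worked on
print(q.redeliverable(60_000))   # ['1-1'] -- worker stopped extending
```

This lets a slow-but-healthy worker hold a message indefinitely while a crashed worker loses it within one visibility window.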