Currently redis streams worker groups have to jump through hoops to properly manage unfinished (ie: pending by another client) messages. IMHO this is wholly unnecessary with a very simple addition to XREADGROUP:
Proposal Add an optional IDLE argument to XREADGROUP, which does the following, if defined: - check if any pending messages in the group are older than IDLE milliseconds. If so, immediately claim them and return them - otherwise, continue as normal.
This tiny change on the redis server would allow users of XREAD to pretty much upgrade to XREADGROUP with no additional complexity to their logic. There would be no stuffing around with XPENDING and XCLAIM, which add unnecessary latency and complexity to client code, and will inevitably lead to implementation bugs by each client.
Rationale
The current recommended approach when using XREADGROUP is to have downstream clients periodically invoke XPENDING to manually assess whether there are any messages which have not been processed for a certain age, and then requires them to separately execute XCLAIM for these messages. Because this is not atomic on the server side it becomes a case of aysnc ping-pong:
- XPENDING to the server
- (get a list of pending messages)
- XCLAIM to the server
- (perhaps claim these messages, perhaps not if someone else processed or claimed them in the interim)
- XREADGROUP
This makes no sense. Almost the only pattern a consumer will want to follow is retrieving any old messages which aren't processed, and only taking new messages otherwise.
Comment From: nicois
@antirez now that v5 is out, would you have time to comment on the above. Am I missing something?
Comment From: antirez
Hello @nicois! This looks a very interesting proposal to me. I think that there is some value in differentiating the normal messages from the claimed messages, because, for instance, claimed messages needs to be reported, moreover we have a processing-attempts counter that allows to create dead letters based on apparently un-processable messages. However for people that don't have such requirements the IDLE option may be a nice shortcut. It's the first time I read such proposal (sorry for Redis 5 I mostly focused on critical issues only affecting Redis 5), but I'll review it with more care and consider adding such subcommand.
Comment From: antirez
P.S. Maybe the output of the messages when XREADGROUP IDLE is used should also change in order to emit the additional metadata needed, such as: firstly read or claimed, and at the same time to return the number of delivery attempts.
Comment From: nicois
potentially the extra metadata could be useful, and it certainly can be ignored by clients for whom it is not required. I imagine many systems would not need to differentiate between messages first read by this consumer, or those which are being reprocessed because of previous failures.
Certainly various monitoring tools should be keeping track of processing failures and the (virtual) queue size for consumer groups, though I believe this can be kept separate from the logic for actually consuming the messages.
Comment From: antirez
Without the metadata what happens is that when there are impossible to process items, they pile up more and more, and to go into this situation is very simple. Imagine for instance if the message describes a task in JSON format, and the JSON is corrupted for any reason. Or, as it happens more often, that there is a bug in the workers that will prevent them from processing a subset of the messages. With IDLE, because it is so "high level", what happens is just that we see the consumer group having a larger PEL, but this is indistinguishable from other reasons that will make the PEL longer, for example just a spike in the production of messages. So to be fail, I think that actually most users will require to inspect the metadata into the message if they use IDLE. Instead implementing claiming manually the user understands much better the semantics of what she/he is trying to do.
Users using IDLE (I actually plan to call the feature differently, like CLAIM or alike) not inspecting the metadata, should do anyway the work of periodically using XPENDING, checking at the same time if there are messages that appear to have a large idle time or alternatively a big delivery attempts counter. That would be very similar to having to do XPENDING+XCLAIM, so basically I think that such feature without the additional metadata in the output risks to be almost unusable.
So my plan is to see a bit what happens in the wild, and then understand what's the best way we could implement this feature. Fortunately the feature appears to be technically possible: what I mean is that this whole mechanism is replicated to replicas / AOF, and right now what to do is very clear for each command. Moreover XPENDING is a read-only command, allowing the check for unprocessed messages to be executed on the replicas instead of using the master for the same goal. However it looks like that anyway, fortunately, the CLAIM option would result into replicating an XCLAIM command to replicas, exactly like anyway happens when XREADGROUP is called, for each message returned.
Surely food for thought... thanks.
Comment From: antirez
Btw probably a good name for such feature would be AUTOCLAIM.
AUTOCLAIM <min-idle-time>
There is to think if we should also provide a count, but I think it should simply claim as much messages as possible according to the current COUNT option.
Comment From: vladfaust
Hi, what's the status of this request? I'd personally love to see it implemented.
Comment From: nitzanav
@antirez I think that use cases are already clear since streaming solutions and message queues (this one I think combines both requirements) are out there for years / decades and clear.
RabbitMQ, SQS, resque, sidekiq, bull, kue (and I guess all others) - have this "visibility period" that after x time of no ack or no nack, message returns to queue. I know it is a feature that is more acceptable in message queues rather than streams, but if this is part of Redis Streams the it is the most powerful message queue and stream out there!
Comment From: vflopes
I'm massively using Redis streams and this would be a great feature to have and to simplify my consumers logic. I know there are some design decisions like capped streams does not remove deleted messages from PELs (I think XDEL doesn't removes too, although I avoid using XDEL). To keep my consumers simplified I've created a Lua script to implement all the retry logic on Redis side (and I'm calling XRETRY.lua on my apps :laughing: )
--[[
Retry stalled out messages on streams:
Input:
KEYS[1] 'group' // The group to read messages
KEYS[2] 'consumerId' // The consumer id that is ready to claim messages
KEYS[3] 'retryLimit' // Maximum number of retries
KEYS[4] 'count' // Maximum number of messages to be claimed
KEYS[5] 'pageSize' // Pagination size when iterating over pending messages list
KEYS[6] 'deadline' // Minimum time (ms) that a message must be in pending list to be considered stalled out
ARGV[N] // Streams key
]]
local rcall = redis.call
local toRetry = {}
local claimed = {}
local retryLimit = tonumber(KEYS[3])
local count = tonumber(KEYS[4])
local pageSize = tonumber(KEYS[5])
local deadline = tonumber(KEYS[6])
for i = 1, #ARGV do
local pendingSummary = rcall("XPENDING", ARGV[i], KEYS[1])
if pendingSummary[1] > 0 then
local pendingIndex = pendingSummary[2]
while pendingIndex ~= "" and #toRetry < count do
local pending = rcall("XPENDING", ARGV[i], KEYS[1], pendingIndex, "+", pageSize)
pendingIndex = ""
for t = 1, #pending do
if pending[t][4] < retryLimit and pending[t][3] >= deadline then
pending[t][#pending[t]+1] = ARGV[i]
toRetry[#toRetry+1] = pending[t]
end
pendingIndex = pending[t][1]
end
if #pending < pageSize then
pendingIndex = ""
end
end
end
if #toRetry >= count then
break
end
end
for i = 1, #toRetry do
local claim = rcall("XCLAIM", toRetry[i][5], KEYS[1], KEYS[2], deadline, toRetry[i][1], "RETRYCOUNT", toRetry[i][4]+1)
if #claim > 0 then
if claim[1] ~= false then
toRetry[i][#toRetry[i]+1] = claim[1][2]
claimed[#claimed+1] = toRetry[i]
else
rcall("XACK", toRetry[i][5], KEYS[1], toRetry[i][1])
end
end
end
return claimed
I know, this script is not the most performative at all (it's looping through the entire PEL list), but I'm using it anyway for the sake of simplicity of my consumers (a sad tradeoff). It's a legit case for many people that are unleashing the power and beauty of Redis Streams (I really like it and I used to do the things that I do with Redis Streams on Kafka/RabbitMQ).
And I like AUTOCLAIM naming, but @antirez any idea how the retry counter will be sent in XREADGROUP reply? It's important to consumers know about how many times that message was delivered (to avoid retrying on anomalies/unprocessable messages).
Comment From: johncantrell97
Bumping an old thread because I'm running into this use case and would love to know if there any any new thoughts on the best solution to this problem until something can be included into core redis.
I am trying @vflopes solution via lua scripting but finding it really slow. Also, I noticed that he's currently XCLAIM one entry at a time even though the command supports multiple ids. I can't find a way to support passing arbitrary length of ids via LUA. I tried table.concat() into single string of ids separated by space and also tried passing all args as a single table (and table.move()'d the ids into the args table) but neither of these solutions worked. Even if it did handle XCLAIM'ing all at once I'm not sure how much faster it would really end up being.
Also seems like it unnecessarily is incrementing retry count when XCLAIM does this by default?
Regardless, it does feel like a missing feature of the redis streams consumer group feature. If you are using consumer groups you almost always need to support a retry on failure method.
Thank you!
Comment From: madolson
@johncantrell97 Would you mind reviewing some of the approaches mentioned in #7649, specifically the XTRANSFER implementation to see if they meet your use case.
Comment From: marcbachmann
@johncantrell97
I am trying @vflopes solution via lua scripting but finding it really slow. Also, I noticed that he's currently XCLAIM one entry at a time even though the command supports multiple ids. I can't find a way to support passing arbitrary length of ids via LUA.
you can use unpack(table) to "spread" the arguments.
In this script I'm claiming all messages of another consumer:
-- KEYS[1] // The group to read messages
-- KEYS[2] // The destination consumer id id that claim messages
-- KEYS[3] // The source consumer id from which we want to take messages
-- ARGV[1] // Stream key
local pending = redis.call("XPENDING", ARGV[1], KEYS[1], "-", "+", 10000, KEYS[3])
if #pending == 0 then return 0 end
local ops = {"XCLAIM", ARGV[1], KEYS[1], KEYS[2], 0}
for t = 1, #pending do
ops[t + 5] = pending[t][1]
end
ops[#ops + 1] = 'JUSTID'
return #redis.call(unpack(ops))
Attention, I haven't tested the script with a big amount.
Comment From: johncantrell97
Oh cool, was looking for a 'spread' operator in lua -- didn't look hard enough I guess. Thanks!
I've moved on from this lua script to XAUTOCLAIM here (https://github.com/redis/redis/pull/7973)
Comment From: itamarhaber
Attention, I haven't tested the script with a big amount.
As long as big is less than about 8000 iirc it "should work". Around that number the hard coded Lua stack limit will stop you with a runtime error.
That said, we're hoping to provide a better solution soon(tm).
Comment From: zamu-flowerpot
Any information on when this might be on the horizon? My team is interested in using redis as a persistent message broker but this seems to be a bike shed for us which is leading for some people to propose much more complex systems just for the DX/UX.
Comment From: ranshid
@zamu-flowerpot isn't the 6.2 XAUTOCLAIM answer your need?
@oranagra I might have missed some details in the conversation, but shouldn't we close this as done in 6.2?
Comment From: oranagra
AFAIR, XAUTOCLAIM doesn't completely solves it, in the sense that it's slightly less convenient to use compared to XREADGROUP IDLE.
i see that here we had a plan, to introduce XPENDING IDLE and XAUTOCLAIM which were easy to do, and still wanted to do the rest some day (the other parts were considerably harder to implement).
that said, i suppose the main complaints in this issue about the atomicity and "aysnc ping-pong" are handled, and we have the plan in the other ticket, maybe we can close it and keep tracking the other one.
@guybe7 please keep me honest.
Comment From: zamu-flowerpot
@zamu-flowerpot isn't the 6.2 XAUTOCLAIM answer your need?
While I don't think calling XAUTOCLAIM and then XREADGROUP is particularly complex and sort of solves the problem, it will always be two round trips on the network. Additionally, the only way to improve it's performance is through a Lua script on redis.