[edit] See Oran's proposal here: https://github.com/redis/redis/issues/11116#issuecomment-1221605330
Hi, I want to build priority queues on top of streams. Suppose I have two queues, priority and normal, and several workers, and I want to read messages one at a time, starting with the priority ones. I used XREADGROUP with two keys at once and COUNT 1. However, if there are messages in both streams, I get two messages at once, although I only need one. How can I receive only one message? Is it possible to requeue the second message so that another free worker can process it?
I also tried to solve this with a single stream and two groups in it, priority and normal. However, when adding a message you cannot choose the group it should go to. Why? I also realized that it is impossible to transfer a message from one group to another using X(AUTO)CLAIM.
I would also like to be able to send a message to a specific consumer. But the group for a message is determined only when XREADGROUP is called, and as far as I understand there is no other way to determine it. How can I add a message directly to a specific group or a specific consumer?
At the moment it seems to me that streams are not flexible enough, or I am doing something wrong. Does anyone have a solution for these issues? Could this functionality be added?
Comment From: forkfork
Could you use a Lua script (EVAL) to check the streams through XINFO GROUPS key, something along the lines of:
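-- Group and stream metadata for both queues
local hi_info = redis.call('XINFO','GROUPS','highpriority')
local low_info = redis.call('XINFO','GROUPS','lowpriority')
local hi_stream = redis.call('XINFO','STREAM','highpriority')
local low_stream = redis.call('XINFO','STREAM','lowpriority')
-- Positions inside the flat XINFO replies; these depend on the Redis version
local LASTENTRY = 14     -- last entry of the stream, from XINFO STREAM
local LASTDELIVERY = 8   -- last delivered ID of the group, from XINFO GROUPS
-- Prefer highpriority when it holds entries its first group has not been given yet
if hi_stream[LASTENTRY][1] > hi_info[1][LASTDELIVERY] then
  return redis.call('XREADGROUP','GROUP','reader','consumer1','NOACK','STREAMS','highpriority','>')
elseif low_stream[LASTENTRY][1] > low_info[1][LASTDELIVERY] then
  return redis.call('XREADGROUP','GROUP','reader','consumer1','NOACK','STREAMS','lowpriority','>')
end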
This will read messages from the high priority queue if available, otherwise will read from the low priority queue.
tim@g14:~/git/redis$ ./src/redis-cli xadd highpriority "*" message alarm
"1660453297108-0"
tim@g14:~/git/redis$ ./src/redis-cli xadd lowpriority "*" message "nothing important"
"1660453319558-0"
tim@g14:~/git/redis$ ./src/redis-cli --eval priority.lua
1) 1) "highpriority"
   2) 1) 1) "1660453297108-0"
         2) 1) "message"
            2) "alarm"
tim@g14:~/git/redis$ ./src/redis-cli --eval priority.lua
1) 1) "lowpriority"
   2) 1) 1) "1660453319558-0"
         2) 1) "message"
            2) "nothing important"
tim@g14:~/git/redis$ ./src/redis-cli --eval priority.lua
(nil)
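The same priority-first fallback can also be sketched on the client side, without Lua. This is only a sketch under assumptions: `read_one` is a hypothetical helper, `client` is assumed to be a redis-py `redis.Redis` instance using the default list-shaped replies (or any object with a compatible `xreadgroup` method), and the group is assumed to exist on every stream already. The reads are separate round trips, so unlike the script they are not atomic as a pair, and because NOACK is not used here the returned entry stays pending until it is acknowledged with XACK.

```python
def read_one(client, group, consumer, streams_by_priority):
    """Return (stream, entry_id, fields) for one new entry, or None.

    Streams are tried in the given order with one XREADGROUP call each,
    so a single call to this helper never claims more than one entry.
    """
    for stream in streams_by_priority:
        reply = client.xreadgroup(
            groupname=group,
            consumername=consumer,
            streams={stream: '>'},
            count=1,
        )
        # An empty (or None) reply means this stream has nothing new
        if not reply:
            continue
        _name, entries = reply[0]
        if entries:
            entry_id, fields = entries[0]
            return stream, entry_id, fields
    return None
```

A worker would call something like `read_one(r, 'reader', 'consumer1', ['highpriority', 'lowpriority'])` in a loop and sleep briefly when it returns `None`.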
Comment From: itamarhaber
Hello @dskikot,
The way to go about this, IMO, is to have two streams, priority and normal. Consumers can read from only one of them or from both. Redis replies in the order in which the stream keys are given to XREADGROUP. That means that when you call this:
redis> XREADGROUP GROUP somegroup someconsumer BLOCK 1 COUNT 1 STREAMS priority normal > >
The consumer will first get a message from the priority stream, if there are new ones, and then from the normal stream (if there are new ones).
Comment From: dskikot
Hello @itamarhaber That's exactly how I tried it. Perhaps my question was not precise enough. I'm using Redis from Python.
import redis
from random import randint
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
stream_priority = 'priority'
stream_normal = 'normal'
group = 'group1'
consumer1 = 'consumer1'
consumer2 = 'consumer2'
r.xadd(name=stream_priority, fields={'_id': randint(0, 100)})
r.xadd(name=stream_normal, fields={'_id': randint(0, 100)})
try:
    r.xgroup_create(name=stream_priority, groupname=group, id=0)
    r.xgroup_create(name=stream_normal, groupname=group, id=0)
except redis.exceptions.ResponseError:
    # The groups already exist from a previous run
    pass
message1 = r.xreadgroup(groupname=group, consumername=consumer1, streams={stream_priority: '>', stream_normal: '>'}, count=1)
print('consumer1 message', message1)
message2 = r.xreadgroup(groupname=group, consumername=consumer2, streams={stream_priority: '>', stream_normal: '>'}, count=1)
print('consumer2 message', message2)
Output:
consumer1 message [['priority', [('1660508221412-0', {'_id': '24'})]], ['normal', [('1660508221412-0', {'_id': '31'})]]]
consumer2 message []
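To make the result above explicit: with `count=1` the reply holds one entry per stream, so two entries in total. The following is a small sketch that flattens a reply of this shape; `flatten_reply` is a hypothetical helper, and the reply literal is copied from the output above.

```python
def flatten_reply(reply):
    """Turn [[stream, [(entry_id, fields), ...]], ...] into a flat list
    of (stream, entry_id, fields) tuples, keeping the reply order."""
    flat = []
    for stream, entries in reply or []:
        for entry_id, fields in entries:
            flat.append((stream, entry_id, fields))
    return flat


reply = [
    ['priority', [('1660508221412-0', {'_id': '24'})]],
    ['normal', [('1660508221412-0', {'_id': '31'})]],
]
entries = flatten_reply(reply)
print(len(entries))                          # 2: COUNT 1 was applied to each stream
print([stream for stream, _, _ in entries])  # ['priority', 'normal']
```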
Comment From: guybe7
@dskikot thanks for reporting this issue
@oranagra TL;DR XREADGROUP's COUNT is per-key rather than a global limit when the client can be served synchronously. if the client gets blocked, COUNT is a global limit.
it seems to be by design because the behavior is documented:
... COUNT option in the example, so that for each stream the call will return a maximum of two elements per stream
https://redis.io/commands/xread/
but I would argue that it should be considered a bug:
1. it is counter-intuitive
2. other synchronous commands that take multiple keys and a COUNT behave in a way where COUNT is global and not per-key (LMPOP, ZMPOP)
Comment From: guybe7
note: the approach above is kind of a big breaking change
Comment From: dskikot
@guybe7 I'm not sure I understood correctly, but it works exactly the same with the block option. The same script as above, but with these lines at the end:
message1 = r.xreadgroup(groupname=group, consumername=consumer1, streams={stream_priority: '>', stream_normal: '>'}, count=1, block=5000)
print('consumer1 message', message1)
message2 = r.xreadgroup(groupname=group, consumername=consumer2, streams={stream_priority: '>', stream_normal: '>'}, count=1, block=5000)
print('consumer2 message', message2)
The output is the same:
consumer1 message [['priority', [('1660681878816-0', {'_id': '70'})]], ['normal', [('1660681878816-0', {'_id': '33'})]]]
consumer2 message []
Comment From: guybe7
@dskikot yes, because the clients weren't actually blocked (the streams already had entries to read). if you run one script that does a blocking XREADGROUP and then a script that does the two XADDs (even in a transaction), the blocking XREADGROUP will return just one entry
Comment From: dskikot
@guybe7 How long will it take to fix it? What about the second part of my post, about adding the ability to specify a group or consumer in the XADD command? Or do I need to create a new issue for it?
Comment From: oranagra
@guybe7 the thing is that all the other commands [B]LMPOP [B]ZMPOP can take multiple keys, but they always return the elements from only one key (both the blocking and non-blocking modes).
With XREAD we have a unique capability to return the elements from more than one key, but that capability is (currently?) not being used in the blocking variant.
what i think should be done:
1. add a notation in which a negative count would mean the overall maximum number of elements to return (positive is per-stream, and negative is global).
2. improve the blocking variants of xread to be able to serve from more than one stream if available.
@guybe7 @itamarhaber WDYT?
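The proposed COUNT notation can be illustrated with a toy model in plain Python. This is not Redis code and not an existing Redis option: `select_entries` and its arguments are hypothetical, and rejecting a count of zero is an assumption made only for this sketch.

```python
def select_entries(available, count):
    """Toy model of the proposed COUNT semantics (not Redis code).

    available: list of (stream, [entry, ...]) in the order the keys were given.
    count > 0: at most `count` entries from each stream (the current behavior).
    count < 0: at most abs(count) entries overall, earlier streams first.
    """
    if count == 0:
        raise ValueError("count must be non-zero in this model")
    result = []
    remaining = abs(count)  # only meaningful for the global (negative) case
    for stream, entries in available:
        limit = count if count > 0 else remaining
        taken = entries[:limit]
        if taken:
            result.append((stream, taken))
        if count < 0:
            remaining -= len(taken)
            if remaining == 0:
                break
    return result


pending = [('priority', ['p1', 'p2']), ('normal', ['n1'])]
print(select_entries(pending, 1))   # [('priority', ['p1']), ('normal', ['n1'])]
print(select_entries(pending, -1))  # [('priority', ['p1'])]
```

In this model a negative count gives the priority-queue behavior asked for in the original post: the normal stream is only served when the priority stream has nothing new.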
Comment From: guybe7
@oranagra sounds good to me
Comment From: guybe7
@dskikot specifying a group/consumer in XADD defies the idea behind streams (a stream entry should not "belong" to any group, it is a part of an append-only log structure that can be consumed by any group or consumer)
the solution proposed above by @oranagra will solve your problem. it is somewhat of a breaking change (returning multiple entries in the blocking variant) so I'm not sure it can be released before 7.2 but I might be wrong. @oranagra ?
Comment From: oranagra
the first change i proposed is not a breaking change. the second one would indeed be a behavior change, but considering that the command already has the capability to return records from more than one key in the non-blocking mode, i think it means the callers are likely to handle that correctly already. we can implement that, and then consider how / when to release..
Comment From: dskikot
@guybe7 Well, perhaps this is a slightly different problem. But if a stream has multiple consumer groups, why can't a message be passed between them?
Comment From: guybe7
@dskikot if a stream has multiple groups, the same message can be consumed by each of them (i.e. groups are not aware of each other). if you have message M in stream X, with groups G1 and G2, issuing an XREADGROUP on G1 will return M, and the same applies for G2
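The point about independent groups can be pictured with a toy model in plain Python. This is not how Redis is implemented; `ToyStream` and its methods are hypothetical names used only to show that entries carry no group and that each group tracks its own read position.

```python
class ToyStream:
    """Toy model of a stream with consumer groups (not Redis code).

    The log is append-only and entries do not belong to any group; each
    group only remembers how far into the log it has been served.
    """

    def __init__(self):
        self.log = []
        self.delivered = {}  # group name -> number of entries delivered

    def add(self, entry):
        self.log.append(entry)

    def create_group(self, group):
        self.delivered[group] = 0  # start from the beginning of the log

    def read_group(self, group, count=1):
        start = self.delivered[group]
        entries = self.log[start:start + count]
        self.delivered[group] = start + len(entries)
        return entries


x = ToyStream()
x.create_group('G1')
x.create_group('G2')
x.add('M')
print(x.read_group('G1'))  # ['M']
print(x.read_group('G2'))  # ['M'] again: G2 tracks its own position
print(x.read_group('G1'))  # []: G1 has already been given M
```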