How can I get the consumer group lag in Redis Streams, as in Kafka? By lag I mean the number of messages in the stream minus the number of messages consumed by a consumer group.

How can we scale the number of consumers in a consumer group, based on consumer lag, when the load increases?

https://stackoverflow.com/questions/53188211/how-to-get-the-lag-size-of-a-consumer-group-in-redis-stream

I am not talking about pending messages or the PEL (pending entries list). That serves a different purpose.

Comment From: justinrush

I also need this functionality. My use case is a k8s HPA that watches the queue depth and adds or removes pods acting as consumers on a queue.

It seems the only way to implement this right now is to use XINFO GROUPS to get the group's last-delivered-id and then XRANGE from (last-delivered-id to + just to get a count. If the data is slim, like "name=bob", that's not a big deal. If the data is something like 64k binary blobs, that feels super expensive.
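
For illustration, that workaround might look roughly like this in Python with redis-py. This is only a sketch: `lag_by_range` is a made-up name, `client` is assumed to be a `redis.Redis` instance, and the exclusive `(` range start assumes Redis 6.2 or later.

```python
def lag_by_range(client, stream, group):
    """Count entries after the group's last-delivered-id by fetching them all."""

    def text(value):
        # redis-py returns bytes unless the client uses decode_responses=True
        return value.decode() if isinstance(value, bytes) else value

    for info in client.xinfo_groups(stream):
        if text(info["name"]) == group:
            last_id = text(info["last-delivered-id"])
            # "(" makes the start exclusive; every entry body is transferred
            # to the client just so it can be counted.
            return len(client.xrange(stream, min="(" + last_id, max="+"))
    raise KeyError(group)
```

The cost concern is visible in the last step: the count comes from `len()` of a reply that contains every undelivered entry in full.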

Comment From: itamarhaber

Hi @deepaksood619

Right now streams do not support partial length queries as the internal data structure can't provide that with reasonable time complexity. We hope we'll be able to extend the internals with Redis 7 to include this capability (cc @guybe).

An alternative is to index the stream with a sorted set. Consider the following Lua script as a replacement for calling XADD:

-- Perform an XADD and add the (0-padded) ID to a lexicographical sorted set.
local stream, index = KEYS[1], KEYS[2]
local id = redis.call("XADD", stream, unpack(ARGV))
local sep = id:find("-")
local ts, sq = id:sub(1,sep-1), id:sub(sep+1)
local maxlen = 20  -- 2^64-1 string form
local pad = "0"
local el = pad:rep(maxlen-#ts) .. ts .. "-" .. pad:rep(maxlen-#sq) .. sq
redis.call("ZADD",index,0,el)
return id

And this script to "count" the entries after a given ID, up to the stream's end:

-- Return the number of stream entries after a given ID, using a zset index
local index = KEYS[1]
local id = ARGV[1]
local sep = id:find("-")
local ts, sq = id:sub(1,sep-1), id:sub(sep+1)
local maxlen = 20  -- 2^64-1 string form
local pad = "0"
local el = pad:rep(maxlen-#ts) .. ts .. "-" .. pad:rep(maxlen-#sq) .. sq
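local card = redis.call("ZCARD",index)
local rank = redis.call("ZRANK",index,el)
-- ZRANK returns false when the ID is not in the index
if not rank then
  return redis.error_reply("ID not found in index")
end
return card-(rank+1)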

Example run:

❯ redis-cli --eval xaddindex.lua stream zset , "*" foo bar
"1619260831304-0"
❯ redis-cli --eval xaddindex.lua stream zset , "*" foo bar
"1619260837652-0"
❯ redis-cli --eval xaddindex.lua stream zset , "*" foo bar
"1619260841484-0"
❯ redis-cli --eval xlenindex.lua zset , "1619260831304-0"
(integer) 2
❯ redis-cli --eval xlenindex.lua zset , "1619260837652-0"
(integer) 1
❯ redis-cli --eval xlenindex.lua zset , "1619260841484-0"
(integer) 0
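
To see why the IDs are zero-padded and how the count falls out of the rank, here is a small plain-Python model of the index. It does not talk to Redis: a sorted list stands in for the sorted set, and `pad_id` and `entries_after` are illustrative names, not part of the scripts above.

```python
from bisect import bisect_right


def pad_id(entry_id, width=20):
    """Zero-pad both halves of a stream ID so string order matches numeric order."""
    ts, seq = entry_id.split("-")
    return ts.zfill(width) + "-" + seq.zfill(width)


def entries_after(index, entry_id):
    """Count padded IDs in the sorted list `index` that sort after `entry_id`."""
    return len(index) - bisect_right(index, pad_id(entry_id))


ids = ["1619260831304-0", "1619260837652-0", "1619260841484-0"]
index = sorted(pad_id(i) for i in ids)

# Same counts as the example run: 2, 1, 0
print([entries_after(index, i) for i in ids])

# Without padding, "9-0" sorts after "10-0" as a string.
print("9-0" < "10-0", pad_id("9-0") < pad_id("10-0"))
```

Unlike the ZRANK-based script, this model also gives a count for an ID that is not in the index, because it looks up an insertion position rather than an exact member.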

Comment From: nantiferov

It seems this has been released in Redis v7.0, so I guess the issue could be closed?
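
For anyone landing here for the scaling use case: since 7.0, XINFO GROUPS reports a `lag` field per group. A rough Python sketch of reading it and turning it into a consumer count follows. It assumes redis-py returns each group as a dict with string keys; `group_lag`, `desired_consumers`, and the `per_consumer`, `lowest`, and `highest` values are hypothetical and would need tuning.

```python
import math


def group_lag(client, stream, group):
    """Return the `lag` value XINFO GROUPS reports for one group (Redis 7.0+)."""
    for info in client.xinfo_groups(stream):
        name = info["name"]
        if isinstance(name, bytes):
            name = name.decode()
        if name == group:
            # None when the field is absent or the server cannot determine it
            return info.get("lag")
    raise KeyError(group)


def desired_consumers(lag, per_consumer=1000, lowest=1, highest=10):
    """Hypothetical rule: one consumer per `per_consumer` lagging entries."""
    if lag is None:
        return None  # unknown lag: leave the current consumer count alone
    return max(lowest, min(highest, math.ceil(lag / per_consumer)))
```

With these made-up defaults, a lag of 2500 would ask for 3 consumers, and the result is clamped to the range 1 to 10.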