I have a question regarding Redis Streams.

I am developing an application in a K8s environment, and I am using the K8s pod name as the consumer name. The K8s pod name changes randomly each time a pod is started, so the number of no-longer-needed consumers in the consumer group keeps growing.

Should I clean up these consumers (XGROUP DELCONSUMER)? And do you have any recommendations on how to clean them up?

I have considered using XINFO CONSUMERS to clean up consumers with long idle times. However, the idle time can also be long simply because no messages are being produced to or consumed from the stream.
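
For context, this is roughly what I had in mind (a sketch with redis-py; the stream/group names and the threshold are placeholders), but the idle-time check is exactly the part I am unsure about:

```python
import redis

r = redis.Redis(decode_responses=True)

STREAM, GROUP = "mystream", "mygroup"   # placeholder names
MAX_IDLE_MS = 24 * 60 * 60 * 1000       # arbitrary threshold: 24 hours

# Delete consumers that have been idle "too long" and hold nothing pending.
# Problem: idle time also grows when the stream simply has no traffic.
for c in r.xinfo_consumers(STREAM, GROUP):
    if c["idle"] > MAX_IDLE_MS and c["pending"] == 0:
        r.xgroup_delconsumer(STREAM, GROUP, c["name"])
```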

related: https://stackoverflow.com/questions/71690425/consumer-naming-strategy-for-spring-data-redis-redis-streams-with-consumer-grou

Comment From: zuiderkwast

Hi @be-hase. There was no response, so I guess nobody knows the answer. :-)

I guess one way would be to store all consumers in a separate place in redis. Each consumer can periodically set a timestamp for itself somewhere, using a separate redis command, for example HSET. Then you can have a cron job for cleaning up consumers that have very old timestamps.
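
Something along these lines, as a rough sketch in Python with redis-py (the key name, interval and timeout are made up):

```python
import time
import redis

r = redis.Redis(decode_responses=True)

STREAM, GROUP = "mystream", "mygroup"   # placeholder names
HEARTBEATS = "mygroup:heartbeats"       # hypothetical hash: consumer -> last heartbeat
TIMEOUT = 300                           # seconds without a heartbeat before cleanup

def heartbeat(consumer):
    """Each consumer calls this periodically, e.g. every 30 seconds."""
    r.hset(HEARTBEATS, consumer, int(time.time()))

def cleanup():
    """Run from a cron job: remove consumers whose heartbeat is too old."""
    now = int(time.time())
    for consumer, ts in r.hgetall(HEARTBEATS).items():
        if now - int(ts) > TIMEOUT:
            r.xgroup_delconsumer(STREAM, GROUP, consumer)
            r.hdel(HEARTBEATS, consumer)
```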

Comment From: qrli

Just to share my experience:

As a proof of concept, I tried to build a message queue solution based on Redis streams and consumer groups. It could work relatively well when running on virtual machines, where I can use the computer name as (or as part of) the consumer identity. When the microservice crashes, it can restart and resume as the same consumer.

On Kubernetes, the pod gets a new random computer name on each start, so I have to clean up the orphaned consumers, as they hold on to the messages they haven't finished processing. The official Redis docs describe a time-based claiming solution, which cleans up consumers that have been stuck for too long. That could work for cases where message handling is typically fast, e.g. within 1 minute, or where it is OK to reprocess a failed message hours later, but not for my case, where a message could take 10 minutes or more while I also need messages to be processed as soon as possible.

For the solution:

  * An obvious one is to use Kubernetes's StatefulSet feature, which gives pods stable computer names. However, it will still fail if I need to scale in the StatefulSet, which causes orphaned consumers. And as I'm dealing with a large existing system, it is not appropriate to move all microservices to StatefulSets just because of this specific Redis solution.
  * I tried to use a heartbeat for each consumer (a rough sketch follows below). While it kind of works, it is not easy to do right, and it is more suitable for a driver/library to encapsulate. And the minimum timeout for reclaiming dead consumers has to be at least 2 heartbeat intervals, which makes it difficult to balance failure-recovery latency against the cost and reliability of the heartbeat. The reasonable recovery-latency choice is between 30 seconds and 1 minute, in my estimation.
  * This made me wish that Redis provided an option/setting to auto-reclaim a consumer if its socket connection is closed (or something similar to that, as it is not possible for other consumers to detect this just in time), which would cover the majority of failure cases with zero latency. Then the heartbeat could cover the remaining, rarer cases (OS crash, network down, consumer hang, etc.).
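
Roughly what I mean by the heartbeat-based takeover, as a sketch with redis-py (the names are illustrative, not my actual code): once a consumer's heartbeat has expired, a live consumer claims whatever the dead one still holds and then deletes it.

```python
import redis

r = redis.Redis(decode_responses=True)

STREAM, GROUP = "mystream", "mygroup"   # illustrative names
ME = "live-consumer-1"                  # the consumer doing the takeover

def take_over(dead_consumer):
    """Claim what a dead consumer still holds, then delete it.
    Only call this after the consumer's heartbeat has expired."""
    pending = r.xpending_range(STREAM, GROUP, "-", "+", count=1000,
                               consumername=dead_consumer)
    if pending:
        ids = [p["message_id"] for p in pending]
        # min_idle_time=0: the heartbeat already decided this consumer is dead.
        r.xclaim(STREAM, GROUP, ME, 0, ids)
    r.xgroup_delconsumer(STREAM, GROUP, dead_consumer)
```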

Comment From: be-hase

@qrli Thanks for the details. It is very helpful.

Comment From: zuiderkwast

@qrli

This made me wish that Redis provided an option/setting to auto-reclaim a consumer if its socket connection is closed (or something similar to that, as it is not possible for other consumers to detect this just in time), which would cover the majority of failure cases with zero latency.

Do you want to suggest a new feature for this? I can imagine these possibilities:

  1. An extra option to XGROUP CREATECONSUMER which causes it to be deleted when the client connection is lost. (Ideas: DEL-ON-CLOSE or TEMPORARY or TRANSIENT.)
  2. An extra option to XGROUP CREATE that makes all automatically and explicitly created consumers in this group be deleted when their client connections are lost.
  3. A config option.

Comment From: qrli

@zuiderkwast To me, either 1. or 2. would work. And it is not only about deleting the consumer but also about reclaiming its unfinished messages to other alive consumers, evenly (which could be a new feature of reclaim by itself). In the extreme case where all consumers lose their connections, the Redis server may need to delay the work until some consumer comes online.

In addition, I guess the complete solution will need more. As far as I can see, there are 2 categories of reasons for a connection close:

  - A. The client executable closed or crashed. (This is the case where we want Redis to delete the consumer immediately.)
  - B. The client is still working but the network cut it off, e.g. a load balancer reset the TCP connection for some reason.

In the B case, there are 2 possible handlings:

  - B1. The client needs to know (via an error on subsequent commands?) that the Redis server has deleted that consumer, so that the client drops the current work and tries to recreate the consumer.
  - B2. Optionally, the Redis server supports waiting for some short grace period before deleting the consumer, so that the client can reconnect and continue working as if nothing happened.

Comment From: zuiderkwast

Good points @qrli.

And it is not only about deleting the consumer but also about reclaiming its unfinished messages to other alive consumers, evenly (which could be a new feature of reclaim by itself).

I don't know what you mean by "reclaim". It is not possible to move messages back from pending to new, because message IDs can only increase. We can't move old messages back to the stream. I think we need to let the messages remain as "pending" so other consumers can take them using XAUTOCLAIM. If they use COUNT 1, they will be evenly distributed, right?
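
For example, each surviving consumer could periodically run something like this (a sketch with redis-py; the 60-second idle threshold and the `process` handler are just placeholders):

```python
import redis

r = redis.Redis(decode_responses=True)

def claim_one(me, process):
    """Claim at most one pending entry that has been idle for >= 60 s,
    regardless of which (possibly dead) consumer it was delivered to."""
    res = r.xautoclaim("mystream", "mygroup", me, min_idle_time=60_000,
                       start_id="0-0", count=1)
    for msg_id, fields in res[1]:       # res[1] = entries now owned by `me`
        process(fields)                 # placeholder message handler
        r.xack("mystream", "mygroup", msg_id)
```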

So, you're right that it is not good to just delete the consumer like XGROUP DELCONSUMER does, because it also deletes the pending entries so no other consumers will get them. It is a dangerous behaviour of XGROUP DELCONSUMER. (Maybe a new flag can be added to make it safer, something like XGROUP DELCONSUMER key group consumer IF-NOTHING-PENDING...)

Here is an idea: When a connection is lost, we can delay deleting the consumer until all pending entries have been claimed by other consumers. If the same consumer comes back (using a command like XREADGROUP), we cancel deleting the consumer.

This seems to be an area of streams where the design is not yet complete. Maybe @itamarhaber wants to join the discussion?

Comment From: qrli

I don't know what you mean by "reclaim". It is not possible to move messages back from pending to new, because message IDs can only increase. We can't move old messages back to the stream.

Sorry, I should have written "claim"; I did mean something similar to XAUTOCLAIM.

Here is an idea: When a connection is lost, we can delay deleting the consumer until all pending entries have been claimed by other consumers. If the same consumer comes back (using a command like XREADGROUP), we cancel deleting the consumer.

That's an interesting idea. I like it.

Comment From: nyongja

I am worried about a similar problem.

I want to use the K8s pod name as the consumer name. Will there be a problem if I don't clean up the old consumers? I will handle the messages pending for an old consumer with XAUTOCLAIM.

Or is there any other good way to pin a consumer name in K8s?

Comment From: Guillermogsjc

I am using a StatefulSet and the HOSTNAME environment variable to achieve persistent names for the consumers of stream consumer groups.

But it has problems: it introduces an unnecessary StatefulSet when a Deployment pattern would be better or just fine. A StatefulSet is problematic if you work with spot instances, and it scales much more slowly.

A solution would be to introduce, on pod shutdown, a call to a variant of XAUTOCLAIM that would not need to specify the target consumer and would just delete the consumer from the group (imagine a pod with the ephemeral HOSTNAME my-deployment-2jdk4231).
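
Until something like that exists, the closest I can get with the current commands is a preStop hook running a small script along these lines (a sketch with redis-py; the names are placeholders, and it only deletes the consumer when that is safe, since XGROUP DELCONSUMER drops pending entries):

```python
import os
import redis

r = redis.Redis(decode_responses=True)

STREAM, GROUP = "mystream", "mygroup"   # placeholder names
ME = os.environ["HOSTNAME"]             # the ephemeral pod name

def on_shutdown():
    """Called from the pod's preStop hook."""
    mine = r.xpending_range(STREAM, GROUP, "-", "+", count=1,
                            consumername=ME)
    if not mine:
        # Nothing pending: safe to remove the consumer entry.
        r.xgroup_delconsumer(STREAM, GROUP, ME)
    # Otherwise leave the consumer so that survivors can take its
    # entries with XAUTOCLAIM; a periodic job can delete it later.
```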

I added a specific feature request regarding this cleanup: https://github.com/redis/redis/issues/13602

Comment From: kodeine

I am facing the same issue. I switched to a StatefulSet, but now the problem is that when we push code updates we face downtime, because all the StatefulSet pods get killed. Without the StatefulSet it was perfect, because while the new pods are being set up the old ones stay alive.