Describe the bug

The code below causes Redis 7.2 to get completely stuck (100% CPU, impossible to connect to the server, have to use kill -9). With nb_subscribers = 2 it works fine.

The same code works as expected with Redis 7.0.

To reproduce

subscriber.py

#!/usr/bin/env python3

import time

from multiprocessing import Process

from redis import Redis


nb_subscribers = 3


def subscriber(user_id):
    r = Redis(unix_socket_path='cache.sock')
    try:
        r.xgroup_create(name='tasks_queue', groupname='test', mkstream=True)
    except Exception:
        print('group already exists')

    while True:
        new_stream = r.xreadgroup(
            groupname='test', consumername=f'testuser-{user_id}', streams={'tasks_queue': '>'},
            block=2000, count=1)
        if not new_stream:
            time.sleep(5)
            continue
        print(new_stream)


processes = []
for i in range(nb_subscribers):
    p = Process(target=subscriber, args=(i,))
    p.start()
    processes.append(p)

while processes:
    new_p = []
    for p in processes:
        if p.is_alive():
            new_p.append(p)
    processes = new_p
    time.sleep(5)

print('all processes dead')

feeder.py

#!/usr/bin/env python3

import time
import uuid

from multiprocessing import Process

from redis import Redis

nb_feeders = 1


def feeder():

    r = Redis(unix_socket_path='cache.sock')

    while True:
        fields = {'task_uuid': str(uuid.uuid4())}
        r.xadd(name='tasks_queue', fields=fields, id='*', maxlen=5000)
        time.sleep(.1)


processes = []
for _ in range(nb_feeders):
    p = Process(target=feeder)
    p.start()
    processes.append(p)

while processes:
    new_p = []
    for p in processes:
        if p.is_alive():
            new_p.append(p)
    processes = new_p
    time.sleep(5)

print('all processes dead')

Comment From: enjoy-binbin

Thanks for the report, verified.

Comment From: enjoy-binbin

@ranshid it looks like this was caused by #11012. It keeps going: reprocess command -> blockForKeys -> reprocess command. We need to find some way to break the loop. Below is a rough fix of mine; maybe you have a better view.

    /* Block if needed. */
    if (timeout != -1) {
        /* If we are not allowed to block the client, the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_DENY_BLOCKING) {
            addReplyNullArray(c);
            goto cleanup;
        }
        /* We change the '$' to the current last ID for this stream. this is
         * Since later on when we unblock on arriving data - we would like to
         * re-process the command and in case '$' stays we will spin-block forever.
         */
        for (int id_idx = 0; id_idx < streams_count; id_idx++) {
            int arg_idx = id_idx + streams_arg + streams_count;
            if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) {
                robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]);
                rewriteClientCommandArgument(c, arg_idx, argv_streamid);
                decrRefCount(argv_streamid);
            }

+            /* In the '>' case, if we have blocked before, we will reprocess the
+             * command and get here again; do not enter the block a second time,
+             * otherwise we will spin-block forever. */
+            if (c->bstate.timeout != 0 && strcmp(c->argv[arg_idx]->ptr, ">") == 0) {
+                goto cleanup;
+            }
        }
        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup);
        goto cleanup;
    }

@oranagra FYI, this issue needs attention

Comment From: oranagra

I think the right fix is to avoid an endless loop in handleClientsBlockedOnKey and handleClientsBlockedOnKeys. It looks like there was some attempt at this in handleClientsBlockedOnKeys, but maybe not sufficiently good, and using a similar trick in handleClientsBlockedOnKey is complicated: stashing the list on the stack and iterating on it after creating a fresh one for future use is problematic, since the code keeps accessing the global list. With the following ugly fix the problem is solved; what's left is to write a tcl test that reproduces it.

diff --git a/src/blocked.c b/src/blocked.c
index 1b3a804b1..af1d5c039 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -325,7 +325,7 @@ void handleClientsBlockedOnKeys(void) {
      * (i.e. not from call(), module context, etc.) */
     serverAssert(server.also_propagate.numops == 0);

-    while(listLength(server.ready_keys) != 0) {
+    if (listLength(server.ready_keys) != 0) {
         list *l;

         /* Point server.ready_keys to a fresh list and save the current one
@@ -563,8 +563,8 @@ static void handleClientsBlockedOnKey(readyList *rl) {
         listNode *ln;
         listIter li;
         listRewind(clients,&li);
-
-        while((ln = listNext(&li))) {
+        long count = listLength(clients);
+        while((ln = listNext(&li)) && count--) {
             client *receiver = listNodeValue(ln);
             robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS);
             /* 1. In case new key was added/touched we need to verify it satisfy the

Comment From: selfboot

I still have a question. If the feeder and subscriber client processes are killed, the server should theoretically be able to close the connections of these clients. Even in version 7.2 with the infinite-loop bug, it shouldn't keep putting disconnected clients back into the queue, right?

In addition, I have another question. When I attach gdb to the server, I can see the infinite loop in the while loop below, but I couldn't see at what point a client gets added to li during processing. Can you help explain this?

static void handleClientsBlockedOnKey(readyList *rl) {
        ...
        while((ln = listNext(&li))) {
            client *receiver = listNodeValue(ln);
            ...
        }

Comment From: selfboot

By the way, here are two screenshots from my troubleshooting after reproducing the issue myself, including a CPU flame graph captured with eBPF.

[Screenshot: 20230613_bug_redis_deadlock_cpu, CPU flame graph]

I added r.client_setname(f'subscriber_{user_id}') to the test script, and during GDB analysis I observed that the receiver kept bouncing between the same two clients. [Screenshot: client_repeat]

Comment From: oranagra

> I still have a question. If the feeder and subscriber client processes are killed, the server should theoretically be able to close the connections of these clients. Even in version 7.2 with the infinite-loop bug, it shouldn't keep putting disconnected clients back into the queue, right?

Maybe it shouldn't (keep processing disconnected clients), but it does. Redis doesn't attempt to write to (or read from) the socket while it processes a command. It will only realize that the client dropped once it's done processing all the commands of the event loop and it either writes a reply to the socket or attempts to read more data from it.
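
To illustrate that point with a standalone example, here is a small POSIX C sketch. It is not Redis code; the socketpair is just a stand-in for a dropped client connection. The process only discovers that the peer has gone away at the moment it actually performs I/O on the socket.

#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return 1;

    /* The "client" end goes away, like a killed subscriber process. */
    close(sv[1]);

    /* Nothing in this process knows that yet; no notification is delivered
     * while we are busy doing other work (e.g. executing commands). */
    char buf[16];
    ssize_t n = read(sv[0], buf, sizeof(buf)); /* only now: 0 means EOF, peer gone */
    printf("read returned %zd -> peer has disconnected\n", n);

    close(sv[0]);
    return 0;
}

In Redis terms, that read or write only happens once the event-loop iteration is done processing commands, which is why the spinning server never notices the dead clients.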

> In addition, I have another question. When I attach gdb to the server, I can see the infinite loop in the while loop below, but I couldn't see at what point a client gets added to li during processing. Can you help explain this?

When we re-process the command (and realize it should be blocked again), it calls blockForKeys (which adds another element to that list).
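
To make the resulting loop concrete, here is a minimal, self-contained C sketch. It is not Redis code: the list type, reprocess_and_reblock() and every other name here are invented for illustration. It models a handler walking the list of clients blocked on a key while each re-processed client (the '>' case with nothing new to serve) blocks again and is appended to the very list being walked, and it shows how capping the walk at the length observed up front, the long count = listLength(clients) idea from the diff above, breaks the cycle.

#include <stdio.h>
#include <stdlib.h>

typedef struct node {
    int client_id;
    struct node *next;
} node;

typedef struct {
    node *head, *tail;
    long len;
} list;

static void list_append(list *l, int client_id) {
    node *n = malloc(sizeof(*n));
    n->client_id = client_id;
    n->next = NULL;
    if (l->tail) l->tail->next = n; else l->head = n;
    l->tail = n;
    l->len++;
}

/* Stand-in for re-processing one blocked client: the group has no new
 * entries, so the command cannot be served and the client blocks again,
 * which appends it to the same list we are iterating. */
static void reprocess_and_reblock(list *blocked, int client_id) {
    list_append(blocked, client_id);
}

int main(void) {
    list blocked = {NULL, NULL, 0};
    for (int i = 1; i <= 3; i++) list_append(&blocked, i);

    /* Without the "count" cap this loop never terminates, because every
     * iteration appends a new node ahead of the iterator. */
    long count = blocked.len;
    long iterations = 0;
    for (node *n = blocked.head; n != NULL && count-- > 0; n = n->next) {
        iterations++;
        reprocess_and_reblock(&blocked, n->client_id);
    }
    printf("iterations: %ld, clients still blocked: %ld\n",
           iterations, blocked.len);
    return 0;
}

Remove the count cap and the for loop never terminates, which is the spin seen on 7.2. This only models the control flow, not the real blocked.c data structures.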

Comment From: selfboot

Thank you very much for your answer. I will keep studying this part of the implementation.