Hello, I am trying to build a task-scheduling framework on top of Redis Streams. My idea is that each task function corresponds to a consumer group. While simulating this I noticed a problem: every time I listen on a group, one more connection is opened. What does this mean? If I have a lot of tasks, there will be an enormous number of connections.

[Screenshot from 2022-12-09 10-06-39]

code

import uuid
from concurrent.futures import ThreadPoolExecutor

import redis


class Redis7:
    def __init__(
            self,
            redis_pool: redis.ConnectionPool,
    ):
        self.last_id = '0-0'
        self.next_read_loop = False

        self.client = redis.Redis(connection_pool=redis_pool)

    def accept_messages(
            self,
            i,
            consume_func,
            queue_name='leek',
            group_name='group_v1',
            consumer_name=None,
    ):
        # A default like str(uuid.uuid4()) is evaluated once at definition
        # time, so every worker would share the same consumer name;
        # generate it per call instead.
        if consumer_name is None:
            consumer_name = str(uuid.uuid4())
        print(i)
        # EXISTS only says whether the stream exists, not the group, and
        # XGROUP CREATE raises BUSYGROUP if the group is already there.
        try:
            self.client.xgroup_create(queue_name, group_name, '0-0', mkstream=True)
        except redis.ResponseError:
            pass  # group already exists

        check_backlog = True

        # Claim pending entries left behind by dead consumers, then delete
        # consumers in the group that no longer have anything pending.
        self.client.xautoclaim(queue_name, group_name, consumer_name, 0, 0, count=25)
        consumers = self.client.xinfo_consumers(queue_name, group_name)
        for consumer in consumers:
            if consumer['pending'] == 0:
                self.client.xgroup_delconsumer(queue_name, group_name, consumer['name'])

        while True:
            if self.next_read_loop:
                continue
            if check_backlog:
                stream_id = self.last_id
            else:
                stream_id = '>'
            items = self.client.xreadgroup(
                group_name, str(consumer_name),
                {queue_name: stream_id},
                block=100,
                count=1
            )
            if not items:
                continue
            check_backlog = len(items[0][1]) > 0
            for ids, value in items[0][1]:
                consume_func(value)
                self.client.xack(queue_name, group_name, ids)
                self.last_id = ids

    def send(self, message, queue_name='leek'):
        x = self.client.xadd(queue_name, message)
        print(x)


if __name__ == '__main__':
    def task(value):
        print('task', value)


    pool = ThreadPoolExecutor(100)
    broker = 'redis://127.0.0.1:6379/0'
    pool_redis = redis.ConnectionPool.from_url(broker, decode_responses=True)
    r = Redis7(pool_redis)
    for i in range(4):
        r.send({'value': i})
    for i in range(100):
        pool.submit(r.accept_messages, i, task)

A few questions:

  1. I don't know whether my idea of one task function per consumer group fits the intended usage of Redis Streams. If it does, is it normal that listening on multiple streams opens multiple connections?
  2. I have not yet found a task-scheduling framework on GitHub that uses Redis Streams as the message broker. If you know of any, please leave a few repository links. Python, Go, Rust, or .NET are all fine.

Comment From: itamarhaber

Hello @InfernalAzazel

Every XREADGROUP needs a connection, so the correct statement is that every consumer in the group requires a connection. That said, one assumes that your consume_func takes more time than reading the messages does (excluding block time), so you can close/return the connection to the pool before entering the items' processing loop.
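For what it's worth, redis-py already does most of this for you: a pooled connection is checked out for the duration of each command and released when the command returns, so it is held only while XREADGROUP blocks, not while consume_func runs. A minimal sketch of one worker iteration (hypothetical helper name; client is any redis.Redis built from a ConnectionPool, as in the code above):

```python
def read_then_process(client, consume_func, queue='leek',
                      group='group_v1', consumer='c1'):
    """One iteration of the worker loop. A pooled connection is held only
    while xreadgroup blocks (up to 100 ms here); by the time consume_func
    runs, the connection is already back in the pool."""
    items = client.xreadgroup(group, consumer, {queue: '>'},
                              block=100, count=1)
    processed = []
    for entry_id, fields in (items[0][1] if items else []):
        consume_func(fields)                 # no connection held here
        client.xack(queue, group, entry_id)  # briefly borrows one again
        processed.append(entry_id)
    return processed
```

The connection pressure therefore comes from how many threads are blocked inside XREADGROUP at the same moment, not from how long the handlers take.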

If I have a lot of tasks, there will be an enormous number of connections

How many is "a lot", and how many is "enormous"? My laptop easily handles 10K concurrent clients plus the Redis server :) To scale further, you can shard the data across a Redis Cluster to increase concurrency.

Comment From: InfernalAzazel

Thank you very much for your guidance. I currently have two plans, but it is hard to choose, and I would like to hear your suggestions:

  1. Each consume_func has its own XREADGROUP (multiple connections).
  2. A list[consume_func] shares a single XREADGROUP (only one connection).
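Plan 2 could be sketched roughly like this (hypothetical names, not from the original code; it assumes each message carries a 'task' field naming its handler, which is one common way to multiplex several task types over a single XREADGROUP):

```python
def dispatch(handlers, entries):
    """Route stream entries to handlers keyed by the message's 'task'
    field; return the ids of the entries that were handled."""
    handled = []
    for entry_id, fields in entries:
        func = handlers.get(fields.get('task'))
        if func is not None:
            func(fields)
            handled.append(entry_id)
    return handled

def worker_loop(client, handlers, queue='leek',
                group='group_v1', consumer='c1'):
    """A single XREADGROUP serving every task function: one blocking
    read at a time instead of one per consume_func."""
    while True:
        items = client.xreadgroup(group, consumer, {queue: '>'},
                                  block=100, count=10)
        if not items:
            continue
        for entry_id in dispatch(handlers, items[0][1]):
            client.xack(queue, group, entry_id)
```

The trade-off is that a slow handler now delays every other task type read through the same loop, whereas plan 1 isolates task functions at the cost of one blocked connection each.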