I'm writing a redis stream app. Each app replica can only listen with its own consumer name. Deployed this way with docker swarm, I can't dynamically scale, because scaling makes every replica listen with the same consumer name. I tried generating consumer names automatically, but then redis ends up with many consumer names.


Simulating the distributed setup:

import time
import uuid

import redis
import multiprocessing


def read():
    # Experiment: inspect the group's pending-entries summary and adjust
    # the group's last-delivered-id.
    c = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
    msg = c.xpending('测试', '小组')
    res = c.xrange('测试', min=msg['min'], max=msg['max'], count=msg['pending'])
    print(msg)
    print(res)
    for k, v in res:
        print(k)
        c.xgroup_setid('测试', '小组', k)


last_id = '0-0'


def task(consumer_name, number):
    print(consumer_name, number)
    global last_id
    check_backlog = True
    c = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

    while True:
        if check_backlog:
            # First drain this consumer's own pending history (unacked entries)
            stream_id = last_id
        else:
            # Then read only new, never-delivered messages
            stream_id = '>'
        items = c.xreadgroup('小组', str(consumer_name), {'测试': stream_id}, block=100, count=1)
        if not items:
            continue
        check_backlog = len(items[0][1]) != 0  # backlog drained once an empty batch comes back
        for ids, value in items[0][1]:
            print(f'Consumer {consumer_name}:', value)
            # c.xack('测试', '小组', ids)
            last_id = ids
        time.sleep(2)


if __name__ == '__main__':

    pool = multiprocessing.Pool(5)
    for i in range(4):
        # Every worker passes the literal string 'i' as its consumer name,
        # so all replicas share one consumer -- this reproduces problem 1 below.
        pool.apply_async(task, args=('i', i))
    # pool.apply_async(read)
    pool.close()
    pool.join()

Problems observed:

  1. Listening with the same consumer name leads to repeated consumption.
  2. If I assign consumer names manually, the app can't be deployed in a distributed way, because every replica runs the same code (see the naming sketch after this list).
  3. If consumer names are generated automatically, redis accumulates many consumer names that are hard to maintain.
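
A minimal sketch of how each replica could generate its own consumer name at startup, which addresses problem 2 (and leads to problem 3); the hostname/UUID scheme is my assumption, not part of the original code:

import socket
import uuid

# Each Swarm replica (container) gets its own hostname, so combining it
# with a UUID yields a consumer name unique per replica and per restart.
consumer_name = f'{socket.gethostname()}-{uuid.uuid4()}'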

Comment From: itamarhaber

Hello @InfernalAzazel

I'm afraid I can't follow most of what you've posted. However.

If consumer names are generated, redis will have many consumer names that are difficult to maintain.

Consumer names should be generated and should be unique for each consumer. Consumers can die. The application should handle claiming pending messages from idle consumers (see XAUTOCLAIM). The app can also call XINFO CONSUMERS and issue an XGROUP DELCONSUMER for any consumer with no pending messages and idle time above some threshold.
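
A rough sketch of that claim-and-cleanup pattern in redis-py, reusing the stream and group names from the code above; the 60-second idle threshold is an arbitrary assumption:

import uuid
import redis

STREAM = '测试'
GROUP = '小组'
IDLE_MS = 60_000  # assumed threshold: one minute of idleness

r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
me = str(uuid.uuid4())  # unique, generated consumer name for this process

# Claim messages that have been pending on any consumer for longer than
# IDLE_MS, transferring ownership to this consumer (XAUTOCLAIM).
r.xautoclaim(STREAM, GROUP, me, min_idle_time=IDLE_MS, count=25)

# Delete consumers that hold no pending messages and have been idle longer
# than the threshold, so the group does not accumulate stale names
# (XINFO CONSUMERS + XGROUP DELCONSUMER).
for consumer in r.xinfo_consumers(STREAM, GROUP):
    if (consumer['name'] != me
            and consumer['pending'] == 0
            and consumer['idle'] > IDLE_MS):
        r.xgroup_delconsumer(STREAM, GROUP, consumer['name'])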

Comment From: InfernalAzazel

Thank you for your guidance

Following that guidance, I've achieved the desired effect:

import time
import uuid

import redis
import multiprocessing

last_id = '0-0'


def task(consumer_name, number):
    print(consumer_name, number)
    global last_id
    check_backlog = True
    c = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
    # On startup, claim up to 25 messages pending anywhere in the group
    # (min_idle_time=0), then delete consumers that hold no pending messages.
    c.xautoclaim('测试', '小组', consumer_name, 0, 0, count=25)
    consumers = c.xinfo_consumers('测试', '小组')
    for consumer in consumers:
        if consumer['pending'] == 0:
            c.xgroup_delconsumer('测试', '小组', consumer['name'])

    while True:
        if check_backlog:
            stream_id = last_id
        else:
            stream_id = '>'
        items = c.xreadgroup('小组', str(consumer_name), {'测试': stream_id}, block=100, count=1)
        if not items:
            continue
        check_backlog = len(items[0][1]) != 0  # backlog drained once an empty batch comes back
        for ids, value in items[0][1]:
            print(f'Consumer {consumer_name} {number}:', value)
            # c.xack('测试', '小组', ids)
            last_id = ids
        time.sleep(2)


if __name__ == '__main__':
    pool = multiprocessing.Pool(5)
    for i in range(4):
        pool.apply_async(task, args=(str(uuid.uuid4()), i))
    pool.close()
    pool.join()