I'm writing an application that consumes a Redis stream. Each application instance has to listen under a different consumer name. When I deploy this way with Docker Swarm, I can't scale out dynamically, because every replica would listen under the same consumer name. I tried generating consumer names automatically, but then Redis accumulates many consumer names.
Simulating a distributed deployment:
import time
import uuid
import redis
import multiprocessing


def read():
    # Print the group's pending entries and set the group's last-delivered ID to each in turn.
    c = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
    msg = c.xpending('测试', '小组')  # stream '测试' ("test"), group '小组' ("group")
    res = c.xrange('测试', min=msg['min'], max=msg['max'], count=msg['pending'])
    print(msg)
    print(res)
    for k, v in res:
        print(k)
        c.xgroup_setid('测试', '小组', k)


last_id = '0-0'


def task(consumer_name, number):
    print(consumer_name, number)
    global last_id
    check_backlog = True
    c = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
    while True:
        # Replay this consumer's own pending entries first, then switch to new messages ('>').
        if check_backlog:
            stream_id = last_id
        else:
            stream_id = '>'
        items = c.xreadgroup('小组', str(consumer_name), {'测试': stream_id}, block=100, count=1)
        if not items:
            continue
        check_backlog = False if len(items[0][1]) == 0 else True
        for ids, value in items[0][1]:
            print(f'Consumer {consumer_name} processing:', value)
            # c.xack('测试', '小组', ids)
            # print(message)
            last_id = ids
        time.sleep(2)


if __name__ == '__main__':
    pool = multiprocessing.Pool(5)
    for i in range(4):
        # Every worker gets the same literal consumer name 'i'.
        pool.apply_async(task, args=('i', i))
    # pool.apply_async(read)
    pool.close()
    pool.join()
- If several instances listen under the same consumer name, messages are consumed repeatedly.
- If I assign the consumer names manually, I can't deploy the app in a distributed manner.
- If consumer names are generated, Redis ends up with many consumer names that are difficult to maintain.
Comment From: itamarhaber
Hello @InfernalAzazel
I'm afraid I can't follow most of what you've posted. However.
> If consumer names are generated, redis will have many consumer names that are difficult to maintain.
Consumer names should be generated and should be unique for each consumer. Consumers can die. The application should handle claiming pending messages from idle consumers (see XAUTOCLAIM). The app can also call XINFO CONSUMERS and issue an XGROUP DELCONSUMER for any consumer with no pending messages and idle time above some threshold.
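A rough sketch of that housekeeping, assuming redis-py and a client created with `decode_responses=True`. The helper names (`make_consumer_name`, `find_stale_consumers`, `housekeeping`), the `worker` prefix, and the 60-second threshold are hypothetical choices for illustration, not part of Redis or redis-py. The functions take the client as a parameter, so any object exposing `xautoclaim`, `xinfo_consumers` and `xgroup_delconsumer` will do.

```python
import socket
import uuid


def make_consumer_name(prefix='worker'):
    """Build a consumer name that should be unique per process.

    Assumption: the hostname plus a random suffix is unique enough; inside a
    container the hostname usually differs per replica.
    """
    return f'{prefix}-{socket.gethostname()}-{uuid.uuid4().hex[:8]}'


def find_stale_consumers(consumers, max_idle_ms, keep=()):
    """Pick consumers with no pending messages that were idle longer than max_idle_ms.

    `consumers` is the list of dicts returned by xinfo_consumers()
    (keys used here: 'name', 'pending', 'idle').
    """
    return [
        c['name']
        for c in consumers
        if c['pending'] == 0 and c['idle'] > max_idle_ms and c['name'] not in keep
    ]


def housekeeping(client, stream, group, me, max_idle_ms=60_000):
    """Claim messages stuck with idle consumers, then delete consumers that look dead."""
    # XAUTOCLAIM: take over up to 25 entries that have been pending for too long.
    # Assumption: reply[1] holds the claimed entries, as in recent redis-py versions.
    reply = client.xautoclaim(stream, group, me,
                              min_idle_time=max_idle_ms, start_id='0-0', count=25)
    claimed = reply[1]

    # XINFO CONSUMERS + XGROUP DELCONSUMER: drop idle consumers with nothing pending.
    removed = find_stale_consumers(client.xinfo_consumers(stream, group),
                                   max_idle_ms, keep=(me,))
    for name in removed:
        client.xgroup_delconsumer(stream, group, name)
    return claimed, removed
```

Each replica could call `make_consumer_name()` once at startup and run `housekeeping(redis.Redis(...), stream, group, name)` periodically. A single call claims at most 25 entries, so a large backlog would need repeated calls.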
Comment From: InfernalAzazel
Thank you for your guidance.
I have now achieved the desired behavior:
import time
import uuid
import redis
import multiprocessing

last_id = '0-0'


def task(consumer_name, number):
    print(consumer_name, number)
    global last_id
    check_backlog = True
    c = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
    # Claim up to 25 pending messages for this consumer (min idle time 0, starting from ID 0).
    xc = c.xautoclaim('测试', '小组', consumer_name, 0, 0, count=25)
    # Delete every consumer that has no pending messages (idle time is not checked here).
    consumers = c.xinfo_consumers('测试', '小组')
    for consumer in consumers:
        if consumer['pending'] == 0:
            c.xgroup_delconsumer('测试', '小组', consumer['name'])
    while True:
        # Replay this consumer's own pending entries first, then switch to new messages ('>').
        if check_backlog:
            stream_id = last_id
        else:
            stream_id = '>'
        items = c.xreadgroup('小组', str(consumer_name), {'测试': stream_id}, block=100, count=1)
        if not items:
            continue
        check_backlog = False if len(items[0][1]) == 0 else True
        for ids, value in items[0][1]:
            print(f'Consumer {consumer_name} {number} processing:', value)
            # c.xack('测试', '小组', ids)
            # print(message)
            last_id = ids
        time.sleep(2)


if __name__ == '__main__':
    pool = multiprocessing.Pool(5)
    for i in range(4):
        # Each worker gets its own generated consumer name.
        pool.apply_async(task, args=(str(uuid.uuid4()), i))
    pool.close()
    pool.join()
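One thing to watch in the loop above: the `xack` call is commented out, and entries that are never acknowledged stay in the group's pending entries list, so a consumer that has read anything never gets back to `pending == 0`. A minimal sketch of acknowledging after successful processing follows. `process_entries` and the `handle` callback are hypothetical names for illustration; `client` can be any object exposing redis-py's `xack(name, groupname, *ids)`.

```python
def process_entries(client, stream, group, entries, handle):
    """Run handle() on each (entry_id, fields) pair and XACK the ones that succeed.

    Returns the list of acknowledged IDs. Entries whose handler raises are left
    pending, so they can be retried or claimed by another consumer later.
    """
    acked = []
    for entry_id, fields in entries:
        try:
            handle(fields)
        except Exception as exc:
            # Do not acknowledge: the entry stays in the pending entries list.
            print(f'failed to process {entry_id}: {exc}')
            continue
        client.xack(stream, group, entry_id)
        acked.append(entry_id)
    return acked
```

In the loop above, `items[0][1]` is the list of `(id, fields)` pairs that would be passed as `entries`.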