Hello, I am trying to build a task scheduling framework on top of Redis Streams. My idea is that each task function corresponds to one consumer group. During a simulation I noticed a problem: every time I listen on a group, one more connection is opened. What does this mean? If I have a lot of tasks, there will be unimaginably many connections.
Code:
import uuid
from concurrent.futures import ThreadPoolExecutor

import redis


class Redis7:
    def __init__(
        self,
        redis_pool: redis.ConnectionPool,
    ):
        self.last_id = '0-0'
        self.next_read_loop = False
        self.client = redis.Redis(connection_pool=redis_pool)

    def accept_messages(
        self,
        i,
        consume_func,
        queue_name='leek',
        group_name='group_v1',
        consumer_name=None,
    ):
        print(i)
        # A default of str(uuid.uuid4()) is evaluated only once, when the
        # method is defined, so every caller would share one consumer name.
        if consumer_name is None:
            consumer_name = str(uuid.uuid4())
        # Create the group (and the stream, if missing). Checking only
        # whether the stream exists skips group creation once send() has
        # already created the stream, so try and ignore "already exists".
        try:
            self.client.xgroup_create(queue_name, group_name, '0-0', mkstream=True)
        except redis.exceptions.ResponseError as exc:
            if 'BUSYGROUP' not in str(exc):
                raise
        check_backlog = True
        # Take over pending messages left behind by other consumers.
        self.client.xautoclaim(
            queue_name, group_name, consumer_name,
            min_idle_time=0, start_id='0-0', count=25,
        )
        # Remove consumers that have nothing pending.
        consumers = self.client.xinfo_consumers(queue_name, group_name)
        for consumer in consumers:
            if consumer['pending'] == 0:
                self.client.xgroup_delconsumer(queue_name, group_name, consumer['name'])
        while True:
            if self.next_read_loop:
                continue
            # Read this consumer's backlog first, then switch to new messages.
            if check_backlog:
                stream_id = self.last_id
            else:
                stream_id = '>'
            items = self.client.xreadgroup(
                group_name, str(consumer_name),
                {queue_name: stream_id},
                block=100,
                count=1
            )
            if not items:
                continue
            check_backlog = len(items[0][1]) > 0
            for ids, value in items[0][1]:
                consume_func(value)
                self.client.xack(queue_name, group_name, ids)
                self.last_id = ids

    def send(self, message, queue_name='leek'):
        x = self.client.xadd(queue_name, message)
        print(x)


if __name__ == '__main__':
    def task(value):
        print('task', value)

    pool = ThreadPoolExecutor(100)
    broker = 'redis://127.0.0.1:6379/0'
    pool_redis = redis.ConnectionPool.from_url(broker, decode_responses=True)
    r = Redis7(pool_redis)
    for i in range(4):
        r.send({'值': i})  # the key '值' means 'value'
    # 100 listeners, each blocking in XREADGROUP on its own connection
    for i in range(100):
        pool.submit(r.accept_messages, i, task)
A few questions:
- I don't know whether my idea of one task function per consumer group fits the intended use of Redis Streams. If it does, is it normal that listening on multiple streams creates multiple connections?
- I have not yet found a project on GitHub that uses Redis Streams as the message broker for a task scheduling framework. If you know of any, please share a few repository links. Python, Go, Rust, or .NET are all fine.
Comment From: itamarhaber
Hello @InfernalAzazel
Every XREADGROUP needs a connection, so the more accurate statement is that every consumer in the group requires a connection. That said, presumably your consume_func takes more time than reading the messages does (excluding block time), so you can close the connection or return it to the pool before entering the item-processing loop.
> If I have a lot of tasks, there will be unimaginably many connections

How much is "a lot" and "unimaginably many"? My laptop easily handles 10K concurrent clients and the Redis server :) To scale further, you can shard the data across a cluster to increase concurrency.
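The "read first, process afterwards" idea could be sketched roughly as follows. This is only an illustration, not code from the issue: `read_batch` and `process_batch` are hypothetical helper names, `client` is assumed to behave like a `redis.Redis` instance created with `decode_responses=True`, and the reply shape assumes the default (RESP2) protocol. As far as I understand, redis-py hands the connection back to its pool as soon as each command returns, so no connection is held while `consume_func` runs.

```python
from typing import Callable, Dict, List, Tuple

Message = Tuple[str, Dict[str, str]]


def read_batch(client, stream: str, group: str, consumer: str,
               count: int = 10, block_ms: int = 100) -> List[Message]:
    """Fetch up to `count` new messages with a single XREADGROUP call."""
    reply = client.xreadgroup(group, consumer, {stream: ">"},
                              count=count, block=block_ms)
    if not reply:
        return []
    # Assumed reply shape: [[stream_name, [(message_id, fields), ...]]]
    return list(reply[0][1])


def process_batch(client, stream: str, group: str,
                  messages: List[Message],
                  consume_func: Callable[[Dict[str, str]], None]) -> int:
    """Run the slow work, then acknowledge each message; return the count."""
    done = 0
    for message_id, fields in messages:
        consume_func(fields)  # no Redis command is in flight here
        client.xack(stream, group, message_id)
        done += 1
    return done
```

A consumer loop would call `read_batch` and then `process_batch` in turn. If the number of connections still needs a hard cap, redis-py's `BlockingConnectionPool` with a `max_connections` argument is one option to look at.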
Comment From: InfernalAzazel
Thank you very much for your guidance. I currently have two plans, but it is hard to choose between them, and I would like to hear your suggestions:
1. Each consume_func corresponds to its own XREADGROUP (multiple connections).
2. A list[consume_func] corresponds to a single XREADGROUP (only one connection).
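To make plan 2 concrete, here is a rough sketch of how a single reader might serve several task functions. It is only one possible reading of the plan, under these assumptions: each task function has its own stream, the same group name has already been created on every stream, and `client` behaves like a `redis.Redis` instance with `decode_responses=True` (so stream names come back as `str`). The names `dispatch_once` and `_run_and_ack` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict

Handler = Callable[[Dict[str, str]], None]


def _run_and_ack(client, stream: str, group: str, message_id: str,
                 fields: Dict[str, str], handler: Handler) -> None:
    """Run one task in a worker thread, then acknowledge its message."""
    handler(fields)
    client.xack(stream, group, message_id)


def dispatch_once(client, group: str, consumer: str,
                  handlers: Dict[str, Handler],
                  executor: ThreadPoolExecutor,
                  count: int = 10, block_ms: int = 100) -> int:
    """Issue one XREADGROUP over all streams and hand messages to workers.

    `handlers` maps a stream name to the function that consumes it.
    Returns the number of messages submitted to the executor.
    """
    streams = {name: ">" for name in handlers}
    reply = client.xreadgroup(group, consumer, streams,
                              count=count, block=block_ms)
    submitted = 0
    for stream_name, messages in reply or []:
        handler = handlers[stream_name]
        for message_id, fields in messages:
            executor.submit(_run_and_ack, client, stream_name, group,
                            message_id, fields, handler)
            submitted += 1
    return submitted
```

In this sketch only the reader blocks on a connection. The worker threads borrow one briefly for each XACK, so the number of open connections depends on the executor size rather than on the number of task functions.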