Hi all,
I am pretty new to using Redis Streams, and I am having issues retrieving all of the messages that are being sent from one service to another through a stream.
Please note that I am not using consumer groups in this case.
Here is how I am using streams in my services:
Service A:
```js
const sendOcppMessage = async (message) => {
  // Append the message to the stream, capping it at roughly 1000 entries (MAXLEN ~).
  await redisClient.xadd(
    config.redis.serverStream,
    "MAXLEN",
    "~",
    "1000",
    "*",
    "message",
    JSON.stringify(message)
  );
};
```
Service B (consumer stream):
```ts
async getMessages(streamName: string): Promise<{ id: string; rawMsg: string }[] | undefined> {
  try {
    // Always reads from ID "0", i.e. from the beginning of the stream.
    const messages = await redis.xread("STREAMS", streamName, "0");
    const msgs = messages![0][1].map((message) => {
      const id = message[0];
      const rawMsg = message[1][1];
      return { id, rawMsg };
    });
    return msgs;
  } catch (err) {
    logger.error({
      error: `Failed to read the message from stream - ${streamName}!`,
    });
  }
}
```
In my consumer service I call getMessages in a while loop:
```ts
while (true) {
  const msgs = await getMessages(streamName);
}
```
I then process each retrieved block of messages with a for...of loop:
```ts
for await (const msg of msgs) {
  // process msg
}
```
Despite this, I am not receiving all of the data that Service A passes to Service B. Is there anything I am doing wrong?
Any help and guidance would be highly appreciated.
Thank you
Comment From: ranshid
@Kripu77 just a side note: since this is a general Redis question and not a reported issue, I suggest opening a discussion rather than an issue next time.
To your question: I can see that your consumer is always using the '0' ID in the XREAD command, which means every read attempts to fetch messages from the beginning of the stream. Applications consuming a stream usually keep the ID of the last message they read and pass it to the next XREAD, so that only new messages are returned.

Also, in your case the maximum stream size is managed by the producer service (via MAXLEN), which means that if many producers are placing messages on the stream, it can evict messages that have not yet been consumed. If it is important for your application that every message is read from the stream, I suggest having your consumer service trim the stream after reading each bulk of messages (via the XTRIM command).
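A rough sketch of that pattern, assuming ioredis as in your snippets; the `lastId` bookkeeping, the BLOCK/COUNT values, and the MINID trim strategy (which requires Redis 6.2+) are only illustrative:
```ts
import Redis from "ioredis";

const redis = new Redis();

// Keep the ID of the last entry we processed; "0" only on the very first read.
let lastId = "0";

const consumeOnce = async (streamName: string) => {
  // Block for up to 5s waiting for entries newer than lastId.
  const res = await redis.xread("BLOCK", 5000, "STREAMS", streamName, lastId);
  if (!res) return; // timed out, nothing new

  const [, entries] = res[0];
  for (const [id, fields] of entries) {
    const rawMsg = fields[1]; // fields is ["message", "<json>"]
    // ...process rawMsg...
    lastId = id; // remember how far we got
  }

  // Optionally trim everything older than the last consumed entry (Redis 6.2+).
  await redis.xtrim(streamName, "MINID", lastId);
};
```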
Comment From: Kripu77
Hi @ranshid, thank you for your feedback.
I was using the XREAD command with '0' just to ensure that all messages are read even in case of a failover. In addition, I was deleting each message once it had been read and the related business operations had completed, like so:
```ts
for await (const msg of msgs) {
  const { id, rawMsg } = msg;
  // Remove the entry once it has been processed.
  await redis.xdel(streamName, id);
}
```
Thank you for the other insights, I will definitely consider using the XTRIM command.
Comment From: Kripu77
I was able to solve my issue by creating consumer groups in my consumer service and using the XREADGROUP command.
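For anyone hitting the same problem, this is roughly the shape the consumer ended up taking (a simplified sketch, again assuming ioredis; the group and consumer names are placeholders):
```ts
import Redis from "ioredis";

const redis = new Redis();

const initGroup = async (streamName: string, groupName: string) => {
  try {
    // MKSTREAM creates the stream if it does not exist yet.
    await redis.xgroup("CREATE", streamName, groupName, "$", "MKSTREAM");
  } catch (err) {
    // A BUSYGROUP error just means the group already exists; safe to ignore.
  }
};

const consume = async (streamName: string, groupName: string, consumerName: string) => {
  while (true) {
    // ">" asks only for entries never delivered to this group before.
    const res = await redis.xreadgroup(
      "GROUP", groupName, consumerName,
      "COUNT", 100,
      "BLOCK", 5000,
      "STREAMS", streamName, ">"
    );
    if (!res) continue; // block timed out, try again

    for (const [, entries] of res as [string, [string, string[]][]][]) {
      for (const [id, fields] of entries) {
        const rawMsg = fields[1];
        // ...process rawMsg...
        await redis.xack(streamName, groupName, id); // acknowledge once done
      }
    }
  }
};
```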