We have been evaluating the STREAMS branch of redis, and are very pleased with its behaviour. It fulfils a really important pattern for our workflow:
- An upstream data source emits events into a redis stream.
- The production consumer uses XREAD to pull new data immediately into the live system. The live system's database atomically updates the last read messageID along with assimilating the new data, in a transaction.
- Non-production (e.g. development) environments can be spun up using a backup image of the production system. By using XREAD, whatever messages were queued since the backup was generated are replayed in the same order, quickly bringing the new environment into alignment with our production system, without any additional load on the upstream data source.
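To make the pattern concrete, here is a minimal sketch of the consumer side. It is only an illustration: `sqlite3` stands in for the live system's database, a plain Python list stands in for the stream, `read_after` stands in for XREAD, and the table and function names are hypothetical.

```python
import sqlite3

def parse_id(entry_id):
    """Split a stream ID 'ms-seq' into a comparable (ms, seq) tuple."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)

def read_after(stream, last_id):
    """Stand-in for XREAD: entries with an ID strictly greater than last_id."""
    return [(i, f) for i, f in stream if parse_id(i) > parse_id(last_id)]

def new_db():
    """Create an in-memory database with an events table and a one-row cursor."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE events (id TEXT PRIMARY KEY, value TEXT)")
    db.execute("CREATE TABLE cursor (last_id TEXT)")
    db.execute("INSERT INTO cursor VALUES ('0-0')")
    db.commit()
    return db

def consume(db, stream):
    """Apply new entries and advance the cursor in a single transaction."""
    (last_id,) = db.execute("SELECT last_id FROM cursor").fetchone()
    entries = read_after(stream, last_id)
    with db:  # commits on success, rolls back if anything raises
        for entry_id, fields in entries:
            db.execute("INSERT INTO events VALUES (?, ?)", (entry_id, fields["value"]))
            db.execute("UPDATE cursor SET last_id = ?", (entry_id,))
    return len(entries)
```

Because the cursor lives in the same database as the data, a restored backup image carries its own last read ID, and calling `consume` against the stream replays exactly the entries added since the backup was taken.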
The only minor issue is that we would prefer to truncate the streams based on maximum message age, rather than by using MAXLEN. Some days may have many more messages than other days, and what really matters is that we are guaranteed the messages go back to prior to the last backup image.
Ideally for us, we could specify the oldest message we wish to keep. Could be by relative or absolute time, it doesn't really matter to me. Whatever is easiest or most logical to implement, so for example:
XADD MAXAGE ~ 8640000 mystream foo bar
or
XADD MINVAL ~ 1510704127488 mystream foo bar
should both remove the initial entries in the stream, up until the designated timepoint.
I imagine it would be important to support the use of MAXAGE and MAXLEN simultaneously. In my mind, combining them should remove as many entries as the more aggressive of the two options would remove on its own.
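As a rough sketch of what I mean by combining the two options (the function name and the millisecond unit for the age are my assumptions, not anything Redis implements):

```python
def entries_to_drop(ids, now_ms, max_age_ms=None, max_len=None):
    """Count how many of the oldest entries a combined trim would remove.

    ids are stream IDs of the form 'ms-seq', sorted oldest first.
    """
    by_age = 0
    if max_age_ms is not None:
        # A relative MAXAGE is the same as an absolute cutoff of now - age.
        cutoff_ms = now_ms - max_age_ms
        by_age = sum(1 for i in ids if int(i.split("-")[0]) < cutoff_ms)
    by_len = 0
    if max_len is not None:
        by_len = max(0, len(ids) - max_len)
    # Whichever option removes more entries wins.
    return max(by_age, by_len)
```

The relative and absolute forms differ only in who computes the cutoff: the server for MAXAGE, the client for MINVAL.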
Comment From: antirez
Hello @nicois, this feature was actually part of the initial specification, and was later removed because I do not yet have a sound solution for its implementation. The problem is that, basically, the MAXAGE option could do an unbounded amount of work, causing the server to block for a long time. See for instance the following scenario with MAXLEN instead of MAXAGE:
- A client pushes 2000000 elements with MAXLEN set to 1000000. At the end 1000000 elements are inside the stream, of course, since after the first million, for every item I add Redis deletes one item from the other end.
- The client is paused for 30 minutes.
- The client resumes with the exact same workload. The length of the stream will still be 1000000 items, and as I add every new item, just a single item will be removed.
Now, let's model the same thing in the case of MAXAGE:
- A client pushes 2000000 elements with MAXAGE set to 1 hour. At the end N elements are inside the stream, since once there are elements older than 1 hour, Redis deletes those items from the other end as I add new ones.
- The client is paused for 30 minutes.
- The client resumes with the exact same workload. At the first XADD command, half the items will be found to be expired, and Redis will block to reclaim them all until the stream is of length N/2 approximately.
The problem with the third step is that it creates a latency issue if Redis really obeys and removes all the expired items. If it stops after M items, then the semantics of the command will be hard to explain. Moreover, all the clients reading the stream between the second and third steps will see expired items that should not be there.
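The difference between the two scenarios can be reproduced with a toy model. This is only a sketch: a `deque` of timestamps stands in for the stream, the numbers are scaled down, and the function names are made up for the example.

```python
from collections import deque

def xadd_maxlen(stream, ts, max_len):
    """Append ts, then evict from the old end; returns evictions for this call."""
    stream.append(ts)
    removed = 0
    while len(stream) > max_len:
        stream.popleft()
        removed += 1
    return removed

def xadd_maxage(stream, ts, max_age):
    """Append ts, then evict every entry older than ts - max_age."""
    stream.append(ts)
    removed = 0
    while stream and stream[0] < ts - max_age:
        stream.popleft()
        removed += 1
    return removed

def evictions_after_pause(xadd, limit, pause):
    """Add one entry per time unit, pause, add one more; return that call's evictions."""
    stream = deque()
    t = 0
    for t in range(200):
        xadd(stream, t, limit)
    return xadd(stream, t + pause, limit)

print(evictions_after_pause(xadd_maxlen, 60, 30))  # prints 1
print(evictions_after_pause(xadd_maxage, 60, 30))  # prints 30
```

With a limit of 60 and a pause of 30, the MAXLEN call after the pause evicts a single entry, while the MAXAGE call has to evict about half the stream in one go.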
Basically it looks like a sound feature, but after considering the actual behavior, it does not look reliable. On top of that, the size of the stream that we can store without problems is mostly memory-bound, and memory is proportional to the number of items, so even planning for the age is not simple and depends on the number of items added per unit of time.
Because of all the problems above, for now I decided to pause and not add the feature until it's clearer what to do... I hope this makes sense. Thanks!
Comment From: nicois
Thanks for that clarification, @antirez. I understand the difficulty with the implementation, and agree that it is better to hold off than risk adding latency.
Even if it is not ideal, businesses can certainly monitor the oldest element with XRANGE, and take action should its age be less than a given threshold.
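A small sketch of that check, assuming the stream uses auto-generated IDs (whose first part is a millisecond timestamp) and that the oldest entry was fetched with something like `XRANGE mystream - + COUNT 1`. The function names and the threshold are hypothetical.

```python
import time

def entry_age_ms(entry_id, now_ms=None):
    """Age of an entry, derived from the millisecond part of its ID."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - int(entry_id.split("-")[0])

def covers_window(oldest_id, required_age_ms, now_ms=None):
    """True if the stream still reaches back at least required_age_ms."""
    return entry_age_ms(oldest_id, now_ms) >= required_age_ms
```

If `covers_window` returns False for the interval since the last backup, the stream has been trimmed too aggressively and MAXLEN needs to be raised.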
With absolutely no knowledge about the internal implementation of streams, I would have thought it possible to mark an element of the stream as the oldest, and defer the garbage collection to a background thread. But there are probably performance or stability implications I am overlooking.
Thanks again for your clarifications.
Comment From: antirez
Thank you @nicois. About possible solutions, Redis is single threaded, so even where threads are used, they do not actually operate on shared data structures, and in general there is a stress on not implementing O(N) operations unless they are really semantically needed. However there are other potential solutions. For instance Redis streams are already able to purge elements (when MAXLEN is used) by whole nodes in the radix tree. Because each node contains many tens of elements, the reclaiming process when, for instance, the user changes the MAXLEN argument from 1000000 to 500000, is much faster than it would be otherwise. By applying the same concept to age expiration, and bounding the max number of nodes we remove per call, it looks like quite an acceptable feature, so I believe we'll end up doing something like that...
If we check the example above again, we go from 1 million to 500k elements because of the 30 minutes delay. Suppose each radix tree node contains 100 items (not always, but reasonable for small stream items): even removing just 100 nodes maximum per XADD-MAXAGE call, we can recover to the right stream length in just 50 calls, so externally there will be no obvious added latency, nor old elements remaining active for a long time.
This needs some documentation effort in order for users to understand the exact semantics: to avoid blocking, the XADD+MAXAGE call may not remove all the items whose age already exceeds the limit, but will remove a maximum of (for instance) 100 nodes, which may translate to a number of items in the range of 100-20000 depending on the size of the items. After all, this feature may turn out to be needed enough to justify the complexity of explaining such an implementation. Keeping the issue open...
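The arithmetic can be checked with a toy model of the bounded, node-wise trim. This is a sketch under stated assumptions, not the Redis implementation: each node is modeled as a `range` of consecutive millisecond timestamps, the capacity of 100 items per node is fixed, and the function names are invented for the example.

```python
from collections import deque

ITEMS_PER_NODE = 100  # assumed node capacity

def build_nodes(total_items):
    """Model the stream as nodes of consecutive timestamps, oldest node first."""
    return deque(
        range(start, start + ITEMS_PER_NODE)
        for start in range(0, total_items, ITEMS_PER_NODE)
    )

def trim_expired_nodes(nodes, cutoff_ms, max_nodes):
    """Drop up to max_nodes leading nodes whose newest entry is older than cutoff_ms."""
    removed = 0
    while nodes and removed < max_nodes and nodes[0][-1] < cutoff_ms:
        nodes.popleft()
        removed += 1
    return removed

def calls_to_recover(nodes, cutoff_ms, max_nodes):
    """Count the calls that still find something to reclaim."""
    calls = 0
    while trim_expired_nodes(nodes, cutoff_ms, max_nodes) > 0:
        calls += 1
    return calls

nodes = build_nodes(1_000_000)
print(calls_to_recover(nodes, cutoff_ms=500_000, max_nodes=100))  # prints 50
```

With one million entries, half of them expired, each call does a bounded amount of work (at most 100 node removals) and the stream is back to its intended length after 50 calls.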
Comment From: TravisJoe
I am looking to implement streams for a time series project. Often there is some concept of data retention, expiration (like Redis expire), or some max age (like requested here). I see there are a couple ways to limit based on memory size or max length, but they are not necessarily time based.
If there is no way to set an entry's "life" settings when adding it, does this mean the best way to deal with this is to have a worker monitor the stream and then explicitly delete old entries?
Comment From: jakelowen
I have no idea if this is a feasible approach, but I worked up this little snippet / set of helper functions in Node.js to a) interrogate a stream for its consumer groups, b) interrogate all consumers to identify the oldest pending/processed keys, c) identify the index of said key in the stream, and finally d) trim the stream to that key length.
This is NOT battle tested. Just a sketch. Would love feedback on this approach: https://gist.github.com/jakelowen/22cb8a233ac0cdbb8e77808e17e0e1fc
Comment From: arikastarvo
How about this Lua script? It's a really hackish solution, but it does the trick (in this example, it keeps the data for the last 60 sec). No idea how it will perform under heavy load.
Brief desc: find all stream keys, iterate -> get all elements in key older than x, get key total length, call xtrim maxlen with (key total length - count(old elements))
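```lua
-- Find every stream key (note that KEYS scans the whole keyspace).
local keys = redis.call('KEYS', 'streams:*')
-- Cutoff in milliseconds: server time minus 60 seconds.
local time = (redis.call('TIME')[1] - 60) * 1000
local i = 0
for _, key in pairs(keys) do
    -- Entries with an ID up to the cutoff are the ones to drop.
    local objs = redis.call('XRANGE', key, '-', time)
    local total = redis.call('XLEN', key)
    local keep = (total - #objs)
    if keep < total then
        -- XTRIM returns the number of entries it removed.
        i = i + redis.call('XTRIM', key, 'MAXLEN', '~', keep)
    end
end
return i
```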
**Comment From: jaamison**
@antirez, what about a "MINID" (minimum id) trimming scheme? This could be applied to both XADD and XTRIM to keep parity between the two commands.
`XADD mystream MINID ~ 1570385173689-0 * field1 value1`
and
`XTRIM mystream MINID ~ 1570385173689-0`
Instead of worrying about an entry's age (which is the dynamic interval CURRENT_TIME - ENTRY_CREATION_TIME), why don't we take advantage of the correlation between stream IDs and timestamps, and address entries by their creation time rather than by their current "age"?
The MINID parameter is determined by the application for each command it sends. This way, any extra latency incurred will only be at the explicit direction of the user. Yes, it is possible to trigger a large, costly computation, but at least the user will be in complete control over their expectations regarding performance, instead of unpredictable surprise latency spikes. And regular XADD calls (without MINID or MAXLEN) will never be adversely affected by any automatic background trimming or gc.
This inverts the thinking of a TTL for entries, by making the operation explicit and 100% user-initiated, rather than maintaining each entry's own TTL as a sort of implicit state. IMO, this stateless, declarative per-command approach fits much better with the Redis philosophy.
If an application is already going to send an XADD command to the server, then quickly calculating a MINID param to send along with it should be trivial.
For example, an application that writes to a stream once every minute and wants to maintain roughly the last hour of entries would send something like the following:
```
XADD mystream MINID ~ 1570385100000-0 * field1 value1
XADD mystream MINID ~ 1570385160000-0 * field1 value1
XADD mystream MINID ~ 1570385220000-0 * field1 value1
XADD mystream MINID ~ 1570385280000-0 * field1 value1
XADD mystream MINID ~ 1570385340000-0 * field1 value1
```
Of course the user would now have complete control over how frequently they instruct the stream to be trimmed. Any performance penalty for frequent trims would only be incurred at the explicit direction of the user. Likewise any performance penalty for large/expensive trims would only be incurred at the explicit direction of the user. Certain operations that are performance-sensitive can issue a plain XADD with no MINID or MAXLEN and can be guaranteed the constant time performance of a classic XADD operation without any regard to trimming.
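On the client side, computing the argument could look roughly like this. It is a sketch only: MINID is the syntax proposed in this comment, the helper names are hypothetical, and the code only builds the argument list rather than talking to a server.

```python
def minid_for(now_ms, retention_ms):
    """Threshold ID: entries with a smaller ID fall outside the retention window."""
    return f"{max(0, now_ms - retention_ms)}-0"

def xadd_args(key, now_ms, retention_ms, fields):
    """Build the proposed 'XADD key MINID ~ <id> * field value ...' argument list."""
    args = ["XADD", key, "MINID", "~", minid_for(now_ms, retention_ms), "*"]
    for name, value in fields.items():
        args += [name, value]
    return args

# One hour of retention, evaluated at 1570388700000 ms.
print(xadd_args("mystream", 1570388700000, 3_600_000, {"field1": "value1"}))
```

A writer that runs once a minute simply calls this with its current clock each time, which produces thresholds that advance by 60000 ms per call, as in the commands above.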
**Comment From: nicois**
I'm not sure what benefit the MINID argument would provide. The real cost
is not converting a timestamp to a message ID, but rather the freeing up of
a variable number of chunks of stream components. From Antirez' original
reply, the message chunks need to be freed serially, in linear time, which
is the main reason he didn't like this idea.
A second point he had, which also has merit, is that a time-based trim
means you may end up with a variable number of messages, potentially
overloading your system during periods of high usage and causing redis to
randomly drop objects in its database. By tuning each stream via a maximum
stream size, this is more stable, albeit at the cost of how far back in
time the stream goes. He saw this as a lesser evil, and I can understand
why.
In addition, for your example above with MINID, if you write every minute
and want to keep the last hour, you can equally well tell it to keep only
the last ~60 messages for the same effect, using currently available syntax.
**Comment From: jaamison**
@nicois I didn't mean to imply that timestamp->id conversion was a hurdle (and I'm not sure how it could be: in the vast majority of cases they're the same.....). Yes, I understand that (as with any trimming scheme) there is a potentially unbounded amount of computation that will be required that cannot be known in advance, and that creates the potential for unexpected latency.
This solution does not eliminate that cost - it simply confines it to situations when the client acknowledges that it is aware of and prepared for the cost; while at the same time allowing requests that are NOT willing to incur the cost to be 100% exempted from it. Essentially, this is pushing the trade-off decision down to the user/application rather than the server bearing responsibility for determining and maintaining the balance between functionality and performance.
Regardless of these difficulties, there are lots of valid use cases for time-based (rather than positional index) retention policies for streams, and this has been a frequently raised issue in the community fairly consistently ever since the beginning of redis streams.
The point is that it is possible to achieve time-based trimming without unwanted or unexpected latency from a client's perspective. All I am saying is that **IF** redis wants to address this concern, then this is a way of implementing that functionality without incurring some of the negative side effects that @antirez mentioned above.
I agree with the thinking that tuning a stream's length to a constant number of entries is a sort of "lesser of two evils", but what I am proposing is a system in which each and every consumer can decide for themselves which "evil" (if any) they are able to tolerate, and in which the performance of any single command is not dependent on the server's existing state or on the previous side effects of other clients.
Also, my one-per-minute case was just a naive example to illustrate the client providing its own incrementing id arguments. My point was to demonstrate a case where the client is explicitly issuing trim parameters on a per-command basis rather than trimming happening implicitly in the background by the server. It's just an abstract example, not a real world problem that needs a specific solution.
**Comment From: jaamison**
Re-reading I think I misunderstood the discussion here. I had been assuming (incorrectly I believe) that a hypothetical MAXAGE would have imposed an automatic TTL on stream entries, as opposed to a one-time trim operation performed in conjunction with the XADD. I can see now why my comment must have come across as weird & confusing - sorry for the distraction!
**Comment From: jaamison**
As far as documentation and explaining behavior to users, one possibility could be:
```
XADD mystream ALLOWTRIMTO 1234234232-2 * field1 value1
XTRIM mystream ALLOWTRIMTO 1232132133-0
```
Using the term "allow" makes it less of a direct instruction and more of an understanding that someone/something else may (or may not) perform the action on your behalf at a later time.
Docs could state that specifying ALLOWTRIMTO with an entry ID permits the server to opportunistically attempt to remove as many old items up to ID from the stream as it reasonably can without degrading performance. There is no guarantee that it will remove any items at all, but it will never remove the item at ID or newer.
Comment From: dudo
This is essentially garbage collection. Have we considered a separate process to clean up the tail of the stream when expiration happens? Seems like the worry is around blocking when xadd is called, due to the cleanup on call.
I’m curious, why doesn’t this same worry impact traditional keys? They take an expire key.
Comment From: thestephenromano
Traditional keys are O(1) lookups. A stream's bound could really be anything and a subsequent "trim" operation would be non-deterministic in time. I believe that is the crux of the problem.
Comment From: remzicanaksoy
If a daily clean-up is wanted, can't we just run a daily job that finds the difference between today's length and yesterday's length, and deletes the old values using that difference? In other words, we can keep a key called previousLength. The daily job gets the current length of the stream and calculates the difference as numberOfEntriesInLastDay = currentLength - previousLength. It then runs XTRIM mystream MAXLEN numberOfEntriesInLastDay and updates previousLength to numberOfEntriesInLastDay. (Or, if you are using ~, you should update it to the length of the stream after the command.)
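Roughly, the bookkeeping would be as follows. This is a sketch with a Python list standing in for the stream; the function name is made up, and the slice deletion plays the role of the XTRIM call.

```python
def daily_trim(stream, previous_length):
    """Keep only the entries added since the last run; return the new previousLength."""
    entries_in_last_day = len(stream) - previous_length
    # Equivalent of: XTRIM mystream MAXLEN <entries_in_last_day>
    del stream[:len(stream) - entries_in_last_day]
    # Store the length after the trim, which also covers the approximate (~) case.
    return len(stream)
```

Note that the very first run (with previousLength at 0) trims nothing, and that just before each run the stream holds up to two days of entries.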
Comment From: oranagra
#8169 added the MINID trimming strategy, which can be used as MAXAGE, i.e. by setting MINID to the current time minus the desired age.
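For example, a periodic worker could do something like the following. It is a minimal sketch: `trim_older_than` is a hypothetical helper, `client` is assumed to be any object exposing `execute_command(*args)` the way redis-py's client does, and the client clock is assumed to be close enough to the clock that generated the IDs. The optional LIMIT (only valid together with `~`) caps how many entries a single call may evict.

```python
import time

def trim_older_than(client, key, max_age_ms, now_ms=None, limit=None):
    """Issue XTRIM key MINID ~ <now - max_age> [LIMIT n] through the given client."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    min_id = f"{now_ms - max_age_ms}-0"
    args = ["XTRIM", key, "MINID", "~", min_id]
    if limit is not None:
        args += ["LIMIT", limit]
    return client.execute_command(*args)
```

The same threshold can be passed to XADD instead, so that trimming happens as part of each write.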