Using RESP3 Streaming string + key locking (https://github.com/redis/redis/issues/11396)
Streaming strings implementation
We may want to periodically consume the buffer on the target side when streaming in the data. If we are migrating a command with a 5GB payload, we don't really want to buffer it all in the target's input buffer and then process it at once, since that would cause a latency spike. All of the RDB loading code already supports incremental loading, so we can periodically invoke the load operation as we stream the command in.
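A minimal sketch of this idea, outside of any Redis internals: read the stream in bounded pieces and hand each piece to an incremental loader instead of buffering the whole value. `load_increment()` and `consume_stream()` are hypothetical names for the example, not existing Redis functions.

```c
/* Sketch only: consume a large streamed payload in fixed-size pieces
 * instead of accumulating the whole 5GB in the input buffer.
 * load_increment() stands in for the incremental RDB loading step. */
#include <stdio.h>

/* Hypothetical incremental loader: feed it one piece at a time. */
static void load_increment(const char *buf, size_t len) {
    /* ... parse as much of the serialized value as possible ... */
    (void)buf; (void)len;
}

static void consume_stream(FILE *in) {
    char buf[64 * 1024];          /* bounded buffer, independent of value size */
    size_t n;
    while ((n = fread(buf, 1, sizeof(buf), in)) > 0)
        load_increment(buf, n);   /* process immediately, keep latency bounded */
}

int main(void) {
    consume_stream(stdin);        /* e.g. pipe a dump through stdin for testing */
    return 0;
}
```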
We may also instead consider doing chunking on the command side. We could introduce a command like RESTORE CHUNK X and allow incrementally restoring pieces of a large command. Example:
RESTORE CHUNK-START <data>
RESTORE CHUNK <data>
RESTORE CHUNK <data>
RESTORE CHUNK-COMPLETE <data> <final checksum>
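To make the flow concrete, here is a small illustrative sketch (in C, not actual Redis code) of how a migration driver might split one serialized payload across this command sequence. The chunk size and the checksum handling are placeholders.

```c
/* Sketch only: split one serialized value into the
 * RESTORE CHUNK-START / CHUNK / CHUNK-COMPLETE sequence above. */
#include <stdio.h>
#include <stdint.h>
#include <string.h>

/* Assumes the value is large enough to need at least two chunks. */
static void restore_in_chunks(const char *payload, size_t total, size_t chunk) {
    uint64_t checksum = 0;   /* stand-in for the CRC64 accumulated over the payload */
    for (size_t off = 0; off < total; off += chunk) {
        size_t len = (total - off < chunk) ? total - off : chunk;
        checksum += len;     /* placeholder update, not a real checksum */
        if (off == 0)
            printf("RESTORE CHUNK-START <%zu bytes>\n", len);
        else if (off + len >= total)
            printf("RESTORE CHUNK-COMPLETE <%zu bytes> <checksum %llu>\n",
                   len, (unsigned long long)checksum);
        else
            printf("RESTORE CHUNK <%zu bytes>\n", len);
    }
    (void)payload;
}

int main(void) {
    static char payload[10000];
    memset(payload, 'x', sizeof(payload));
    restore_in_chunks(payload, sizeof(payload), 4096);  /* START, CHUNK, COMPLETE */
    return 0;
}
```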
The current proposal for the atomic slot migration also obviates the need for a non-blocking dump command, so we can consider decoupling the dump and restore commands.
Comment From: DarrenJiang13
@madolson Here is a draft design for dump/restore chunks. At Alibaba we have a similar design that has already been running in production for some time, and we are glad to see this issue being brought up in the community.
I would like to cover two aspects: 1. command design; 2. a non-blocking restore mechanism with a state machine. (Actually, the dump command is not essential when we do async chunk restore.)
Here is a related article (in Chinese, not yet translated): https://developer.aliyun.com/article/919126
1. Command design
If we just want to implement dump + restore commands in chunks, it should not be that complicated. Here is one draft design for chunked dump and restore.
1.1 DUMP
- DUMP CHUNK START|END KEYNAME: define server.migrating_keys, a dict used to lock keys and avoid unexpected modifications. Add keys to it when the dump chunk starts, and remove them when it ends.
- DUMP CHUNK KEYNAME: split an object into a series of RDB chunks, and return 2 params, DUMPED_PAYLOAD and DUMPED_INDEX.
  - DUMPED_PAYLOAD: split a long RDB payload into chunks like TYPE | CHUNK-DATA1 | RDB_VERSION | CRC64, TYPE | CHUNK-DATA2 | RDB_VERSION | CRC64 (a framing sketch follows this list).
  - DUMPED_INDEX: the number of remaining elements to be dumped. When it reaches 0, the dump is finished.
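For illustration only, a toy sketch of the TYPE | CHUNK-DATA | RDB_VERSION | CRC64 framing and the countdown semantics of DUMPED_INDEX. crc64() here is a trivial stand-in, not Redis' real CRC-64 implementation, and the layout is an assumption for the example.

```c
/* Sketch only: frame one DUMPED_PAYLOAD chunk as
 * TYPE | CHUNK-DATA | RDB_VERSION | CRC64. */
#include <stdint.h>
#include <string.h>
#include <stdio.h>

static uint64_t crc64(const void *buf, size_t len) {   /* stand-in checksum */
    const unsigned char *p = buf; uint64_t h = 0;
    while (len--) h = h * 1099511628211ULL + *p++;
    return h;
}

/* Frame one chunk; returns the number of bytes written into out. */
static size_t frame_chunk(unsigned char type, const void *data, size_t len,
                          uint16_t rdb_version, unsigned char *out) {
    size_t off = 0;
    out[off++] = type;                                  /* TYPE         */
    memcpy(out + off, data, len); off += len;           /* CHUNK-DATA   */
    memcpy(out + off, &rdb_version, 2); off += 2;       /* RDB_VERSION  */
    uint64_t crc = crc64(out, off);
    memcpy(out + off, &crc, 8); off += 8;               /* CRC64 footer */
    return off;
}

int main(void) {
    unsigned char out[64];
    size_t n = frame_chunk(3, "hello", 5, 11, out);     /* arbitrary demo values */
    long dumped_index = 41;   /* hypothetical: 41 elements left to dump */
    printf("payload bytes=%zu, DUMPED_INDEX=%ld\n", n, dumped_index);
    return 0;
}
```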
1.2 RESTORE
RESTORE key ttl serialized-value [CHUNK START|STEP|END VERSION] [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]
- RESTORE key ttl serialized-value [CHUNK START|END VERSION]...
  - Define server.importing_keys, which is a dict used to lock keys and avoid unexpected modifications. Add keys to it when
  - Only load the ttl for CHUNK END VERSION (see the sketch after this list).
- RESTORE key ttl serialized-value [CHUNK STEP VERSION]...
  - Restore the object chunk by chunk, nothing special.
  - VERSION is reserved for scenarios like migration, to keep consistency.
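A small illustrative sketch (hypothetical names, not existing Redis structures) of how the target side might stage CHUNK STEP payloads outside the keyspace and only apply the ttl and publish the key on CHUNK END:

```c
/* Sketch only: stage RESTORE CHUNK payloads and commit on CHUNK END.
 * restore_state and both handlers are hypothetical. */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>

typedef struct restore_state {
    char *key;
    char *staged;       /* partially rebuilt value, detached from the keyspace */
    size_t staged_len;
} restore_state;

static void on_chunk_step(restore_state *st, const char *data, size_t len) {
    st->staged = realloc(st->staged, st->staged_len + len);
    memcpy(st->staged + st->staged_len, data, len);
    st->staged_len += len;                  /* nothing visible to clients yet */
}

static void on_chunk_end(restore_state *st, long long ttl_ms) {
    /* Only now: verify the checksum, insert the rebuilt object into the db,
     * apply the ttl, and remove the key from the importing_keys lock dict. */
    printf("commit %s (%zu bytes, ttl=%lld)\n", st->key, st->staged_len, ttl_ms);
    free(st->staged); st->staged = NULL; st->staged_len = 0;
}

int main(void) {
    restore_state st = { .key = "key1", .staged = NULL, .staged_len = 0 };
    on_chunk_step(&st, "abc", 3);
    on_chunk_step(&st, "def", 3);
    on_chunk_end(&st, 60000);
    return 0;
}
```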
1.3 Other issues to be mentioned in this process
- Big values: some values cannot be split further, e.g. a single string larger than 2GB;
- Modules: need a common API that can dump and restore by chunks (this could be discussed later);
- Command ban: flushall/flushdb/... should be banned while chunk commands are running;
- Expire/evict ban: source and target nodes should not expire or evict keys during chunk migration;
- Replication: if the target primary has replicas, propagate the chunks one by one rather than propagating all chunks together, and force a flush to the replica when the restore ends.
Here is a diagram of this process:
Comment From: DarrenJiang13
2. Async restore design for migration with a state machine
If we implement this in the migration code path rather than with pure dump + restore commands, we could do async chunk restore with a state machine. Here is a draft design.
2.1 global view
Here is a global view:
- The source node registers a read event with readFromDestNode() and a write event with writeToDestNode();
- runMigrateMachine() runs in a loop, writes data to a buffer called migrating->wbuf, and advances the state machine;
- writeToDestNode() transmits the data in migrating->wbuf to the target node asynchronously, so there is no blocking wait for a sync (a sketch follows this list);
- readFromDestNode() also advances the state machine.
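As a rough sketch of the write path described above (the struct layout is illustrative, not the actual implementation), writeToDestNode() could drain migrating->wbuf opportunistically and simply return when the socket would block:

```c
/* Sketch only: drain the migration write buffer from a write event
 * without blocking; names mirror the description above. */
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

typedef struct migrating_ctx {
    int fd;            /* connection to the target node (non-blocking) */
    char wbuf[65536];  /* data produced by runMigrateMachine() */
    size_t wlen;       /* bytes currently pending in wbuf */
} migrating_ctx;

/* Called by the event loop when the target socket becomes writable. */
static void writeToDestNode(migrating_ctx *m) {
    while (m->wlen > 0) {
        ssize_t n = write(m->fd, m->wbuf, m->wlen);
        if (n > 0) {
            memmove(m->wbuf, m->wbuf + n, m->wlen - (size_t)n);
            m->wlen -= (size_t)n;
        } else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            return;    /* socket full: retry on the next write event */
        } else {
            return;    /* real error: the caller would abort the migration */
        }
    }
}

int main(void) {
    migrating_ctx m = { .fd = STDOUT_FILENO, .wlen = 0 };
    m.wlen = (size_t)snprintf(m.wbuf, sizeof(m.wbuf), "RESTORE CHUNK STEP ...\r\n");
    writeToDestNode(&m);   /* demo: "sends" the buffered command to stdout */
    return 0;
}
```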
2.2 state machine
Here is the state machine in the source node for async chunk restore (a sketch of these states follows the list):
- When we start migrating key1, start from MIGRATING_STEP_UNUSE and lock key1 in the source node;
- If key1 is not a big key, lock key1 in the target node, dump key1 to the buffer as RESTORE key1 ..., go back to MIGRATING_STEP_UNUSE, unlock key1 on both sides, and wait for the next task;
- If key1 is a big key, lock key1 in the target node, go to MIGRATING_STEP_CHUNK_START, and send RESTORE CHUNK START to the target node. In the target node, add key1 to server.importing_keys to lock key1;
- When the source node receives OK from the target node, switch to MIGRATING_STEP_CHUNK_PAYLOAD and dump key1 to the buffer in the format RESTORE CHUNK STEP ...;
- When the chunk restore finishes, unlock key1 by removing it from server.migrating_keys in the source node;
- Send RESTORE CHUNK END to the target with the ttl, unlock key1 by removing it from server.importing_keys in the target node, go back to MIGRATING_STEP_UNUSE, and wait for the next task;
- The write event transmits the dumped data to the target node asynchronously in the event loop.
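A compact sketch of these source-side states and transitions. Only the MIGRATING_STEP_* names come from the description above; the event names (EV_START_BIGKEY and so on) are made up for the example.

```c
/* Sketch only: the source-side migration states and their transitions. */
#include <stdio.h>

typedef enum {
    MIGRATING_STEP_UNUSE,          /* idle, no key being migrated           */
    MIGRATING_STEP_CHUNK_START,    /* sent RESTORE CHUNK START, awaiting OK */
    MIGRATING_STEP_CHUNK_PAYLOAD   /* streaming RESTORE CHUNK STEP payloads */
} migrating_step;

typedef enum { EV_START_BIGKEY, EV_TARGET_OK, EV_CHUNKS_DONE } migrate_event;

static migrating_step step(migrating_step s, migrate_event ev) {
    switch (s) {
    case MIGRATING_STEP_UNUSE:
        /* lock key1 on both sides, send RESTORE CHUNK START */
        return ev == EV_START_BIGKEY ? MIGRATING_STEP_CHUNK_START : s;
    case MIGRATING_STEP_CHUNK_START:
        /* target acknowledged: start dumping RESTORE CHUNK STEP ... */
        return ev == EV_TARGET_OK ? MIGRATING_STEP_CHUNK_PAYLOAD : s;
    case MIGRATING_STEP_CHUNK_PAYLOAD:
        /* send RESTORE CHUNK END with the ttl, unlock on both sides */
        return ev == EV_CHUNKS_DONE ? MIGRATING_STEP_UNUSE : s;
    }
    return s;
}

int main(void) {
    migrating_step s = MIGRATING_STEP_UNUSE;
    s = step(s, EV_START_BIGKEY);
    s = step(s, EV_TARGET_OK);
    s = step(s, EV_CHUNKS_DONE);
    printf("final state: %d\n", s);   /* back to MIGRATING_STEP_UNUSE (0) */
    return 0;
}
```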
Actually, for slot migration we could lock the whole slot instead of locking individual keys.
Comment From: madolson
Cool, looks like a promising design. One high-level comment I have is: why do we need to insert the key we are restoring into the database before we are done recreating it? We could just leave it as a detached key and then add it at the end. It seems like that would handle the eviction issue that was mentioned.
Also, this sentence seems to trail off:
> Define server.importing_keys which is a dict to lock keys to avoid unexpected modifications. Add keys to it when
I'll take a deeper look shortly.
Comment From: DarrenJiang13
> Cool, looks like a promising design. One high-level comment I have is: why do we need to insert the key we are restoring into the database before we are done recreating it? We could just leave it as a detached key and then add it at the end. It seems like that would handle the eviction issue that was mentioned.
Yes! "add as a detached key at first and dbAdd it at the end" is a good way to avoid eviction in target node. But in source node we still need to avoid eviction cause this key has already been added.
> Define server.importing_keys which is a dict to lock keys to avoid unexpected modifications.
This key locking mechanism still works.
For example, when we try to restore-chunk a zset key zsetkey1 into Redis, the key is added as a detached key at first. Then a client writes set zsetkey1 123 to Redis. When we finish restoring the chunks, these two keys with the same name would conflict.
So we should block set zsetkey1 123 during the chunk migration process and return an error like -IMPORTING or -CHUNK.
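A minimal sketch of that gate, using a toy lookup in place of the proposed server.importing_keys dict; the error text is illustrative.

```c
/* Sketch only: reject client writes against a key that is currently
 * being restored in chunks, returning an -IMPORTING error as suggested. */
#include <stdio.h>
#include <string.h>

static const char *importing_keys[] = { "zsetkey1", NULL };  /* toy lock set */

static int key_is_importing(const char *key) {
    for (int i = 0; importing_keys[i]; i++)
        if (strcmp(importing_keys[i], key) == 0) return 1;
    return 0;
}

/* Called before executing a write command against `key`. */
static const char *check_write_allowed(const char *key) {
    if (key_is_importing(key))
        return "-IMPORTING key is being restored in chunks, try again later\r\n";
    return NULL;   /* no lock held, let the command run */
}

int main(void) {
    const char *err = check_write_allowed("zsetkey1");  /* e.g. set zsetkey1 123 */
    printf("%s", err ? err : "+OK\r\n");
    return 0;
}
```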