Using RESP3 Streaming string + key locking (https://github.com/redis/redis/issues/11396)

Streaming strings implementation

We may want to periodically consume the buffer on the target side while streaming in the data. If we are migrating a command carrying 5GB of data, we don't really want to buffer it all in the target's input buffer and then process it at once, since that would cause a latency spike. All of the RDB loading code allows incremental loading, so we can periodically call the load operation as we are streaming the command.
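As a rough illustration of that incremental consumption (the StreamState struct, LOAD_SLICE budget, and loadValueChunk() helper below are made up for the sketch, not existing Redis code), the target could hand the loader a bounded slice of the buffered payload on each event-loop pass instead of waiting for the whole value:

/* Standalone sketch: feed the value loader incrementally as bytes arrive,
 * instead of buffering the whole payload and loading it in one shot. */
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define LOAD_SLICE (64 * 1024)   /* assumed cap: process at most 64 KiB per pass */

typedef struct {
    char  *buf;        /* client input buffer (already read from the socket) */
    size_t len;        /* bytes currently buffered */
    size_t consumed;   /* bytes already handed to the loader */
} StreamState;

/* Hypothetical incremental loader: consumes one slice of serialized data. */
static void loadValueChunk(const char *data, size_t len) {
    printf("loaded %zu bytes\n", len);
    (void)data;
}

/* Called whenever more payload bytes are buffered: hand over at most
 * LOAD_SLICE bytes, then yield back to the event loop. */
static void consumeStreamedPayload(StreamState *st) {
    if (st->consumed >= st->len) return;
    size_t n = st->len - st->consumed;
    if (n > LOAD_SLICE) n = LOAD_SLICE;
    loadValueChunk(st->buf + st->consumed, n);
    st->consumed += n;
}

int main(void) {
    char payload[200 * 1024];
    memset(payload, 'x', sizeof(payload));
    StreamState st = { payload, sizeof(payload), 0 };
    while (st.consumed < st.len) consumeStreamedPayload(&st);
    return 0;
}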

We may also instead consider doing the chunking on the command side. We could introduce a command like RESTORE CHUNK X and allow incrementally restoring pieces of a large value. Example:

RESTORE CHUNK-START <data>
RESTORE CHUNK <data>
RESTORE CHUNK <data>
RESTORE CHUNK-COMPLETE <data> <final checksum>

The current proposal for atomic slot migration also obviates the need for a non-blocking dump command, so we can consider decoupling the DUMP and RESTORE commands.

Comment From: DarrenJiang13

@madolson Here is a draft design for dump/restore chunks. At Alibaba we have a similar design that has already been implemented and running online for some time, and we are glad to see this issue being brought to the community.

I would like to introduce it from two aspects: 1. command design; 2. a non-blocking restore mechanism with a state machine. (Actually, the dump command is not essential when we do async chunk restore.)

Here is a related article in Chinese (not yet translated): https://developer.aliyun.com/article/919126

1. Command design

If we just want to write code for chunked dump and restore commands, it should not be that complicated. Here is one draft design for chunked dump and restore.

1.1 DUMP

  • DUMP CHUNK START|END KEYNAME: define server.migrating_keys, a dict used to lock keys and avoid unexpected modifications. Add a key to it when its chunked dump starts, and remove it when the dump ends.
  • DUMP CHUNK KEYNAME: split an object into a series of RDB chunks and return 2 params, DUMPED_PAYLOAD and DUMPED_INDEX:
    • DUMPED_PAYLOAD: a long RDB payload split into chunks, each framed as TYPE | CHUNK-DATA | RDB_VERSION | CRC64 (i.e. TYPE | CHUNK-DATA1 | RDB_VERSION | CRC64, then TYPE | CHUNK-DATA2 | RDB_VERSION | CRC64, and so on)
    • DUMPED_INDEX: the number of elements left to be dumped. When it reaches 0, the dump is finished.
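A standalone sketch of that dump loop under stated assumptions (a per-call byte budget, a placeholder checksum instead of Redis' real CRC64, and invented helper names): each DUMP CHUNK call serializes a bounded slice of the remaining elements, appends the RDB_VERSION and CRC64 trailer, and returns the remaining element count as DUMPED_INDEX.

/* Standalone sketch, not Redis source. */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define RDB_VERSION   11          /* assumed payload version field */
#define CHUNK_BUDGET  (4 * 1024)  /* assumed max serialized bytes per chunk */

static uint64_t chunkCrc64(const void *p, size_t len) {
    /* placeholder checksum; the real command would reuse Redis' CRC64 */
    uint64_t h = 0; const unsigned char *c = p;
    while (len--) h = h * 1099511628211ULL + *c++;
    return h;
}

/* Serialize up to CHUNK_BUDGET bytes of the remaining elements into `out`,
 * append RDB_VERSION + CRC64, and return how many elements are still left
 * (the DUMPED_INDEX the caller polls until it reaches 0). */
static long dumpChunk(const char **elems, long nelems, long *cursor,
                      char *out, size_t *outlen) {
    size_t used = 0;
    out[used++] = 'T';                      /* TYPE byte for this value */
    while (*cursor < nelems) {
        size_t elen = strlen(elems[*cursor]);
        if (used + elen > CHUNK_BUDGET) break;
        memcpy(out + used, elems[*cursor], elen);
        used += elen;
        (*cursor)++;
    }
    uint16_t ver = RDB_VERSION;
    memcpy(out + used, &ver, sizeof(ver)); used += sizeof(ver);
    uint64_t crc = chunkCrc64(out, used);
    memcpy(out + used, &crc, sizeof(crc)); used += sizeof(crc);
    *outlen = used;
    return nelems - *cursor;                /* DUMPED_INDEX */
}

int main(void) {
    const char *elems[] = { "a", "bb", "ccc", "dddd" };
    long cursor = 0, left;
    char payload[CHUNK_BUDGET + 16];
    size_t len;
    do {
        left = dumpChunk(elems, 4, &cursor, payload, &len);
        printf("chunk of %zu bytes, %ld elements left\n", len, left);
    } while (left > 0);
    return 0;
}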

1.2 RESTORE

RESTORE key ttl serialized-value [CHUNK START|STEP|END VERSION] [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]

  • RESTORE key ttl serialized-value [CHUNK START|END VERSION]: define server.importing_keys which is a dict to lock keys to avoid unexpected modifications. Add keys to it when
  • Only load the ttl for CHUNK END VERSION.
  • RESTORE key ttl serialized-value [CHUNK STEP VERSION]: restore the object chunk by chunk, nothing special.
  • VERSION is reserved for scenarios like migration to keep consistency.
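For illustration only (the chunk payloads and the version value 1 below are placeholders, not part of the proposal), a chunked restore of one big key under this syntax could look like:

RESTORE key1 0 <chunk-1> CHUNK START 1
RESTORE key1 0 <chunk-2> CHUNK STEP 1
RESTORE key1 0 <chunk-3> CHUNK STEP 1
RESTORE key1 3600 <chunk-4> CHUNK END 1

Only the final CHUNK END call carries the effective ttl, per the rule above.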

1.3 Other issues to be mentioned in this process

  • Big values: some values cannot be split, e.g. a string whose size is larger than 2GB;
  • Modules: need a common API that can dump and restore by chunks (this could be discussed later);
  • Command ban: FLUSHALL/FLUSHDB/... should be banned while chunk commands are running (see the sketch after this list);
  • Expire/evict ban: source and target nodes should not expire or evict keys during chunk migration;
  • Replication: if the target primary has replicas, propagate chunks one by one rather than all at once, and force flushing to the replica when the restore ends.
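A minimal sketch of the command ban and expire/evict ban, assuming a simple counter of in-flight chunk migrations; the function names are invented for the sketch, not existing Redis code:

/* Standalone sketch, not Redis source. */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>

static int chunk_migrations_in_progress = 0;   /* e.g. size of the migrating/importing dicts */

/* Would be consulted in the command dispatch path before execution. */
static bool commandAllowedDuringChunkMigration(const char *cmdname) {
    if (chunk_migrations_in_progress == 0) return true;
    if (!strcasecmp(cmdname, "flushall") || !strcasecmp(cmdname, "flushdb"))
        return false;                          /* reply with a -CHUNK style error instead */
    return true;
}

/* Would be consulted by the expire cycle and the eviction path. */
static bool expireAndEvictAllowed(void) {
    return chunk_migrations_in_progress == 0;
}

int main(void) {
    chunk_migrations_in_progress = 1;
    printf("FLUSHALL allowed: %d\n", commandAllowedDuringChunkMigration("FLUSHALL"));
    printf("GET allowed:      %d\n", commandAllowedDuringChunkMigration("GET"));
    printf("expire/evict:     %d\n", expireAndEvictAllowed());
    return 0;
}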

Here is a graph of this process (figure: non-blocking-chunk-restore).

Comment From: DarrenJiang13

2. Async restore design in migrating with state machine

If we do this inside the migration code path rather than with pure DUMP + RESTORE commands, we could do async chunk restore with a state machine. Here is a draft design.

2.1 Global view

Here is a global view (figure: global-view):

  1. The source node registers a read event handler readFromDestNode() and a write event handler writeToDestNode();
  2. runMigrateMachine() runs in a loop, writing data into a buffer called migrating->wbuf and advancing the state machine;
  3. writeToDestNode() transmits the data in migrating->wbuf to the target node asynchronously, so there is no blocking wait for synchronization;
  4. readFromDestNode() also advances the state machine.
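Here is a minimal standalone sketch of that flow, reusing the handler names above but with simplified buffer and socket handling; the MigrateState struct and the printf stand-in for the socket write are assumptions for illustration, not actual code:

/* Standalone sketch, not Redis source. */
#include <stdio.h>
#include <string.h>

typedef struct {
    char   wbuf[4096];   /* data waiting to be sent to the target node */
    size_t wlen;
    int    state;        /* one of the MIGRATING_STEP_* states from section 2.2 */
} MigrateState;

/* Fill wbuf with the next RESTORE / RESTORE CHUNK command and advance state. */
static void runMigrateMachine(MigrateState *m) {
    const char *cmd = "RESTORE CHUNK STEP ...\r\n";
    size_t n = strlen(cmd);
    if (m->wlen + n <= sizeof(m->wbuf)) {
        memcpy(m->wbuf + m->wlen, cmd, n);
        m->wlen += n;
    }
}

/* Write-event handler: drain whatever is buffered, never block waiting. */
static void writeToDestNode(MigrateState *m) {
    if (m->wlen == 0) return;
    printf("sent %zu bytes to target\n", m->wlen);  /* stands in for write(fd, ...) */
    m->wlen = 0;
}

/* Read-event handler: parse the target's reply (+OK / error) and move the
 * state machine forward accordingly. */
static void readFromDestNode(MigrateState *m) {
    m->state++;   /* placeholder transition */
}

int main(void) {
    MigrateState m = { .wlen = 0, .state = 0 };
    for (int tick = 0; tick < 3; tick++) {   /* stands in for event-loop iterations */
        runMigrateMachine(&m);
        writeToDestNode(&m);
        readFromDestNode(&m);
    }
    return 0;
}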

2.2 State machine

Here is the state machine in the source node for async chunk restore (figure: state-machine):

  1. When starting to migrate key1, start from MIGRATING_STEP_UNUSE and lock key1 in the source node;
  2. If key1 is not a big key, lock key1 in the target node, dump key1 into the buffer as RESTORE key1 ..., go back to MIGRATING_STEP_UNUSE, unlock key1 on both sides, and wait for the next task;
  3. If key1 is a big key, lock key1 in the target node, go to MIGRATING_STEP_CHUNK_START, and send RESTORE CHUNK START to the target node. In the target node, add key1 to server.importing_keys to lock it;
  4. When the source node receives OK from the target node, switch to MIGRATING_STEP_CHUNK_PAYLOAD and dump key1 into the buffer in the format RESTORE CHUNK STEP ...;
  5. When the chunked restore finishes, unlock key1 from server.migrating_keys in the source node;
  6. Send RESTORE CHUNK END with the ttl to the target, unlock key1 from server.importing_keys in the target node, go back to MIGRATING_STEP_UNUSE, and wait for the next task;
  7. The write event transmits the dumped data to the target node asynchronously in the event loop.

Actually, for slot migration we could lock the whole slot instead of taking multiple key locks.
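A small sketch of the source-side transitions described in steps 1-6, using the state names above; the big-key check and the target's replies are collapsed into boolean flags purely for illustration:

/* Standalone sketch, not Redis source. */
#include <stdbool.h>
#include <stdio.h>

typedef enum {
    MIGRATING_STEP_UNUSE,          /* idle, waiting for the next key */
    MIGRATING_STEP_CHUNK_START,    /* RESTORE CHUNK START sent, waiting for OK */
    MIGRATING_STEP_CHUNK_PAYLOAD,  /* streaming RESTORE CHUNK STEP payloads */
} MigratingStep;

/* One state-machine tick for a single key; returns the next state. */
static MigratingStep migrateStep(MigratingStep s, bool is_big_key,
                                 bool got_ok, bool chunks_done) {
    switch (s) {
    case MIGRATING_STEP_UNUSE:
        if (!is_big_key) return MIGRATING_STEP_UNUSE;      /* one-shot RESTORE, stay idle */
        return MIGRATING_STEP_CHUNK_START;                 /* lock key, send CHUNK START */
    case MIGRATING_STEP_CHUNK_START:
        return got_ok ? MIGRATING_STEP_CHUNK_PAYLOAD : s;  /* wait for the target's OK */
    case MIGRATING_STEP_CHUNK_PAYLOAD:
        if (chunks_done) return MIGRATING_STEP_UNUSE;      /* send CHUNK END, unlock */
        return s;                                          /* keep sending CHUNK STEP */
    }
    return s;
}

int main(void) {
    MigratingStep s = MIGRATING_STEP_UNUSE;
    s = migrateStep(s, true,  false, false);  /* big key -> CHUNK_START */
    s = migrateStep(s, true,  true,  false);  /* OK received -> CHUNK_PAYLOAD */
    s = migrateStep(s, true,  true,  true);   /* all chunks sent -> back to UNUSE */
    printf("final state: %d\n", s);
    return 0;
}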

Comment From: madolson

Cool, looks like a promising design. One high-level comment I have is: why do we need to insert the key we are restoring into the database before we are done recreating it? We could just leave it as a detached key and then add it at the end. It seems like that would handle the eviction issue that was mentioned.

Also this sentence seems to trail off:

Define server.importing_keys which is a dict to lock keys to avoid unexpected modifications. Add keys to it when

I'll take a deeper look shortly.

Comment From: DarrenJiang13

Cool, looks like a promising design. One high-level comment I have is: why do we need to insert the key we are restoring into the database before we are done recreating it? We could just leave it as a detached key and then add it at the end. It seems like that would handle the eviction issue that was mentioned.

Yes! "Add it as a detached key at first and dbAdd it at the end" is a good way to avoid eviction in the target node. But in the source node we still need to avoid eviction, because the key has already been added there.

Define server.importing_keys which is a dict to lock keys to avoid unexpected modifications.

This key locking mechanism is still needed. For example, when we try to restore-chunk a zset key zsetkey1 into Redis, the key is added as detached at first. Then a client writes SET zsetkey1 123 to Redis. When we finish restoring the chunks, these two values for the same key name would conflict.

So we should block SET zsetkey1 123 during the chunk migration process and return an error like -IMPORTING or -CHUNK.
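A standalone sketch of that block, assuming a simple importing-key lookup in the write path; the array stand-in for the dict and the function names are invented for illustration, not existing Redis code:

/* Standalone sketch, not Redis source. */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define MAX_IMPORTING 16

static const char *importing_keys[MAX_IMPORTING];  /* stand-in for server.importing_keys */
static int importing_count = 0;

static bool keyIsImporting(const char *key) {
    for (int i = 0; i < importing_count; i++)
        if (strcmp(importing_keys[i], key) == 0) return true;
    return false;
}

/* Would run in the command path before a write touches the keyspace;
 * returns an error string to send to the client, or NULL to proceed. */
static const char *checkWriteAllowed(const char *key) {
    if (keyIsImporting(key))
        return "-IMPORTING key is being restored in chunks, try again later";
    return NULL;
}

int main(void) {
    importing_keys[importing_count++] = "zsetkey1";
    const char *err = checkWriteAllowed("zsetkey1");   /* e.g. SET zsetkey1 123 */
    printf("%s\n", err ? err : "write allowed");
    printf("%s\n", checkWriteAllowed("otherkey") ? "blocked" : "write allowed");
    return 0;
}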