When using lookupKeyReadWithFlags()
to count missing_keys
, the preceding if statement checks for migrating_slot || importing_slot
, and the lookup operation explicitly uses the current server's database.
/* Migrating / Importing slot? Count keys we don't have.
* If it is pubsubshard command, it isn't required to check
* the channel being present or not in the node during the
* slot migration, the channel will be served from the source
* node until the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !pubsubshard_included)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
}
However, when assigning the importing_slot
variable, there is no check to verify whether the hash slot actually belongs to the current node.
if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
* and node. */
firstkey = thiskey;
slot = thisslot;
n = getNodeBySlot(slot);
/* Error: If a slot is not served, we are in "cluster down"
* state. However the state is yet to be updated, so this was
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}
/* If we are migrating or importing this slot, we need to check
* if we have all the keys in the request (the only way we
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (n == myself &&
getMigratingSlotDest(slot) != NULL)
{
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
importing_slot = 1;
}
}