前面我们看完事件循环的主流程,这里我们开始看第一个命令set的在redis中的执行流程

// 一个set命令的执行流程

//src/networking.c
//在上文创建操作redis的客户端时使用readQueryFromClient方法接收处理连接新到达的命令

/*
 * 处理连接请求内容
 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
    processInputBuffer(c);
}

// 在readQueryFromClient通过该方法处理请求内容
void processInputBuffer(redisClient *c)
{

    processMultibulkBuffer(c)
    processCommand(c)
}


int processMultibulkBuffer(redisClient *c)
{
    char *newline = NULL;
    int pos = 0, ok;
    long long ll;

    // 读入命令的参数个数
    // 比如执行命令是set aa 23333, 则这边的c->querybuf显示为
    // *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
    // 客户端发过来的命令以 * 开头
    fprintf(stderr, "set aa 23333 c->querybuf %s \n", c->querybuf);

    if (c->multibulklen == 0)
    {
        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf, '\r');

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        redisAssertWithInfo(c, NULL, c->querybuf[0] == '*');
        // 获得参数的个数,从*之后到第一个\r\n之前,存到ll中
        ok = string2ll(c->querybuf + 1, newline - (c->querybuf + 1), &ll);
        pos = (newline - c->querybuf) + 2;
        // 设置参数数量
        c->multibulklen = ll;
        /* Setup argv array on client structure */
        if (c->argv)
            zfree(c->argv);
        c->argv = zmalloc(sizeof(robj *) * c->multibulklen);
    }

    // 从 c->querybuf 中读入参数,并创建各个参数对象到 c->argv
    while (c->multibulklen)
    {

        /* Read bulk length if unknown */
        if (c->bulklen == -1)
        {

            // 确保 "\r\n" 存在
            newline = strchr(c->querybuf + pos, '\r');
            /* Buffer should also contain \n */
            if (newline - (c->querybuf) > ((signed)sdslen(c->querybuf) - 2))
                break;

            // 检查刚好是
            // *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
            if (c->querybuf[pos] != '')
            {
                addReplyErrorFormat(c,
                                    "Protocol error: expected '', got '%c'",
                                    c->querybuf[pos]);
                setProtocolError(c, pos);
                return REDIS_ERR;
            }

            // 读取长度
            // 比如 *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
            ok = string2ll(c->querybuf + pos + 1, newline - (c->querybuf + pos + 1), &ll);
            if (!ok || ll < 0 || ll > 512 * 1024 * 1024)
            {
                addReplyError(c, "Protocol error: invalid bulk length");
                setProtocolError(c, pos);
                return REDIS_ERR;
            }

            // 定位到参数的开头
            // 比如
            // *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
            // pos = 4 值为
            // 增加后 pos = 8 值为s
            // pos = 13 值为// 增加后 pos = 17 值为a
            // pos = 22 值为
            // 增加后 pos = 26 值为2
            pos += newline - (c->querybuf + pos) + 2;
            // 参数的长度
            c->bulklen = ll;
        }

        /* Read bulk argument */
        // 读入参数
        if (sdslen(c->querybuf) - pos < (unsigned)(c->bulklen + 2))
            /* Not enough data (+2 == trailing \r\n) */
            break;
        }
        else
        {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf + pos, c->bulklen);
                pos += c->bulklen + 2;

            // 还需要读取的参数个数
            c->multibulklen--;
        }
    }

    /* Trim to pos */
    if (pos)
        sdsrange(c->querybuf, pos, -1);

    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0)
        return REDIS_OK;

    /* Still not read to process the command */
    // 如果还有参数未读取完,那么就协议内容有错
    return REDIS_ERR;
}

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). 
 *
 */
int processCommand(redisClient *c)
{

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // 根据字符串查找命令, 检查参数个数
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd)
    {
        // 没有命令
    }
    else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
             (c->argc < -c->cmd->arity))
    {
        //参数错误
    }
    // 执行命令
    call(c, REDIS_CALL_FULL);
}

/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags)
{
    // 执行命令的函数.在前面的lookupCommand会给cmd设置值
    // 在例子set aa 23333 中相当于执行 setCommand方法
    c->cmd->proc(c);
}

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(redisClient *c)
{
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = REDIS_SET_NO_FLAGS;

    // 设置选项参数
    for (j = 3; j < c->argc; j++)
    {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc - 1) ? NULL : c->argv[j + 1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0')
        {
            flags |= REDIS_SET_NX;
        }
        else if ((a[0] == 'x' || a[0] == 'X') &&
                 (a[1] == 'x' || a[1] == 'X') && a[2] == '\0')
        {
            flags |= REDIS_SET_XX;
        }
        else if ((a[0] == 'e' || a[0] == 'E') &&
                 (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next)
        {
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        }
        else if ((a[0] == 'p' || a[0] == 'P') &&
                 (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next)
        {
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        }
        else
        {
            addReply(c, shared.syntaxerr);
            return;
        }
    }

    // 对参数编码
    c->argv[2] = tryObjectEncoding(c->argv[2]);

    setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}

void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
{

    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    // 取出过期时间
    if (expire)
    {

        // 取出 expire 参数的值
        // T = O(N)
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
            return;

        // expire 参数的值不正确时报错
        if (milliseconds <= 0)
        {
            addReplyError(c, "invalid expire time in SETEX");
            return;
        }

        // 将秒转成毫秒
        if (unit == UNIT_SECONDS)
            milliseconds *= 1000;
    }

    // 如果设置了 NX 或者 XX 参数,那么检查条件是否不符合这两个设置
    // 在条件不符合时报错,报错的内容由 abort_reply 参数决定
    if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
        (flags & REDIS_SET_XX && lookupKeyWrite(c->db, key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }

    // 将键值关联到数据库
    setKey(c->db, key, val);
    server.dirty++;

    // 为键设置过期时间
    if (expire)
        setExpire(c->db, key, mstime() + milliseconds);

    // 发送通知
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING, "set", key, c->db->id);

    // 发送通知
    if (expire)
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
                            "expire", key, c->db->id);

    // 设置成功回复客户端
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
 *
 * 1) The ref count of the value object is incremented.
 *
 * 2) clients WATCHing for the destination key notified.
 *
 * 3) The expire time of the key is reset (the key is made persistent). 
 */
void setKey(redisDb *db, robj *key, robj *val)
{

    // 添加或覆写数据库中的键值对
    if (lookupKeyWrite(db, key) == NULL)
    {
        // 没有值就添加新值
        dbAdd(db, key, val);
    }
    else
    {
        // 有值就覆盖
        dbOverwrite(db, key, val);
    }

    incrRefCount(val);

    // 移除键的过期时间
    removeExpire(db, key);

    // 发送键修改通知
    signalModifiedKey(db, key);
}

/* Add the key to the DB. It's up to the caller to increment the reference
 * counter of the value if needed.
 * The program is aborted if the key already exists. 
 *
 */
void dbAdd(redisDb *db, robj *key, robj *val)
{

    // 复制操作的键
    sds copy = sdsdup(key->ptr);

    // 尝试添加键值对
    int retval = dictAdd(db->dict, copy, val);

    // 如果键已经存在,那么停止
    redisAssertWithInfo(NULL, key, retval == REDIS_OK);

    // 如果开启了集群模式,那么将键保存到槽里面
    if (server.cluster_enabled)
        slotToKeyAdd(key);
}

/* Overwrite an existing key with a new value. Incrementing the reference
 * count of the new value is up to the caller.
 *
 * This function does not modify the expire time of the existing key.
 *
 * The program is aborted if the key was not already present. 
 */
void dbOverwrite(redisDb *db, robj *key, robj *val)
{
    dictEntry *de = dictFind(db->dict, key->ptr);

    // 节点必须存在,否则中止
    redisAssertWithInfo(NULL, key, de != NULL);

    // 覆写旧值
    dictReplace(db->dict, key->ptr, val);
}

基于版本3.0.0版本,点击下载https://download.redis.io/releases/redis-3.0.0.tar.gz

本文地址,https://www.ccagml.com/?p=387

发表评论