本章来看持久化RDB、AOF

RDB:按指定的时间间隔生成 Redis 内存数据的快照,保存为一个二进制文件(默认文件名为 dump.rdb)

AOF:记录 Redis 执行过的所有写命令(不包括查询命令),并在 Redis 服务启动时,通过重新执行这些命令来还原数据。

两者在官网有详细的对比:https://redis.io/topics/persistence



// 先来看RDB
// 1. SAVE 命令:在服务器进程中同步执行保存
// 注意:配置项 `save 60 1000`(60 秒内至少有 1000 次修改时自动保存)触发的是后台保存,而不是这里的 SAVE 命令

/*
 * SAVE command handler.
 *
 * Runs rdbSave() directly in the calling process and replies to the client
 * with shared.ok or shared.err depending on the result. The command is
 * refused with an error reply while server.rdb_child_pid records a running
 * background-save child.
 */
void saveCommand(redisClient *c)
{
    int status;

    /* A background save is already in progress: refuse to start another. */
    if (server.rdb_child_pid != -1)
    {
        addReplyError(c, "Background save already in progress");
        return;
    }

    /* Save in the foreground, then report the outcome. */
    status = rdbSave(server.rdb_filename);
    addReply(c, (status == REDIS_OK) ? shared.ok : shared.err);
}


/*
 * Save every non-empty database to an RDB file named `filename`.
 *
 * The dump is first written to "temp-<pid>.rdb" and is renamed over
 * `filename` only after it has been flushed, fsync'ed and closed
 * successfully, so an existing dump is never replaced by a partial one.
 *
 * On success server.dirty is reset, server.lastsave is set to the current
 * time and server.lastbgsave_status is set to REDIS_OK.
 *
 * Returns REDIS_OK on success and REDIS_ERR on any failure. On every
 * failure path after the temp file was created, the temp file is removed.
 */
int rdbSave(char *filename)
{
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    int saved_errno;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;

    /* Write into a per-process temp file first. */
    snprintf(tmpfile, sizeof(tmpfile), "temp-%d.rdb", (int)getpid());
    fp = fopen(tmpfile, "w");
    if (!fp)
    {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
                 strerror(errno));
        return REDIS_ERR;
    }
    rioInitWithFile(&rdb, fp);

    /* Enable checksum computation only when configured. */
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;

    /* File header: "REDIS" followed by the 4-digit format version. */
    snprintf(magic, sizeof(magic), "REDIS%04d", REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb, magic, 9) == -1)
        goto werr;

    /* Dump each database in turn. */
    for (j = 0; j < server.dbnum; j++)
    {
        redisDb *db = server.db + j;
        dict *d = db->dict; /* key space of this database */

        /* Empty databases are not written at all. */
        if (dictSize(d) == 0)
            continue;

        di = dictGetSafeIterator(d);
        if (!di)
        {
            /* Do not leave a partial temp file behind. */
            fclose(fp);
            unlink(tmpfile);
            return REDIS_ERR;
        }

        /* SELECTDB opcode followed by the database index. */
        if (rdbSaveType(&rdb, REDIS_RDB_OPCODE_SELECTDB) == -1)
            goto werr;
        if (rdbSaveLen(&rdb, j) == -1)
            goto werr;

        /* Write every key/value pair of this database. */
        while ((de = dictNext(di)) != NULL)
        {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            /* Wrap the key string in a stack-allocated object. */
            initStaticStringObject(key, keystr);
            expire = getExpire(db, &key);

            if (rdbSaveKeyValuePair(&rdb, &key, o, expire, now) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* never leave a released iterator visible to werr */
    }

    /* End-of-file opcode terminates the key/value section. */
    if (rdbSaveType(&rdb, REDIS_RDB_OPCODE_EOF) == -1)
        goto werr;

    /* Append the 8-byte checksum tracked by the rio layer; a failed write
     * here must fail the whole save, otherwise a truncated file would be
     * renamed into place. */
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(&rdb, &cksum, 8) == 0)
        goto werr;

    /* Make sure the data reached the disk before the rename. */
    if (fflush(fp) == EOF)
        goto werr;
    if (fsync(fileno(fp)) == -1)
        goto werr;
    if (fclose(fp) == EOF)
    {
        /* The stream is gone even when fclose() fails: closing it again in
         * the error path would be undefined behavior. */
        fp = NULL;
        goto werr;
    }
    fp = NULL;

    /* Replace the previous dump with the new one. */
    if (rename(tmpfile, filename) == -1)
    {
        redisLog(REDIS_WARNING, "Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE, "DB saved on disk");

    /* Reset the dirty counter and record when/how the last save ended. */
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = REDIS_OK;
    return REDIS_OK;

werr:
    /* Capture errno first: fclose()/unlink() below may overwrite it. */
    saved_errno = errno;

    if (fp)
        fclose(fp);
    unlink(tmpfile);

    redisLog(REDIS_WARNING, "Write error saving DB on disk: %s", strerror(saved_errno));

    if (di)
        dictReleaseIterator(di);

    return REDIS_ERR;
}

/*
 * Write one key/value pair, preceded by its expire time when it has one.
 *
 * `expiretime` is -1 for keys without an expire; otherwise it is compared
 * against `now` and written with the millisecond-expire opcode.
 *
 * Returns 1 when the pair was written, 0 when the key was skipped because
 * it is already expired, and -1 when any of the write helpers failed.
 */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    int has_expire = (expiretime != -1);

    /* Already expired keys are not saved at all. */
    if (has_expire && expiretime < now)
        return 0;

    /* Expire opcode + timestamp (milliseconds) go before the pair. */
    if (has_expire &&
        (rdbSaveType(rdb, REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1 ||
         rdbSaveMillisecondTime(rdb, expiretime) == -1))
    {
        return -1;
    }

    /* Value type, then key, then value: short-circuit evaluation keeps the
     * write order and stops at the first failure. */
    if (rdbSaveObjectType(rdb, val) == -1 ||
        rdbSaveStringObject(rdb, key) == -1 ||
        rdbSaveObject(rdb, val) == -1)
    {
        return -1;
    }

    return 1;
}

/*
 * Serialize the value object `o` to `rdb`, choosing the on-disk layout from
 * the object's type and encoding:
 *
 *   - string:                     saved as a string object
 *   - ziplist / intset encodings: the whole blob is saved as one raw string
 *   - linked list / hash table / skiplist encodings: element count followed
 *     by every element (plus score for sorted sets, plus value for hashes)
 *
 * Returns the accumulated return values of the low-level save helpers
 * (presumably the number of bytes written - confirm against those helpers),
 * or -1 as soon as one of them fails. Unknown types/encodings call
 * redisPanic().
 *
 * Any dict iterator created here is released on the error path as well as
 * on the success path, so a failed write does not leak it.
 */
int rdbSaveObject(rio *rdb, robj *o)
{
    int n, nwritten = 0;
    dictIterator *di = NULL; /* released at `err` when a write fails */
    dictEntry *de;

    if (o->type == REDIS_STRING)
    {
        if ((n = rdbSaveStringObject(rdb, o)) == -1)
            goto err;
        nwritten += n;
    }
    else if (o->type == REDIS_LIST)
    {
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
        {
            /* Dump the ziplist blob as-is. */
            size_t l = ziplistBlobLen((unsigned char *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto err;
            nwritten += n;
        }
        else if (o->encoding == REDIS_ENCODING_LINKEDLIST)
        {
            list *items = o->ptr;
            listIter li;
            listNode *ln;

            /* Element count, then each element. */
            if ((n = rdbSaveLen(rdb, listLength(items))) == -1)
                goto err;
            nwritten += n;

            listRewind(items, &li);
            while ((ln = listNext(&li)))
            {
                robj *eleobj = listNodeValue(ln);

                if ((n = rdbSaveStringObject(rdb, eleobj)) == -1)
                    goto err;
                nwritten += n;
            }
        }
        else
        {
            redisPanic("Unknown list encoding");
        }
    }
    else if (o->type == REDIS_SET)
    {
        if (o->encoding == REDIS_ENCODING_HT)
        {
            dict *set = o->ptr;

            di = dictGetIterator(set);

            /* Member count, then each member (stored as dict keys). */
            if ((n = rdbSaveLen(rdb, dictSize(set))) == -1)
                goto err;
            nwritten += n;

            while ((de = dictNext(di)) != NULL)
            {
                robj *eleobj = dictGetKey(de);

                if ((n = rdbSaveStringObject(rdb, eleobj)) == -1)
                    goto err;
                nwritten += n;
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        else if (o->encoding == REDIS_ENCODING_INTSET)
        {
            /* Dump the intset blob as-is. */
            size_t l = intsetBlobLen((intset *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto err;
            nwritten += n;
        }
        else
        {
            redisPanic("Unknown set encoding");
        }
    }
    else if (o->type == REDIS_ZSET)
    {
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
        {
            /* Dump the ziplist blob as-is. */
            size_t l = ziplistBlobLen((unsigned char *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto err;
            nwritten += n;
        }
        else if (o->encoding == REDIS_ENCODING_SKIPLIST)
        {
            zset *zs = o->ptr;

            di = dictGetIterator(zs->dict);

            /* Member count, then each (member, score) pair taken from the
             * sorted set's dict. */
            if ((n = rdbSaveLen(rdb, dictSize(zs->dict))) == -1)
                goto err;
            nwritten += n;

            while ((de = dictNext(di)) != NULL)
            {
                robj *eleobj = dictGetKey(de);
                double *score = dictGetVal(de);

                if ((n = rdbSaveStringObject(rdb, eleobj)) == -1)
                    goto err;
                nwritten += n;

                if ((n = rdbSaveDoubleValue(rdb, *score)) == -1)
                    goto err;
                nwritten += n;
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        else
        {
            redisPanic("Unknown sorted set encoding");
        }
    }
    else if (o->type == REDIS_HASH)
    {
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
        {
            /* Dump the ziplist blob as-is. */
            size_t l = ziplistBlobLen((unsigned char *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto err;
            nwritten += n;
        }
        else if (o->encoding == REDIS_ENCODING_HT)
        {
            di = dictGetIterator(o->ptr);

            /* Field count, then each (field, value) pair. */
            if ((n = rdbSaveLen(rdb, dictSize((dict *)o->ptr))) == -1)
                goto err;
            nwritten += n;

            while ((de = dictNext(di)) != NULL)
            {
                robj *key = dictGetKey(de);
                robj *val = dictGetVal(de);

                if ((n = rdbSaveStringObject(rdb, key)) == -1)
                    goto err;
                nwritten += n;

                if ((n = rdbSaveStringObject(rdb, val)) == -1)
                    goto err;
                nwritten += n;
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        else
        {
            redisPanic("Unknown hash encoding");
        }
    }
    else
    {
        redisPanic("Unknown object type");
    }
    return nwritten;

err:
    /* A write failed: release the iterator if one is still open. */
    if (di)
        dictReleaseIterator(di);
    return -1;
}

//2.bgsave命令,bgsave会创建子进程进行保存

/*
 * BGSAVE command handler.
 *
 * Refuses to start while an RDB child (server.rdb_child_pid) or an AOF
 * rewrite child (server.aof_child_pid) is recorded as running; otherwise
 * starts a background save through rdbSaveBackground() and reports whether
 * it could be started.
 */
void bgsaveCommand(redisClient *c)
{
    /* Only one background save at a time. */
    if (server.rdb_child_pid != -1)
    {
        addReplyError(c, "Background save already in progress");
        return;
    }

    /* Not allowed while an AOF rewrite child is running. */
    if (server.aof_child_pid != -1)
    {
        addReplyError(c, "Can't BGSAVE while AOF log rewriting is in progress");
        return;
    }

    /* Try to start the background save. */
    if (rdbSaveBackground(server.rdb_filename) != REDIS_OK)
    {
        addReply(c, shared.err);
        return;
    }

    addReplyStatus(c, "Background saving started");
}

/*
 * Start an RDB save in a forked child process.
 *
 * Parent: records how long fork() took, and on success remembers the child
 * pid and the save start time, then returns REDIS_OK. Returns REDIS_ERR when
 * a background save is already recorded as running or when fork() fails.
 *
 * Child: runs rdbSave(filename) and leaves through exitFromChild() with
 * status 0 on success and 1 on failure.
 */
int rdbSaveBackground(char *filename)
{
    pid_t childpid;
    long long start;
    int save_status;

    /* Refuse when a background save child already exists. */
    if (server.rdb_child_pid != -1)
        return REDIS_ERR;

    /* Bookkeeping done before forking. */
    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    start = ustime();

    childpid = fork();
    if (childpid != 0)
    {
        /* Parent (this branch also covers a failed fork). */

        /* How long the fork() call itself took. */
        server.stat_fork_time = ustime() - start;

        if (childpid == -1)
        {
            server.lastbgsave_status = REDIS_ERR;
            redisLog(REDIS_WARNING, "Can't save in background: fork: %s",
                     strerror(errno));
            return REDIS_ERR;
        }

        redisLog(REDIS_NOTICE, "Background saving started by pid %d", childpid);

        /* Remember when the save started and which child performs it. */
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;

        /* Re-evaluate the dict resize policy now that a child exists
         * (presumably to limit copy-on-write - confirm in the helper). */
        updateDictResizePolicy();

        return REDIS_OK;
    }

    /* Child */
    closeListeningSockets(0);
    redisSetProcTitle("redis-rdb-bgsave");

    /* Perform the actual save. */
    save_status = rdbSave(filename);
    if (save_status == REDIS_OK)
    {
        size_t private_dirty = zmalloc_get_private_dirty();

        if (private_dirty)
        {
            redisLog(REDIS_NOTICE,
                     "RDB: %zu MB of memory used by copy-on-write",
                     private_dirty / (1024 * 1024));
        }
    }

    /* Report the result to the parent through the exit status. */
    exitFromChild((save_status == REDIS_OK) ? 0 : 1);

    return REDIS_OK; /* unreached */
}

//3.dump.rdb恢复数据
// 在启动的main方法中执行loadDataFromDisk,会判断是否加载数据
/*
 * Load the RDB file `filename` into the server's databases.
 *
 * Returns REDIS_OK once the whole file has been read. Returns REDIS_ERR when
 * the file cannot be opened, or (with errno set to EINVAL) when the
 * signature or the format version in the header is not acceptable.
 *
 * The following conditions are fatal and terminate the process with
 * exit(1): a short read or failed object load, a database index that is not
 * below server.dbnum, and a checksum mismatch.
 */
int rdbLoad(char *filename)
{
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db + 0;
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    // Open the RDB file
    if ((fp = fopen(filename, "r")) == NULL)
        return REDIS_ERR;

    // Initialize the rio stream used for reading; the checksum hook is set
    // to rdbLoadProgressCallback (NOTE(review): presumably it also tracks
    // loading progress - confirm in its definition)
    rioInitWithFile(&rdb, fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
    if (rioRead(&rdb, buf, 9) == 0)
        goto eoferr;
    buf[9] = '\0';

    // Check the "REDIS" signature, then the format version that follows it
    if (memcmp(buf, "REDIS", 5) != 0)
    {
        fclose(fp);
        redisLog(REDIS_WARNING, "Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf + 5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION)
    {
        fclose(fp);
        redisLog(REDIS_WARNING, "Can't handle RDB format version %d", rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    // Switch the server into the "loading" state
    startLoading(fp);
    while (1)
    {
        robj *key, *val;
        expiretime = -1;

        /* Read type.
         *
         * The type byte decides how the data that follows must be read.
         * It is either a value type or one of the special opcodes handled
         * below (expire time, select DB, EOF).
         */
        if ((type = rdbLoadType(&rdb)) == -1)
            goto eoferr;

        // Optional expire time preceding the key/value pair
        if (type == REDIS_RDB_OPCODE_EXPIRETIME)
        {

            // Expire time stored in seconds

            if ((expiretime = rdbLoadTime(&rdb)) == -1)
                goto eoferr;

            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1)
                goto eoferr;

            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliseconds. */
            expiretime *= 1000;
        }
        else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS)
        {

            // Expire time stored in milliseconds

            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1)
                goto eoferr;

            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1)
                goto eoferr;
        }

        // EOF opcode: end of the key/value data (the checksum, if any, is
        // read after the loop, so this is not the physical end of the file)
        if (type == REDIS_RDB_OPCODE_EOF)
            break;

        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_RDB_OPCODE_SELECTDB)
        {

            // Read the database index
            if ((dbid = rdbLoadLen(&rdb, NULL)) == REDIS_RDB_LENERR)
                goto eoferr;

            // The index must be below the configured number of databases
            if (dbid >= (unsigned)server.dbnum)
            {
                redisLog(REDIS_WARNING, "FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }

            // Following keys are loaded into this database
            db = server.db + dbid;

            // Nothing else to do for this opcode: read the next type
            continue;
        }

        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL)
            goto eoferr;

        /* Read value */
        if ((val = rdbLoadObject(type, &rdb)) == NULL)
            goto eoferr;

        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave.
         *
         * So only when this server has no master (server.masterhost == NULL)
         * are already expired keys dropped instead of being added.
         */
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now)
        {
            decrRefCount(key);
            decrRefCount(val);
            // Skip this pair
            continue;
        }

        /* Add the new object in the hash table */
        dbAdd(db, key, val);

        /* Set the expire time if needed */
        if (expiretime != -1)
            setExpire(db, key, expiretime);

        decrRefCount(key);
    }

    /* Verify the checksum if RDB version is >= 5 */
    if (rdbver >= 5 && server.rdb_checksum)
    {
        uint64_t cksum, expected = rdb.cksum;

        // Read the checksum stored in the file
        if (rioRead(&rdb, &cksum, 8) == 0)
            goto eoferr;
        memrev64ifbe(&cksum);

        // Compare it with the one computed while reading; a stored value of
        // 0 is treated as "saved with checksum disabled" and is not checked
        if (cksum == 0)
        {
            redisLog(REDIS_WARNING, "RDB file was saved with checksum disabled: no check performed.");
        }
        else if (cksum != expected)
        {
            redisLog(REDIS_WARNING, "Wrong RDB checksum. Aborting now.");
            exit(1);
        }
    }

    // Close the RDB file
    fclose(fp);

    // Leave the "loading" state
    stopLoading();

    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    redisLog(REDIS_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}

本文基于 Redis 3.0.0 版本,下载地址:https://download.redis.io/releases/redis-3.0.0.tar.gz

本文地址,https://www.ccagml.com/?p=432

发表评论