aof保存
aof是redis的一种持久化方式,可以根据配置自动触发保存,也可以手动执行保存
//在配置中
appendonly yes //打开aof保存
appendfilename "aof111.aof" // 设置aof文件名
appendfsync always // 每次把aof_buf写入文件后都执行fsync
appendfsync everysec // 每秒执行一次fsync(在后台线程中进行)
appendfsync no // redis不主动fsync,由操作系统决定何时把数据刷到磁盘(命令仍然会写入aof文件)
1.手动保存,当客户端执行bgrewriteaof命令,redis服务端会执行bgrewriteaofCommand方法开始相关的保存逻辑
2.自动保存,当客户端传来命令后,redis在call方法执行c->cmd->proc命令后,会对命令进行记录,将命令存放到server.aof_buf中;之后在每次事件循环进入等待之前(beforeSleep),或者在周期性执行的serverCron中(之前的写入被推迟,或上一次写入出错时),调用flushAppendOnlyFile,根据配置决定是否写文件以及是否fsync


//1.手动保存(bgrewriteaof命令)
/*
 * BGREWRITEAOF command handler.
 *
 * - If server.aof_child_pid is set, an AOF rewrite is already running and
 *   the client gets an error.
 * - If server.rdb_child_pid is set, the rewrite is only flagged as
 *   scheduled (server.aof_rewrite_scheduled = 1).
 * - Otherwise the background rewrite is started immediately; the generic
 *   shared error reply is sent if it could not be started.
 */
void bgrewriteaofCommand(redisClient *c)
{
    /* A rewrite child already exists: refuse to start a second one. */
    if (server.aof_child_pid != -1)
    {
        addReplyError(c, "Background append only file rewriting already in progress");
        return;
    }

    /* An RDB child pid is recorded: just remember that a rewrite is wanted. */
    if (server.rdb_child_pid != -1)
    {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c, "Background append only file rewriting scheduled");
        return;
    }

    /* No child of either kind: try to start the rewrite right now. */
    if (rewriteAppendOnlyFileBackground() != REDIS_OK)
    {
        addReply(c, shared.err);
        return;
    }

    addReplyStatus(c, "Background append only file rewriting started");
}
// 后台保存
/*
 * Start an AOF rewrite in a forked child process.
 *
 * The child writes the rewritten AOF to "temp-rewriteaof-bg-<pid>.aof" via
 * rewriteAppendOnlyFile() and then leaves through exitFromChild() with
 * status 0 on success or 1 on failure.  The parent records the fork
 * duration and the child's pid in `server` and returns immediately.
 *
 * Returns REDIS_OK when the child was started, REDIS_ERR when a rewrite
 * child already exists or fork() failed.
 */
int rewriteAppendOnlyFileBackground(void)
{
    pid_t childpid;
    long long fork_started;

    /* Only one rewrite child at a time. */
    if (server.aof_child_pid != -1)
        return REDIS_ERR;

    /* Timestamp taken right before fork() so its cost can be measured. */
    fork_started = ustime();
    childpid = fork();

    if (childpid == 0)
    {
        /* ---- Child ---- */
        char tmpfile[256];
        int exit_code = 1; /* failure unless the rewrite succeeds */

        /* Close the listening sockets in the child. */
        closeListeningSockets(0);

        /* Give the process a recognizable title. */
        redisSetProcTitle("redis-aof-rewrite");

        /* Build the temp file name and perform the rewrite. */
        snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof", (int)getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK)
        {
            size_t cow_bytes = zmalloc_get_private_dirty();

            if (cow_bytes != 0)
            {
                redisLog(REDIS_NOTICE,
                         "AOF rewrite: %zu MB of memory used by copy-on-write",
                         cow_bytes / (1024 * 1024));
            }
            exit_code = 0;
        }

        /* Report the outcome to the parent through the exit status. */
        exitFromChild(exit_code);
        return REDIS_OK; /* unreached */
    }

    /* ---- Parent ---- */

    /* Record how long fork() took. */
    server.stat_fork_time = ustime() - fork_started;

    if (childpid == -1)
    {
        redisLog(REDIS_WARNING,
                 "Can't rewrite append only file in background: fork: %s",
                 strerror(errno));
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE,
             "Background append only file rewriting started by pid %d", childpid);

    /* Book-keeping for the rewrite that is now in progress. */
    server.aof_rewrite_scheduled = 0;
    server.aof_rewrite_time_start = time(NULL);
    server.aof_child_pid = childpid;

    /* Let the dict layer re-evaluate its resize policy now that a child
     * exists. */
    updateDictResizePolicy();

    /* Setting aof_selected_db to -1 forces the next feedAppendOnlyFile()
     * call to emit a SELECT, so the differences the parent accumulates
     * into the rewrite buffer start with a SELECT and can be merged
     * safely. */
    server.aof_selected_db = -1;
    replicationScriptCacheFlush();

    return REDIS_OK;
}
//具体重写逻辑
int rewriteAppendOnlyFile(char *filename)
{
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. 
     *
     * 创建临时文件
     *
     * 注意这里创建的文件名和 rewriteAppendOnlyFileBackground() 创建的文件名稍有不同
     */
    snprintf(tmpfile, 256, "temp-rewriteaof-%d.aof", (int)getpid());
    fp = fopen(tmpfile, "w");
    if (!fp)
    {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    // 初始化文件 io
    rioInitWithFile(&aof, fp);

    // 设置每写入 REDIS_AOF_AUTOSYNC_BYTES 字节
    // 就执行一次 FSYNC
    // 防止缓存中积累太多命令内容,造成 I/O 阻塞时间过长
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof, REDIS_AOF_AUTOSYNC_BYTES);

    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++)
    {

        char selectcmd[] = "*2\r\n6\r\nSELECT\r\n";

        redisDb *db = server.db + j;

        // 指向键空间
        dict *d = db->dict;
        if (dictSize(d) == 0)
            continue;

        // 创建键空间迭代器
        di = dictGetSafeIterator(d);
        if (!di)
        {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB         *
         * 首先写入 SELECT 命令,确保之后的数据会被插入到正确的数据库上
         */
        if (rioWrite(&aof, selectcmd, sizeof(selectcmd) - 1) == 0)
            goto werr;
        if (rioWriteBulkLongLong(&aof, j) == 0)
            goto werr;

        /* Iterate this DB writing every entry         *
         * 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中
         */
        while ((de = dictNext(di)) != NULL)
        {
            sds keystr;
            robj key, *o;
            long long expiretime;

            // 取出键
            keystr = dictGetKey(de);

            // 取出值
            o = dictGetVal(de);
            initStaticStringObject(key, keystr);

            // 取出过期时间
            expiretime = getExpire(db, &key);

            /* If this key is already expired skip it             *
             * 如果键已经过期,那么跳过它,不保存
             */
            if (expiretime != -1 && expiretime9\r\nPEXPIREAT\r\n";

                // 写入 PEXPIREAT expiretime 命令
                if (rioWrite(&aof, cmd, sizeof(cmd) - 1) == 0)
                    goto werr;
                if (rioWriteBulkObject(&aof, &key) == 0)
                    goto werr;
                if (rioWriteBulkLongLong(&aof, expiretime) == 0)
                    goto werr;
            }
        }

        // 释放迭代器
        dictReleaseIterator(di);
    }

    /* Make sure data will not remain on the OS's output buffers */
    // 冲洗并关闭新 AOF 文件
    if (fflush(fp) == EOF)
        goto werr;
    if (aof_fsync(fileno(fp)) == -1)
        goto werr;
    if (fclose(fp) == EOF)
        goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok.     *
     * 原子地改名,用重写后的新 AOF 文件覆盖旧 AOF 文件
     */
    if (rename(tmpfile, filename) == -1)
    {
        redisLog(REDIS_WARNING, "Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE, "SYNC append only file rewrite performed");

    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING, "Write error writing append only file on disk: %s", strerror(errno));
    if (di)
        dictReleaseIterator(di);
    return REDIS_ERR;
}

// 2.自动开启的aof
//redis.c
/*
 * Execute the command attached to client `c` and, when requested through
 * `flags`, propagate it to the AOF and/or the slaves.
 *
 * This is an abridged version of the function: only the execution and the
 * propagation steps are shown.
 *
 * c     - client whose command (c->cmd, c->argv, c->argc) is executed.
 * flags - REDIS_CALL_* bits; propagation only happens when
 *         REDIS_CALL_PROPAGATE is set.
 */
void call(redisClient *c, int flags)
{
    long long dirty;

    /* Run the command implementation and measure how much it changed the
     * dataset.
     * NOTE(review): `dirty` was used but never defined in this excerpt; it
     * is restored here as the delta of server.dirty around the call, as in
     * Redis 3.0 redis.c -- confirm against the full source. */
    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty - dirty;
    if (dirty < 0)
        dirty = 0;

    /* Propagate the command into the AOF and the replication link. */
    if (flags & REDIS_CALL_PROPAGATE)
    {
        /* Named differently from the `flags` parameter to avoid
         * shadowing it. */
        int propagate_flags = REDIS_PROPAGATE_NONE;

        /* The client asked for replication propagation to be forced. */
        if (c->flags & REDIS_FORCE_REPL)
            propagate_flags |= REDIS_PROPAGATE_REPL;

        /* The client asked for AOF propagation to be forced. */
        if (c->flags & REDIS_FORCE_AOF)
            propagate_flags |= REDIS_PROPAGATE_AOF;

        /* The dataset was modified: propagate to both targets. */
        if (dirty)
            propagate_flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        if (propagate_flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
    }
}
/*
 * Send a command to the targets selected by `flags`.
 *
 * cmd   - command table entry of the command being propagated.
 * dbid  - id of the database the command was executed against.
 * argv  - command arguments (argv[0] is the command name).
 * argc  - number of entries in argv.
 * flags - REDIS_PROPAGATE_AOF and/or REDIS_PROPAGATE_REPL.
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    /* AOF target: only when requested and AOF is not switched off. */
    if ((flags & REDIS_PROPAGATE_AOF) != 0)
    {
        if (server.aof_state != REDIS_AOF_OFF)
            feedAppendOnlyFile(cmd, dbid, argv, argc);
    }

    /* Replication target. */
    if ((flags & REDIS_PROPAGATE_REPL) != 0)
        replicationFeedSlaves(server.slaves, dbid, argv, argc);
}

/*
 * 将命令追加到 AOF 文件中,
 * 如果 AOF 重写正在进行,那么也将命令追加到 AOF 重写缓存中。
 */
/*
 * Serialize a command in the AOF format and append it to server.aof_buf;
 * if a rewrite child is running, also append it to the rewrite buffer.
 *
 * cmd    - command table entry, used to detect commands needing translation.
 * dictid - id of the database the command targets.
 * argv   - command arguments.
 * argc   - number of entries in argv.
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc)
{
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db)
    {
        char seldb[64];

        snprintf(seldb, sizeof(seldb), "%d", dictid);
        /* Every bulk string is introduced by "$<len>": "$6" for "SELECT",
         * then "$<strlen(seldb)>" for the database id. */
        buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
                           (unsigned long)strlen(seldb), seldb);

        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand)
    {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT. */
        buf = catAppendOnlyExpireAtCommand(buf, cmd, argv[1], argv[2]);
    }
    else if (cmd->proc == setexCommand || cmd->proc == psetexCommand)
    {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT. */

        /* SET key value */
        tmpargv[0] = createStringObject("SET", 3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf, 3, tmpargv);

        /* The temporary "SET" object is no longer needed. */
        decrRefCount(tmpargv[0]);

        /* PEXPIREAT key <time derived from argv[2]> */
        buf = catAppendOnlyExpireAtCommand(buf, cmd, argv[1], argv[2]);
    }
    else
    {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf, argc, argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char *)buf, sdslen(buf));

    sdsfree(buf);
}

// 将aof_buf中的命令刷到文件
/*
 * Write the content of server.aof_buf to the AOF file and fsync it
 * according to the configured policy.
 *
 * force - when zero and the policy is "everysec", the write may be
 *         postponed (for up to two seconds) while a background fsync is
 *         still in progress; when non-zero the write is always attempted.
 *
 * On a failed or short write the error code is stored in
 * server.aof_last_write_errno. With the "always" policy the process exits;
 * otherwise server.aof_last_write_status is set to REDIS_ERR and the
 * unwritten data stays in the buffer to be retried on the next call.
 */
void flushAppendOnlyFile(int force)
{
    ssize_t nwritten;
    int sync_in_progress = 0;

    /* Nothing buffered: nothing to do. */
    if (sdslen(server.aof_buf) == 0)
        return;

    /* With the everysec policy, check whether a background fsync job is
     * still pending. */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force)
    {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress)
        {
            if (server.aof_flush_postponed_start == 0)
            {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            }
            else if (server.unixtime - server.aof_flush_postponed_start < 2)
            {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }

            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE, "Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* If you are following this code path, then we are going to write so
     * reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike. */
    nwritten = write(server.aof_fd, server.aof_buf, sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf))
    {
        /* Capture errno immediately: later calls (logging included) may
         * overwrite it. Only meaningful when nwritten == -1. */
        int write_errno = errno;
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE)
        {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1)
        {
            if (can_log)
            {
                redisLog(REDIS_WARNING, "Error writing to the AOF file: %s",
                         strerror(write_errno));
            }
            /* Record the error code even when the log line is suppressed
             * by the rate limit. */
            server.aof_last_write_errno = write_errno;
        }
        else
        {
            if (can_log)
            {
                redisLog(REDIS_WARNING, "Short write while writing to "
                                        "the AOF file: (nwritten=%lld, "
                                        "expected=%lld)",
                         (long long)nwritten,
                         (long long)sdslen(server.aof_buf));
            }

            /* Try to remove the partial data that was just appended. */
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1)
            {
                if (can_log)
                {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                                            "from the append-only file.  Redis may refuse "
                                            "to load the AOF the next time it starts.  "
                                            "ftruncate: %s",
                             strerror(errno));
                }
            }
            else
            {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS)
        {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            redisLog(REDIS_WARNING, "Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        }
        else
        {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0)
            {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf, nwritten, -1);
            }
            return; /* We'll try again on the next call... */
        }
    }
    else
    {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == REDIS_ERR)
        {
            redisLog(REDIS_WARNING,
                     "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }

    /* Account for the bytes just written. */
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf) + sdsavail(server.aof_buf)) < 4000)
    {
        /* Keep the allocation, just empty it. */
        sdsclear(server.aof_buf);
    }
    else
    {
        /* Drop the large buffer and start over with an empty one. */
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS)
    {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    }
    else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
              server.unixtime > server.aof_last_fsync))
    {
        /* everysec policy and the clock moved past the last fsync second:
         * hand the fsync to the background job unless one is pending. */
        if (!sync_in_progress)
            aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }

    /* Note: aof_last_fsync is deliberately updated only inside the two
     * branches above; when neither runs (e.g. the everysec policy within
     * the same second) it is left untouched. */
}

// 每次处理事件之前执行
/*
 * Event-loop hook (abridged): flushes the AOF buffer to the AOF file with
 * a non-forced write.
 *
 * eventLoop - the event loop invoking the hook; unused here.
 */
void beforeSleep(struct aeEventLoop *eventLoop)
{
    /* 0 = do not force: the flush may be postponed by the fsync policy. */
    const int force_write = 0;

    (void)eventLoop;

    /* Write the AOF buffer on disk. */
    flushAppendOnlyFile(force_write);
}

/*
 * Periodic server task (abridged to the AOF-related steps).
 *
 * - Retries a postponed AOF flush on every invocation.
 * - Once per second, retries the flush if the last AOF write failed.
 *
 * Returns 1000 / server.hz.
 * NOTE(review): the excerpt had no return statement although the function
 * is declared int; the value is restored from Redis 3.0 redis.c, where it
 * is the delay in milliseconds until the next invocation -- confirm
 * against the full source.
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
{
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start)
        flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000)
    {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

    return 1000 / server.hz;
}

基于redis 3.0.0版本,下载地址:https://download.redis.io/releases/redis-3.0.0.tar.gz

本文地址,https://www.ccagml.com/?p=437

发表评论