本章来看持久化RDB、AOF
RDB:按指定的时间间隔生成 Redis 内存数据的快照,保存为一个二进制文件 dump.rdb
AOF:记录 Redis 除了查询以外的所有写命令,并在Redis 服务启动时,通过重新执行这些命令来还原数据。
两者在官网有详细的对比:https://redis.io/topics/persistence
// 先来看RDB
//1.save命令
// 注意:配置文件中的 save 60 1000 是自动快照的触发条件(60 秒内至少 1000 次修改),不同于下面的 SAVE 命令
/*
 * SAVE command handler.
 *
 * Calls rdbSave() directly on server.rdb_filename and reports the outcome
 * to the client: shared.ok when rdbSave() returns REDIS_OK, shared.err
 * otherwise. If a background save child is already recorded in
 * server.rdb_child_pid the command is rejected with an error reply and no
 * save is attempted.
 */
void saveCommand(redisClient *c)
{
    /* A BGSAVE child is still registered: refuse to start another dump. */
    if (server.rdb_child_pid != -1) {
        addReplyError(c, "Background save already in progress");
        return;
    }

    int saved = (rdbSave(server.rdb_filename) == REDIS_OK);

    addReply(c, saved ? shared.ok : shared.err);
}
/*
 * Dump every non-empty database to disk as an RDB file named `filename`.
 *
 * The data is first written to "temp-<pid>.rdb" (a relative path) and is
 * rename()d over `filename` only after it has been flushed and fsync()ed,
 * so `filename` is never replaced by a partially written dump.
 *
 * File layout produced here:
 *   "REDIS" + 4-digit version (9 bytes)
 *   for each non-empty db: SELECTDB opcode, db index, key-value pairs
 *   EOF opcode
 *   8-byte checksum
 *
 * On success server.dirty is reset, server.lastsave and
 * server.lastbgsave_status are updated, and REDIS_OK is returned.
 * On any failure the temporary file is removed and REDIS_ERR is returned.
 */
int rdbSave(char *filename)
{
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    int saved_errno;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;

    /* Write to a per-process temporary file first. */
    snprintf(tmpfile, sizeof(tmpfile), "temp-%d.rdb", (int)getpid());
    fp = fopen(tmpfile, "w");
    if (!fp)
    {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
                 strerror(errno));
        return REDIS_ERR;
    }

    rioInitWithFile(&rdb, fp);

    /* Install the checksum callback only when checksumming is enabled. */
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;

    /* File header: magic string followed by the RDB version. */
    snprintf(magic, sizeof(magic), "REDIS%04d", REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb, magic, 9) == -1)
        goto werr;

    /* Walk every database of the server. */
    for (j = 0; j < server.dbnum; j++)
    {
        redisDb *db = server.db + j;
        dict *d = db->dict; /* the keyspace dictionary of this db */

        /* Empty databases are not written at all. */
        if (dictSize(d) == 0)
            continue;

        di = dictGetSafeIterator(d);
        if (!di)
        {
            /* Remove the partial dump too, so a failed save does not
             * leave a stale temp file behind. */
            fclose(fp);
            unlink(tmpfile);
            return REDIS_ERR;
        }

        /* SELECTDB opcode followed by the database index. */
        if (rdbSaveType(&rdb, REDIS_RDB_OPCODE_SELECTDB) == -1)
            goto werr;
        if (rdbSaveLen(&rdb, j) == -1)
            goto werr;

        /* Emit every entry of this database. */
        while ((de = dictNext(di)) != NULL)
        {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            /* Wrap the key string in a stack-allocated object. */
            initStaticStringObject(key, keystr);
            expire = getExpire(db, &key);

            if (rdbSaveKeyValuePair(&rdb, &key, o, expire, now) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        /* Clear the pointer right away so werr can never release it twice. */
        di = NULL;
    }

    /* EOF opcode: terminates the key-value section of the file. */
    if (rdbSaveType(&rdb, REDIS_RDB_OPCODE_EOF) == -1)
        goto werr;

    /* Trailing checksum, byte order normalised by memrev64ifbe().
     * The write is checked like every other write: a dump without its
     * checksum must not be reported as successful. rioWrite() returns 0
     * on failure. */
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(&rdb, &cksum, 8) == 0)
        goto werr;

    /* Make sure the data reached the disk before replacing the old file. */
    if (fflush(fp) == EOF)
        goto werr;
    if (fsync(fileno(fp)) == -1)
        goto werr;

    /* fclose() disassociates the stream even when it fails, so fp must
     * not be closed again in the error path. */
    if (fclose(fp) == EOF)
    {
        fp = NULL;
        goto werr;
    }
    fp = NULL;

    /* Replace the destination file with the finished dump. */
    if (rename(tmpfile, filename) == -1)
    {
        redisLog(REDIS_WARNING, "Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE, "DB saved on disk");
    server.dirty = 0;                     /* no unsaved changes left */
    server.lastsave = time(NULL);         /* time of the last completed save */
    server.lastbgsave_status = REDIS_OK;  /* status of the last save */
    return REDIS_OK;

werr:
    /* Capture errno first: fclose() and unlink() below may overwrite it. */
    saved_errno = errno;
    if (fp)
        fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING, "Write error saving DB on disk: %s", strerror(saved_errno));
    if (di)
        dictReleaseIterator(di);
    return REDIS_ERR;
}
/*
 * Write a single key-value pair to `rdb`.
 *
 * When `expiretime` is not -1 the pair is preceded by the
 * REDIS_RDB_OPCODE_EXPIRETIME_MS opcode and the expire time itself.
 * The pair is then written as: value type, key, value.
 *
 * Returns 1 when the pair was written, 0 when it was skipped because
 * `expiretime` is earlier than `now`, and -1 when any write fails.
 */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    int has_expire = (expiretime != -1);

    /* Keys that are already expired are not saved. */
    if (has_expire && expiretime < now)
        return 0;

    /* Expire prefix: opcode, then the time in milliseconds. */
    if (has_expire &&
        (rdbSaveType(rdb, REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1 ||
         rdbSaveMillisecondTime(rdb, expiretime) == -1))
        return -1;

    /* Type, key, value - in this order; stop at the first failure. */
    if (rdbSaveObjectType(rdb, val) == -1 ||
        rdbSaveStringObject(rdb, key) == -1 ||
        rdbSaveObject(rdb, val) == -1)
        return -1;

    return 1;
}
/*
 * Write the value object `o` to `rdb`, choosing the on-disk form from the
 * object's type and encoding:
 *
 *   string                      -> string object
 *   list / ziplist              -> the ziplist blob as one raw string
 *   list / linked list          -> length, then each element
 *   set / hash table            -> size, then each member
 *   set / intset                -> the intset blob as one raw string
 *   sorted set / ziplist        -> the ziplist blob as one raw string
 *   sorted set / skiplist       -> size, then (member, score) pairs
 *   hash / ziplist              -> the ziplist blob as one raw string
 *   hash / hash table           -> size, then (field, value) pairs
 *
 * Returns the sum of the values returned by the individual save helpers
 * (presumably the number of bytes written - confirm against the helpers),
 * or -1 as soon as one of them fails. Any dict iterator created here is
 * released on the error path as well as on the success path.
 * Unknown types or encodings trigger redisPanic().
 */
int rdbSaveObject(rio *rdb, robj *o)
{
    int n, nwritten = 0;
    /* Iterator currently in use by a dict-backed branch; NULL otherwise.
     * Tracked at function scope so werr can release it. */
    dictIterator *di = NULL;

    if (o->type == REDIS_STRING)
    {
        if ((n = rdbSaveStringObject(rdb, o)) == -1)
            goto werr;
        nwritten += n;
    }
    else if (o->type == REDIS_LIST)
    {
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
        {
            /* The whole ziplist is stored as a single raw string. */
            size_t l = ziplistBlobLen((unsigned char *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto werr;
            nwritten += n;
        }
        else if (o->encoding == REDIS_ENCODING_LINKEDLIST)
        {
            list *list = o->ptr;
            listIter li;
            listNode *ln;

            /* Element count first, then every element. */
            if ((n = rdbSaveLen(rdb, listLength(list))) == -1)
                goto werr;
            nwritten += n;

            listRewind(list, &li);
            while ((ln = listNext(&li)))
            {
                robj *eleobj = listNodeValue(ln);

                if ((n = rdbSaveStringObject(rdb, eleobj)) == -1)
                    goto werr;
                nwritten += n;
            }
        }
        else
        {
            redisPanic("Unknown list encoding");
        }
    }
    else if (o->type == REDIS_SET)
    {
        if (o->encoding == REDIS_ENCODING_HT)
        {
            dict *set = o->ptr;
            dictEntry *de;

            di = dictGetIterator(set);

            /* Member count first, then every member (the dict keys). */
            if ((n = rdbSaveLen(rdb, dictSize(set))) == -1)
                goto werr;
            nwritten += n;

            while ((de = dictNext(di)) != NULL)
            {
                robj *eleobj = dictGetKey(de);

                if ((n = rdbSaveStringObject(rdb, eleobj)) == -1)
                    goto werr;
                nwritten += n;
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        else if (o->encoding == REDIS_ENCODING_INTSET)
        {
            /* The whole intset is stored as a single raw string. */
            size_t l = intsetBlobLen((intset *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto werr;
            nwritten += n;
        }
        else
        {
            redisPanic("Unknown set encoding");
        }
    }
    else if (o->type == REDIS_ZSET)
    {
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
        {
            size_t l = ziplistBlobLen((unsigned char *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto werr;
            nwritten += n;
        }
        else if (o->encoding == REDIS_ENCODING_SKIPLIST)
        {
            zset *zs = o->ptr;
            dictEntry *de;

            di = dictGetIterator(zs->dict);

            /* Pair count first, then (member, score) for every entry. */
            if ((n = rdbSaveLen(rdb, dictSize(zs->dict))) == -1)
                goto werr;
            nwritten += n;

            while ((de = dictNext(di)) != NULL)
            {
                robj *eleobj = dictGetKey(de);
                double *score = dictGetVal(de);

                if ((n = rdbSaveStringObject(rdb, eleobj)) == -1)
                    goto werr;
                nwritten += n;
                if ((n = rdbSaveDoubleValue(rdb, *score)) == -1)
                    goto werr;
                nwritten += n;
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        else
        {
            redisPanic("Unknown sorted set encoding");
        }
    }
    else if (o->type == REDIS_HASH)
    {
        if (o->encoding == REDIS_ENCODING_ZIPLIST)
        {
            size_t l = ziplistBlobLen((unsigned char *)o->ptr);

            if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1)
                goto werr;
            nwritten += n;
        }
        else if (o->encoding == REDIS_ENCODING_HT)
        {
            dictEntry *de;

            di = dictGetIterator(o->ptr);

            /* Pair count first, then (field, value) for every entry. */
            if ((n = rdbSaveLen(rdb, dictSize((dict *)o->ptr))) == -1)
                goto werr;
            nwritten += n;

            while ((de = dictNext(di)) != NULL)
            {
                robj *key = dictGetKey(de);
                robj *val = dictGetVal(de);

                if ((n = rdbSaveStringObject(rdb, key)) == -1)
                    goto werr;
                nwritten += n;
                if ((n = rdbSaveStringObject(rdb, val)) == -1)
                    goto werr;
                nwritten += n;
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        else
        {
            redisPanic("Unknown hash encoding");
        }
    }
    else
    {
        redisPanic("Unknown object type");
    }
    return nwritten;

werr:
    /* A write failed part-way through: do not leak the iterator. */
    if (di)
        dictReleaseIterator(di);
    return -1;
}
//2.bgsave命令,bgsave会创建子进程进行保存
/*
 * BGSAVE command handler.
 *
 * Rejects the request when a save child (server.rdb_child_pid) or an AOF
 * rewrite child (server.aof_child_pid) is already registered. Otherwise
 * it asks rdbSaveBackground() to start the dump and replies with a status
 * line on REDIS_OK or with shared.err on failure.
 */
void bgsaveCommand(redisClient *c)
{
    if (server.rdb_child_pid != -1) {
        addReplyError(c, "Background save already in progress");
        return;
    }

    if (server.aof_child_pid != -1) {
        addReplyError(c, "Can't BGSAVE while AOF log rewriting is in progress");
        return;
    }

    if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        addReply(c, shared.err);
        return;
    }

    addReplyStatus(c, "Background saving started");
}
/*
 * Start a background save: fork() a child that runs rdbSave(filename)
 * while the parent records the child and returns immediately.
 *
 * Returns REDIS_ERR when a save child is already registered or when
 * fork() fails (server.lastbgsave_status is set to REDIS_ERR in the
 * latter case); returns REDIS_OK in the parent once the child has been
 * created. The child leaves through exitFromChild() with code 0 when
 * rdbSave() succeeded and 1 otherwise.
 */
int rdbSaveBackground(char *filename)
{
    pid_t childpid;
    long long start;

    /* Only one save child at a time. */
    if (server.rdb_child_pid != -1)
        return REDIS_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);

    start = ustime();
    childpid = fork();

    if (childpid == 0) {
        /* Child */
        int exitcode = 1;

        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");

        if (rdbSave(filename) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                         "RDB: %zu MB of memory used by copy-on-write",
                         private_dirty / (1024 * 1024));
            }
            exitcode = 0;
        }

        /* The exit code is how the outcome is reported back. */
        exitFromChild(exitcode);
        return REDIS_OK; /* unreached */
    }

    /* Parent */

    /* How long the fork() call itself took. */
    server.stat_fork_time = ustime() - start;

    if (childpid == -1) {
        server.lastbgsave_status = REDIS_ERR;
        redisLog(REDIS_WARNING, "Can't save in background: fork: %s",
                 strerror(errno));
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE, "Background saving started by pid %d", childpid);

    /* Remember when the save started and which child performs it. */
    server.rdb_save_time_start = time(NULL);
    server.rdb_child_pid = childpid;

    /* Re-evaluate the dict resize policy now that a child is registered
     * (helper defined elsewhere). */
    updateDictResizePolicy();
    return REDIS_OK;
}
//3.dump.rdb恢复数据
// 在启动的main方法中执行loadDataFromDisk,会判断是否加载数据
/*
 * Load the RDB file `filename` into the server's databases.
 *
 * Returns REDIS_ERR (without touching the dataset) when the file cannot
 * be opened, or, with errno set to EINVAL, when the signature is not
 * "REDIS" or the version is outside 1..REDIS_RDB_VERSION.
 * Returns REDIS_OK after a complete load.
 *
 * A short read, an unloadable object, a database index beyond
 * server.dbnum or a checksum mismatch is fatal: the process exits with
 * status 1.
 */
int rdbLoad(char *filename)
{
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db + 0; /* keys go to db 0 until a SELECTDB opcode */
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    fp = fopen(filename, "r");
    if (fp == NULL)
        return REDIS_ERR;

    /* Wrap the file in a rio stream and hook the load-progress callback. */
    rioInitWithFile(&rdb, fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;

    /* Header: 5 bytes of signature followed by 4 digits of version. */
    if (rioRead(&rdb, buf, 9) == 0)
        goto eoferr;
    buf[9] = '\0';

    if (memcmp(buf, "REDIS", 5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING, "Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }

    rdbver = atoi(buf + 5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING, "Can't handle RDB format version %d", rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    /* Put the server into the "loading" state. */
    startLoading(fp);

    for (;;) {
        robj *key, *val;

        expiretime = -1;

        /* The next byte is either a value type or a special opcode and
         * decides how the following data is read. */
        type = rdbLoadType(&rdb);
        if (type == -1)
            goto eoferr;

        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            /* Expire time with second precision. */
            expiretime = rdbLoadTime(&rdb);
            if (expiretime == -1)
                goto eoferr;

            /* The expire time is followed by the type of the pair it
             * belongs to. */
            type = rdbLoadType(&rdb);
            if (type == -1)
                goto eoferr;

            /* Internally expire times are kept in milliseconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Expire time with millisecond precision (introduced with
             * RDB version 3). */
            expiretime = rdbLoadMillisecondTime(&rdb);
            if (expiretime == -1)
                goto eoferr;

            /* Again, the real type of the pair follows. */
            type = rdbLoadType(&rdb);
            if (type == -1)
                goto eoferr;
        }

        /* End of the key-value section (not the end of the file: the
         * checksum may still follow). */
        if (type == REDIS_RDB_OPCODE_EOF)
            break;

        /* SELECTDB: switch the database subsequent keys are added to. */
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            dbid = rdbLoadLen(&rdb, NULL);
            if (dbid == REDIS_RDB_LENERR)
                goto eoferr;

            /* The file references a database this server does not have. */
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING, "FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }

            db = server.db + dbid;
            continue;
        }

        /* Key first, then the value decoded according to `type`. */
        key = rdbLoadStringObject(&rdb);
        if (key == NULL)
            goto eoferr;

        val = rdbLoadObject(type, &rdb);
        if (val == NULL)
            goto eoferr;

        /* Drop keys that are already expired, but only when this server
         * has no master configured. When the RDB was received from a
         * master, the master is responsible for key expiry: expiring
         * here could make the loaded data diverge from the master's
         * snapshot. */
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
            continue;
        }

        /* Insert the pair into the selected database. */
        dbAdd(db, key, val);

        /* Attach the expire time, if the pair had one. */
        if (expiretime != -1)
            setExpire(db, key, expiretime);

        decrRefCount(key);
    }

    /* Checksum verification: only for RDB version >= 5 and only when
     * checksumming is enabled on this server. */
    if (rdbver >= 5 && server.rdb_checksum) {
        uint64_t cksum, expected = rdb.cksum;

        if (rioRead(&rdb, &cksum, 8) == 0)
            goto eoferr;
        memrev64ifbe(&cksum);

        if (cksum == 0) {
            /* A stored checksum of zero means the writer had it disabled. */
            redisLog(REDIS_WARNING, "RDB file was saved with checksum disabled: no check performed.");
        } else if (cksum != expected) {
            redisLog(REDIS_WARNING, "Wrong RDB checksum. Aborting now.");
            exit(1);
        }
    }

    fclose(fp);
    /* Leave the "loading" state. */
    stopLoading();
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    redisLog(REDIS_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
本文基于 Redis 3.0.0 版本,源码下载地址:https://download.redis.io/releases/redis-3.0.0.tar.gz
本文地址,https://www.ccagml.com/?p=432