• 在《redis源码从main开始1》中,initServerConfig 方法完成了配置的初始化;在这之后,initServer 方法根据这些配置创建 Redis 运行所需要的各种数据结构,并完成创建监听端口等相关操作。

/* Build the server's runtime state from the configuration already stored in
 * the global `server` structure.
 *
 * In order, this function: installs signal handling, optionally opens syslog,
 * creates the client/slave/monitor bookkeeping lists, creates the event loop
 * and the database array, opens the TCP and Unix listening sockets, fills in
 * every database and the Pub/Sub structures, resets persistence and stats
 * fields, registers the serverCron() timer and the accept handlers, opens the
 * AOF file when AOF is on, applies a 3 GB memory cap on 32 bit builds that
 * have no explicit limit, and finally initializes cluster (if enabled),
 * the replication script cache, scripting, the slow log and BIO.
 *
 * Takes no arguments and returns nothing. Unrecoverable setup failures call
 * exit(1) or redisPanic() instead of returning. */
void initServer()
{
    int j;

    /* Set up signal handling: SIGHUP and SIGPIPE are ignored. */
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();

    /* Open syslog when it is enabled in the configuration. */
    if (server.syslog_enabled)
    {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
                server.syslog_facility);
    }
    /* Initialize and create the bookkeeping data structures. */
    server.current_client = NULL;
    server.clients = listCreate(); /* List holding the state of every client. */
    server.clients_to_close = listCreate(); /* List of clients waiting to be closed. */
    server.slaves = listCreate();           /* List of all slave servers. */
    server.monitors = listCreate();         /* List of all monitors. */
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate(); /* Presumably the clients whose blocked state is to be cleared - confirm against its users. */
    server.ready_keys = listCreate();        /* Per the original note: keys that have clients blocked waiting for list pushes are referenced from this list. */
    server.clients_waiting_acks = listCreate(); /* Per the original note: clients waiting for a command to return - TODO confirm. */
    server.get_ack_from_slaves = 0;             /* If true we send REPLCONF GETACK. */
    server.clients_paused = 0;                  /* True if clients are currently paused */

    /* Create the shared objects (per the original note they are reused to
     * reduce memory fragmentation and per-operation cost). */
    createSharedObjects();
    adjustOpenFilesLimit();                                                         /* Per the original note: checks that enough files can be opened. */
    server.el = aeCreateEventLoop(server.maxclients + REDIS_EVENTLOOP_FDSET_INCR);  /* Create the event loop, sized from maxclients plus a fixed increment. */
    server.db = zmalloc(sizeof(redisDb) * server.dbnum); /* One redisDb slot per configured database. */

    /* Open the TCP listening socket for the user commands. */
    /* A port of 0 skips TCP listening entirely; a listen failure is fatal. */
    if (server.port != 0 &&
        listenToPort(server.port, server.ipfd, &server.ipfd_count) == REDIS_ERR)
        exit(1);

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL)
    {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr, server.unixsocket,
                                     server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR)
        {
            redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL, server.sofd);
    }

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0 && server.sofd < 0)
    {
        redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    /* Create the Redis databases, and initialize other internal state. */
    /* Every database gets its own set of dictionaries and an eviction pool. */
    for (j = 0; j < server.dbnum; j++)
    {
        server.db[j].dict = dictCreate(&dbDictType, NULL);
        server.db[j].expires = dictCreate(&keyptrDictType, NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);
        server.db[j].ready_keys = dictCreate(&setDictType, NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType, NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }

    /* Create the Pub/Sub structures. */
    server.pubsub_channels = dictCreate(&keylistDictType, NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns, freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns, listMatchPubsubPattern);

    server.cronloops = 0; /* Number of times the cron function run */
    server.rdb_child_pid = -1; /* PID of RDB saving child */
    server.aof_child_pid = -1; /* PID if rewriting process */
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL); /* Server start time */
    server.stat_peak_memory = 0;        /* Max used memory record */
    server.resident_set_size = 0;       /* RSS sampled in serverCron(). */
    server.lastbgsave_status = REDIS_OK;
    server.aof_last_write_status = REDIS_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;
    updateCachedTime();

    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    /* NOTE(review): exit(1) follows redisPanic(); it is presumably unreachable
     * if redisPanic() never returns - confirm. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
    {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    /* Attach the accept handler to every TCP listening descriptor; it is used
     * to accept and answer client connect() calls. */
    for (j = 0; j < server.ipfd_count; j++)
    {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                              acceptTcpHandler, NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
    }

    /* Attach the accept handler to the Unix domain socket.
     * NOTE(review): this tests sofd > 0 while the "no listening sockets" check
     * above tests sofd < 0, so a descriptor equal to 0 would get no handler -
     * confirm this is intended. */
    if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE,
                                             acceptUnixHandler, NULL) == AE_ERR)
        redisPanic("Unrecoverable error creating server.sofd file event.");

    /* Open the AOF file if needed. */
    /* When AOF persistence is on, open the AOF file for appending, creating
     * it if it does not exist; failure to open is fatal. */
    if (server.aof_state == REDIS_AOF_ON)
    {
        server.aof_fd = open(server.aof_filename,
                             O_WRONLY | O_APPEND | O_CREAT, 0644);
        if (server.aof_fd == -1)
        {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                     strerror(errno));
            exit(1);
        }
    }

    /* 32 bit instances are limited to 4GB of address space, so if there is
     * no explicit limit in the user provided configuration we set a limit
     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
     * useless crashes of the Redis instance for out of memory. */
    if (server.arch_bits == 32 && server.maxmemory == 0)
    {
        redisLog(REDIS_WARNING, "Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL * (1024 * 1024); /* 3 GB */
        server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
    }

    /* Initialize cluster support when the server runs in cluster mode. */
    if (server.cluster_enabled)
        clusterInit();

    /* Initialize the script cache used by replication. */
    replicationScriptCacheInit();

    /* Initialize the scripting system. */
    scriptingInit();

    /* Initialize the slow query log. */
    slowlogInit();

    /* Initialize the BIO system. */
    bioInit();
}


/* Open the TCP listening sockets for `port`.
 *
 * Each socket that is opened is switched to non-blocking mode and stored in
 * `fds` starting at index `*count`; `*count` is incremented once per stored
 * socket. The addresses come from server.bindaddr[0 .. bindaddr_count-1].
 * When no address is configured, a wildcard bind is attempted on IPv6 and
 * then on IPv4, and either one succeeding is enough.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR, after logging the reason,
 * as soon as one address cannot be bound; sockets stored before the failure
 * are left in `fds`. */
int listenToPort(int port, int *fds, int *count)
{
    int idx = 0;

    /* With no configured address, slot 0 is set to NULL so that the single
     * pass below takes the wildcard branch. */
    if (server.bindaddr_count == 0)
        server.bindaddr[0] = NULL;

    /* do/while because the body must run once even when bindaddr_count is 0. */
    do
    {
        char *addr = server.bindaddr[idx];

        if (addr == NULL)
        {
            /* Wildcard bind: try IPv6 first and keep it if it worked. */
            fds[*count] = anetTcp6Server(server.neterr, port, NULL,
                                         server.tcp_backlog);
            if (fds[*count] != ANET_ERR)
            {
                anetNonBlock(NULL, fds[*count]);
                *count += 1;
            }

            /* Then IPv4, stored in the next free slot. */
            fds[*count] = anetTcpServer(server.neterr, port, NULL,
                                        server.tcp_backlog);
            if (fds[*count] != ANET_ERR)
            {
                anetNonBlock(NULL, fds[*count]);
                *count += 1;
            }

            /* Something is listening: done. Otherwise fds[*count] still
             * holds ANET_ERR and the shared error check below reports it. */
            if (*count != 0)
                break;
        }
        else if (strchr(addr, ':') != NULL)
        {
            /* A colon in the address selects the IPv6 bind. */
            fds[*count] = anetTcp6Server(server.neterr, port, addr,
                                         server.tcp_backlog);
        }
        else
        {
            /* Anything else is bound as IPv4. */
            fds[*count] = anetTcpServer(server.neterr, port, addr,
                                        server.tcp_backlog);
        }

        if (fds[*count] == ANET_ERR)
        {
            redisLog(REDIS_WARNING,
                     "Creating Server TCP listening socket %s:%d: %s",
                     addr ? addr : "*",
                     port, server.neterr);
            return REDIS_ERR;
        }

        anetNonBlock(NULL, fds[*count]);
        *count += 1;
        idx++;
    } while (idx < server.bindaddr_count);

    return REDIS_OK;
}


/* Create a listening TCP socket for address family `af` and return its file
 * descriptor.
 *
 * err      - buffer that receives an error description on failure.
 * port     - port to listen on; formatted into a 6 byte string for getaddrinfo().
 * bindaddr - address to bind, or NULL (AI_PASSIVE then applies).
 * af       - address family placed in the getaddrinfo() hints, e.g. AF_INET.
 * backlog  - backlog value forwarded to anetListen().
 *
 * Only the first resolved address for which socket() succeeds is tried: if
 * any later step fails on it, the function gives up instead of moving on to
 * the next address.
 *
 * Returns the listening descriptor, or ANET_ERR on failure.
 *
 * NOTE(review): whether the socket is closed on the failure paths depends on
 * the anet* helpers, which are not visible here - confirm there is no leak. */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int listen_fd = ANET_ERR;
    int gai_rc;
    char portstr[6]; /* strlen("65535") */
    struct addrinfo hints, *results, *cur;

    snprintf(portstr, sizeof(portstr), "%d", port);

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */

    gai_rc = getaddrinfo(bindaddr, portstr, &hints, &results);
    if (gai_rc != 0)
    {
        anetSetError(err, "%s", gai_strerror(gai_rc));
        return ANET_ERR;
    }

    for (cur = results; cur != NULL; cur = cur->ai_next)
    {
        int sock = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);

        /* No socket for this candidate: move on to the next one. */
        if (sock == -1)
            continue;

        /* Configure and start listening. The && chain stops at the first
         * failing step, and anetV6Only() is only called for AF_INET6. */
        if ((af != AF_INET6 || anetV6Only(err, sock) != ANET_ERR) &&
            anetSetReuseAddr(err, sock) != ANET_ERR &&
            anetListen(err, sock, cur->ai_addr, cur->ai_addrlen, backlog) != ANET_ERR)
        {
            listen_fd = sock;
        }

        /* Success or failure, this candidate decides the outcome. */
        break;
    }

    /* The list ran out without a single socket() succeeding. */
    if (cur == NULL)
        anetSetError(err, "unable to bind socket");

    freeaddrinfo(results);
    return listen_fd;
}

/* Create an AF_INET listening TCP socket.
 *
 * Forwards `err`, `port`, `bindaddr` and `backlog` unchanged to
 * _anetTcpServer() with the address family fixed to AF_INET, and returns
 * whatever that call returns. */
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    const int family = AF_INET;

    return _anetTcpServer(err, port, bindaddr, family, backlog);
}

基于 Redis 3.0.0 版本,点击下载:https://download.redis.io/releases/redis-3.0.0.tar.gz

本文地址,https://www.ccagml.com/?p=363

发表评论