Redis可以接受并发请求但它是不是一个能够并发处理请求的服务器, 所有的操作都必须被单个线程在内存里快速完成操作, 所以他没有context switching, 没有false sharing, 寄存器和cache一直是热的. 但是缺点也很明显, 当数据比较大的时候, 实际可用的最慢的操作也必须是O(logn), 如果有一个O(n)的操作, 那么这个耗时操作将会阻塞后继的客户端请求. 而且处理器利用率不足, 在一个32核的服务器上也只能用其中一个核心处理大多数请求(housekeeping可以在其他核心上执行). 如果要充分利用处理器, 必须跑30+个redis进程. 但是这30多个进程彼此之间又不是共享数据的, 因此伸缩性受限. 但是redis仍然是一个非常有特色, 能够非常好的解决特定问题的应用. 本文可以帮助读者快速了解到redis在引擎盖下的事件机制, rdb存储, 和集群复制的实现细节, 由此告诉读者redis的一些使用时需要注意的问题.

事件机制简介

如果想深入了解redis, 那么你可能会问最简单的一个问题, set x 1是怎样处理的? 让我们从redis的Event Loop架构看起.

Redis没有使用libevent或libev, Redis用的自己的ae.c做nio的event lib. 因为libevent就比redis的code base要大很多, redis作者认为libevent不够轻量. redis初始化涉及三个部分, server.c, ae.c和networking.c. 前者会调用后面两个部分完成服务器的初始化. 注意, redis的代码一直在演化, 为了更好适配手机显示, 本文去掉了一些注释做了一点格式化处理, 所以你看到的代码可能会有所不同.

当服务器启动的时候, 从入口server.c的main()会执行到initServer(), 然后调用ae.c的aeCreateEventLoop去创建EventLoop和内存DB

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);

aeCreateEventLoop的代码是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */

for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

其中第16行aeApiCreate会调用底层的epoll/kqueue/select. 我的电脑是一台MacBook Pro, 所以在我的电脑上会去执行BSD的kqueue, 进入ae_kqueue.c, 如果是Linux会进入ae_epoll.c.

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

在Mac/BSD下调用kqueue的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;
state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->kqfd = kqueue();
if (state->kqfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}

state->kqfd = kqueue() 这一行拿到了kqueue的fd保存在state->kqfd里. 至此kqueue初始化结束, EventLoop在返回之后保存在server.el里.

如果是Linux, 创建epoll的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;

numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

创建好了epoll/kqueue, 我们有了server.el这个Event Loop之后我们要去注册感兴趣的io事件了. initServer()紧接着调用aeCreateFileEvent()去创建event handler. 这里server有ipfd也有sofd, 前者是TCP监听端口的fd, 后者是unix socket的fd. 我们这里只关心ipfd.

1
2
3
4
5
6
7
8
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */

for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}

第一个我们感兴趣的事件就是新客户端的连接对吧, 所以上面的代码中acceptTcpHandler就是来做这个事情的. 它是定义在networking.c里的处理接受新进入的TCP连接的函数, 每当一个新的连接进入的EventLoop时候, redis会从event loop取出这个事件调用acceptCommonHandler, 然后再调用createClient去创建一个封装好的client对象, event loop的代码稍后在看, 此处我们先看如何创建一个client.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
close(fd);
zfree(c);
return NULL;
}
}

我们可以看到redis默认是把TCP连接设置为non blocking, 打开了TCP_NODELAY来牺牲吞吐换低延迟, aeCreateFileEvent的调用会把客户端句柄fd的读事件注册到kqueue/epoll, 然后用readQueryFromClient的函数处理客户端请求. 至此, 客户端已经可以接受请求了.

现在回过头来看redis怎样从event loop取出新连接事件和客户端查询事件. redis服务的event loop就在aeMain里. 几乎是在server.c的main入口的最后一句, redis会进入aeMain:

1
2
3
4
5
    aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}

aeMain是一个busy loop, 他会不停地循环, 调用aeProcessEvents去处理Event Loop里的事件. 在aeProcessEvents里, 事件被读取之后没有交给其它的线程去拼包处理, 而是在当前线程直接处理, 没有线程切换, CPU的cache line一直很热, 这样效率很高. Jetty曾经有一次重构尝试让读取post body和解析post body在两个线程里执行来提升并发能力, 结果测试发现性能反而会下降, 也是同样的原因, cache line要热起来CPU的每个cycle才能充分利用起来. redis主进程是nio的单线程模型, 原因在此.

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

如果有新客户端连接或者有查询的事件到达Event Loop, 那么aeProcessEvents会调用aeApiPoll去从kqueue/epoll检查新事件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}

这里的关键在于这一行

numevents = aeApiPoll(eventLoop, tvp);

无论是epolll还是kqueue, 这里都会从kernel拿回来eventLoop的事件封装到eventLoop->apidata->events内, ae.c提供了这样一次抽象.

另外一点要注意: tvp是没有事件的情况下的等待时间, 因为这是一个busy loop, 如果持续没有事件到达我们不能不停空转, 那样会消耗掉所有CPU资源.

Mac下使用kqueue的实现(ae_kqueue.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

if (tvp != NULL) {
struct timespec timeout;
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
} else {
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL);
}

if (retval > 0) {
int j;
numevents = retval;
for(j = 0; j < numevents; j++) {
int mask = 0;
struct kevent *e = state->events+j;
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->ident;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

Linux下使用epoll的实现(ae_epoll.c)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

从kqueue/epoll得到事件之后就是标准的Nio做法, 读取事件的buffer. 如果一个command比较大会产生多次read事件. 不过因为大多数redis的command都很短, 所以一般不太会出现.

对于一个set x 1这样的操作, 客户端的请求字节流通过kqueue/epoll取出来到buffer之后, 接下来就是调用 fe->rfileProc(eventLoop,fd,fe->clientData,mask); 去处理. 对于返回的写入, 通过 fe->wfileProc(eventLoop,fd,fe->clientData,mask);去处理.

1
2
3
4
5
6
7
8
9
10
    if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}

那么fe->wfileProc和fe->rfileProc是什么呢? 还记得刚才创建client对象的时候注册的event handler么? networking.c里的readQueryFromClient这个函数就是fe->rfileProc. 这个函数会调用processInputBuffer去解析client的buffer中的请求. 接下来就是各种解析请求的代码, 比较繁琐, 这里不列出来了.

至此, redis的事件模型我们就介绍完了. 接下来我们看redis如何处理set x 1这个命令.

处理请求, Command Handler

当用户请求被networkign.c里的processInputBuffer处理解析之后会由server.c中的processCommand(client *c)接下来处理. 服务器在初始化的时候还会做一个command table, 里面包含了所有的命令, 此处processCommand会把客户端请求解析出来之后查表找到这个command.

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

最后执行这个command, 如果客户端发到是MULTI原子事物的话, 这里会先入队列等待EXEC到达才执行, 否则立即执行call函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;

call函数比较长, 里面最重要的一行就是

c->cmd->proc(c);

c->cmd就是刚才从command table里找出来的command handler, 这里set x 1执行的是set command, 其实就是t_string.c里的setCommand(client *c).

1
2
3
4
5
6
setKey(c->db,key,val);
server.dirty++;
if (expire) setExpire(c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);

最终t_string.c会setKey(c->db,key,val), 然后addReplly(c, ok_reply ? ok_reply : shared.ok); 此处因为ok_reply为NULL, 所以只要回传一个共享的常量ok.

那么setKey是怎样更改内存和写入rdb文件的呢? 接下来我们看redis的存储.

存储

这里的key和val都是robj的指针, robj是redis里最基础的存储结构. 定义如下:

1
2
3
4
5
6
7
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
int refcount;
void *ptr;
} robj;

其中type可以是五种类型

1
2
3
4
5
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

encoding是指实际的存储方式, redis的存储有很多的优化, 比如string类型的”123”可以变成整数123. lru和refcount一看名字就知道干什么的了. 而这个ptr指针是void*的, 它可以指向任何东西, 保存着真正的内容.

如果你执行的是”set x 1”那么debug输出为:

(lldb) p (char*) (key->ptr)
(char *) $17 = 0x0000000100630253 "x"
(lldb) p val->ptr
(void *) $19 = 0x0000000000000001

保存这些key/val的是c->db, 这是一个redisDb的对象

1
2
3
4
5
6
7
8
9
10
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;

这个redisDb的id就是你select的那个db id, 第一个dict就是存储key/val的地方. dict的定义如下:

1
2
3
4
5
6
7
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;

其中我们看到有两个dictht, 这是因为redis做了一个拿空间换低延迟的优化. 我们看看dict.c中的dictAddRaw函数:

1
2
3
4
5
6
7
8
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;

/* Set the hash entry fields. */
dictSetKey(d, entry, key);

一般情况下请求被写入到ht[0], 但是在rehashing的时候, 请求被写入到ht[1], 这时候查询请求会访问两个ht. 这个过程中ht[0]会向ht[1]复制, 直到ht[0].used=0, 全部复制结束了,这时候rehash结束, redis会把ht[0].table指向ht[1].table, 并释放ht[1].

至于dictht还是非常简单的, 就是传统的hashtable, 冲突是通过链表追加的.

1
2
3
4
5
6
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;

至于get x这样的请求, 处理也是非常类似的. 读到这里你已经可以给redis增加自己的command了.

RDB的导出

rdb是redis持久化的重要基础, 除了配合AOF文件保证数据尽可能的一致不丢之外, rdb还在集群复制中有重要的作用. Redis 3.0之后的集群模式的基础还是master/slave复制, 复制的时候master会以rdb文件作为一个基础snapshot, 然后增量的把所有的变化复制到slave. 所以rdb导出是一个非常重要的功能.

因为rdb可能会有十几个G, 而磁盘又比较慢, redis对内存的读写又是单线程的, 所以高效的导出rdb不是很容易. redis采取了Copy-On-Write的方式来解决这个问题, 当一个rdb需要被导出的时候redis会fork出来一个child process, 这个process会和parent process共享虚存, 当有一个page被更改的时候child process会复制那个page. 如果你的磁盘很慢, 更新又很频繁那么是有可能会造成一个时间窗口内的延迟发生抖动的.

rdb的导出发生在rdb.c的rdbSaveBackground, 下面fork调用就会分为两个进程去执行, parent process会得到childpid不为0, 执行到后面的else, parent基本没事干直接返回去处理客户端请求去了, child process会得到0进入if分支, 去调用rdbSave干具体的活.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}

rdbSave会调用rdbSaveRio去保存文件. 保存文件过程中, 用一个iterator遍历所有的key, 然后调用rdb.c的rdbSaveKeyValuePair去保存每个entry. 保存代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
long long expiretime, long long now)
{
/* Save the expire time */
if (expiretime != -1) {
/* If this key is already expired skip it */
if (expiretime < now) return 0;
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}

/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1;
return 1;
}

有两条路径可以触发这部分代码, 一个是定期的, 一个是主从复制请求的.

定期导出的频率取决于配置, 以下配置中第一行表示900秒后只要超过一条数据变化就会被dump. 第二行表示300秒后如果有10条记录发生变化就会dump.

save 900 1 save 300 10
save 60 10000

系统的事件循环中aeMain调用aeProcessEvents除了调用之前提到的networking和command, 还会调用一个processTimeEvents, 服务器初始化的时候会在eventLoop里加入一些timer events, 其中有这里会执行的serverCron. 复制的时候serverCron会在master process里每秒10次执行 (server.hz的配置), 然后按照rdb复制间隔的配置调用rdbSaveBackground. 这个触发过程比较简单.

另外一条触发路径是slave发送SYNC/PSYNC过来主动要求同步的时候. 一个新启动的slave会向它的master发起一个PSYNC请求(partial sync)发起同步, 这个过程稍微复杂点. 当master接收到slave的PSYNC请求后调用syncCommand, 然后调用masterTryPartialResynchronization发起真正的同步. 如果之前有过中断的那么master会发回一个CONTINUE, 如果是第一次PSYNC或者master重启过导致无法增量传输, 那么master会返回C_ERR让slave从头开始. 接下来master看是否有正在执行的rdb导出, 如果没有就通过startBgsaveForReplication启动一个, 如果有就等下次BGSAVE结束, 接下来的步骤和第一种触发基本一样.

值得一提的是从2.8之后, redis的复制可以在master上开一个in-memory backlog, 来解决连接断开重连之后要重新同步的问题. 如果master不重启那么runid就不会变, 所有的slave都会得到同一个runid, 只要backlog足够大, 断开时间足够短, slave就不用全量复制, slave只要看看runid变了没, 没变的话把上次的offset发给master, master就可以从backlog继续给slave发数据. 而老版本的redis每次slave连上server都要全量同步一次rdb snapshot. 如果已经有了10G的数据, 传输快完成的时候网络断了一会, 那么slave要再传输10G过来. 增量复制的代码可以在slaveTryPartialResynchronization和masterTryPartialResynchronization中找到.

集群复制

了解了rdb导出, 下面来看如何把snapshot传给slave.

每次rdb导出结束的时候会调用到backgroundSaveDoneHandler, 并且最终调用updateSlavesWaitingBgsave去打开rdb文件, 把文件fd保存到slave->repldbfd里. 然后用aeCreateFileEvent把可写事件注册到event loop中, 并把回调函数设为sendBulkToSlave.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);

aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}

在可写的回调函数sendBulkToSlave中, 把文件指针挪到slave-repldboff这个偏移量上, 读出buflen个字节, 然后向slave发送, 直到完全复制结束后调用putSlaveOnline(slave)让slave上线.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    leek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
serverLog(LL_WARNING,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
}
return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
putSlaveOnline(slave);
}

rdb导出的过程中, 如果有后继新的请求进入master, 那么master会在call()处理请求的时候顺便把请求指令通过propagate/replicationFeedSlaves写入一个环状buffer, 这就是server.repl_backlog.

内存使用

redis的内存使用非常高效, 有很多不错的优化.

比如, hset/hget.

1
2
3
4
HSET daniel gender male
HSET daniel age 36
HGET daniel gender
"male"

不看代码你会猜测hset/hget会用一个dict来保存daniel的各个字段对吧? 其实redis会做一个小的key=value的array, 这样可以避免填充因子, 节省内存. 如果array太大才会变成hash table. 有趣的是, 从经典算法分析来看, 数组查找O(N)比hash table O(1)速度会慢, 但是处理器的cache line通常是64字节, 如果64字节以内的多个字段都在一个cache line里, 这时候数组的查找性能不差, 甚至更好, 因为只要一次访问主存就可以把所有数据都放在L1, 而hash table的entry可能会保存在多个cache line里, 需要多次访问内存, 而访问内存要几百个CPU cycle才能完成. 另外因为redis是单进程执行, cache line又不会有false sharing, 所以redis的性能非常稳定.

有一个影响redis效率的情况是他的指针大小. 如果你真的想要节省内存, 那么你可以按照32位模式编译, 这样虽然有4G虚存的限制, 但是你可以跑很多个进程. 32位模式的好处是redis里每个对象指针只要4bytes. Redis不支持指针压缩这很遗憾. JVM这一点做的就非常好, 在64位VM中只要虚存不超过32G, 指针就会被压缩成8字节. (内存控制器的读取必须要按照byte访问, 所以指针的低3位永远都是0, 因此可以用一个32bit的指针, 右移3位, 变成实际35位的指针, 那么寻址是4G*2^3=32G. decode的时候, 高32位全0, 左移3位到高32位, 得到35位有效的指针). 希望redis未来能够实现类似JVM的指针压缩, 在64位32G内存的情况下获得和32位一样的空间优势.

但是redis也有一些小坑需要注意, 比如:

老版本的Redis自己管理swap file, key总是在内存, value可能会被交换出去, 从2.4之后, Redis不再支持VM. 数据必须全部放在内存里. 所以redis不是一个数据库! 最多算是一个In-memory database. 所以在使用redis的时候数据量的预估非常重要. 还有一点, 你从top里看到RSS如果是16G, 但是redis报告只用了8G, 这时候不要惊奇. 这是因为有很多page中有残留的活动数据, redis不做compaction.

但是大多数人遇到的主要问题可能是redis做fork时的延迟抖动. 通过刚才代码的分析我们知道当Redis生成rdb snapshot的时候, master process会fork一个child process, child process可以看到master process的内存, 而且只有当master process的page发生改变之后child process才会把改变的page复制到自己的VM. 如果snapshot不是非常大, 写入不是非常频繁, 可能大多数page都没有变化, 所以如果我们允许linux overcommit可以让child process在剩余内存比较少的时候仍然能够导出snapshot. 但是如果写入非常频繁, 在最极端的情况下, 理论上snapshot的导出可能需要两倍的内存. 而且heap越大fork的时间越久, 允许overcommit就会有非常大的风险. 你可以考虑关闭overcommit, 但是不管怎样保留足够大的空闲内存非常有必要. 即便你有足够大内存, 假如你的redis占用了很多内存, fork的时候仍然是一个比较消耗资源的工作. 下面这张表格是不同大小的redis内存使用情况下做一个fork需要的时间 (来源: https://redislabs.com/blog/testing-fork-time-on-awsxen-infrastructure#.VpOyZFnvNSw )

Instance type Memory limit Memory used Memory usage Fork time
m1.small 1.7 GB 1.22 GB 71.76% 0.76 sec
m1.large 7.5 GB 5.75 GB 76.67% 1.98 sec
m1.xlarge 15 GB 11.46 GB 76.40% 3.46 sec
m2.xlarge 34 GB 24.8 GB 72.94% 5.67 sec
cc1.4xlarge 23 GB 18.4 GB 80.00% 0.22 sec

前四行是Xen Hypervisor的实例, 明显内存越大, fork越慢. (最后一行是Xen HVM的实例, 速度要好很多, fork在不同虚拟化技术上的表现不太一致.) 可见, 复制会产生一定的延迟抖动. 如果redis能够在99%的情况下用非常低的延迟处理完请求, 但是会有1%的请求延迟很高, 这也是不能接受的. 原因有两个. 从用户体验来讲, 一个页面通常会有很多请求, 2012年的大型网站平均每个页面有42个请求. 如果99%的延迟很好, 0.99^42=0.66, 也就是说会有33%的用户会看到页面的某个部分显示的很慢, 如果碰巧是页面加载的关键路径, 那么会导致整个页面渲染都非常慢. 另外一个角度来看, redis本身的TPS可以轻松达到100K以上, 而redis是单线程模型, 一单一个请求出现延迟, 对整体吞吐影响非常的大. 所以对抗延迟是redis的一个主要问题. 除非数据非常小, 我们一般是不能用O(N)的操作的, 而且不要再xen上做复制, heap要尽量小. Redis有slowlog get N 或者 latency命令可以查看延迟情况. 另外THP (transparent hugepage)也尽量禁止, 因为THP可以分配2MB的huge page, 那么fork的时候CoW即使很少的改动也会copy整个2MB的huge page,这时候反而不如4KB的page效率高。极端情况下fork可能真的需要两倍的内存!另外,因为redis可以使用jemalloc,当它归还内存的时候如果是MADV_DONTNEED,那么kernel有时是无法真正归还整个huge page的,你最多就会有2MB的内存浪费掉, 结果就是RSS飙高。

要不要连接池

用不用连接池这是一个很有趣的问题, 很多人都会问, redis基本上是一个单线程模型的程序, 那么是不是每个客户端只要一个连接就足够了呢? 如果是一个多线程的客户端我是不是一个只要一个固定大小为1的连接池就足够了么? 简单讲, 不够!

来看一个例子: multi.c里接收到multi指令之后只是把client->flags翻为CLIENT_MULTI

1
2
3
4
5
6
7
8
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}

然后客户端的请求会在客户端入队列, 直到exec一起发过来, 其中call那一行就是具体的执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;

/* write to AOF and slave */
if (!must_propagate && !(c->cmd->flags & CMD_READONLY)) {
execCommandPropagateMulti(c);
must_propagate = 1;
}

call(c,CMD_CALL_FULL);

/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}

从这个过程来看, 单个客户端multi - exec之间是不能干别的事情的. 客户端中某个线程发出multi, 发出command 1, 然后进行一些本地的耗时计算, 再发出command 2, exec这样的命令序列, 客户端会对指令进入本地待发队列, 其他线程是无法在redis事务中进行操作的. 你的耗时计算要多久, 其它线程就要等待多久. 如果你有一个连接池, 包含多个连接, 那么其他线程会继续把并发请求至少都可以先发出去, 而不是要等这个非常慢的请求的response回来后继的请求才能发出去, 降低了延迟. 当然了, 这不是一个特别好的例子, 因为任何稍微了解redis的程序员都不会在multi-exec之间做耗时操作, 这是一个极端化的例子, 更一般的情况是网络传输延迟的隐藏. 尽管redis单线程处理, 但是请求可以先排队进入read buffer, 多个连接可以让请求不需要等慢的请求先行处理. 这在网络延时比较高而且不方便用pipeline的时候比较有用, 我们可以拿并发来隐藏延迟.

所以, 客户端有没有必要用连接池? 有, 大小? 小点, 太大了没用.

提及到低延迟的设计, 我们结尾的时候顺便提一句, redis是一个追求低延迟的设计, 有时候甚至会牺牲吞吐来换得低延迟, 比如创建连接的时候redis会禁止Nagle算法

1
2
3
4
5
6
7
8
9
10
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) {
close(fd); zfree(c); return NULL;
}
}

如果有大量不足一个MSS的小消息, 比如一个字节的消息, 那么IP header本身需要20字节, TCP header需要20字节, 这样一个1字节的消息需要41个字节, 禁止了Nagle意味着客户端不会等待消息堆积到一个MSS再发出, 这相当于会有40倍的带宽被浪费, 吞吐会大幅下降. 如果是redis独享的服务器, 可以调小tcp delayed ack的时间, 在吞吐和延迟之间找一个平衡.

对于redis来讲, 低延迟是第一设计目标, 在满足这个目标的前提下尽量提高吞吐.

总结

Redis是一个非常独特的应用, 单线程这种模式让它没有线程切换, CPU cache line总是很热, 跑起来延迟非常小, 但是他的单线程模型有时候也让人觉得很不方便, 尤其是部署在一个很多处理器核心的服务器上的时候, 尤其是要注意多个redis实例万一同时做fork的时候, 内存很可能会瞬间吃紧. 总之, redis和其他优秀的应用一样, 它不是银弹, 只有同时理解redis的优势和缺陷, 你才能更好地驾驭它.