libevent 的并发工作做得太好了

我最近的工作呢,主要是在梳理我司核心平台的档案。

话说我司的这个后台,那可是牛逼的很啊。它作为基础设施,从无到有搞起来,前辈们把它撸得风生水起,支撑了其他产品的正常运转近十年。

系统的架构,小明在入职时就已经被培训过了,但是里面各个模块的细节却知之甚少。这段时间,计划把后台每个模块的代码 one by one 地啃一遍。这不,发现有好几个模块都用到了 libevent 这个库,但我之前没用过诶,为了看懂代码,必须要具备些 libevent 的基础知识。

libevent 是一个用C语言编写的、轻量级的开源高性能网络库。

主要有以下几个亮点:

  • 事件驱动( event-driven),高性能;
  • 轻量级,专注于网络通信;
  • 源代码相当精炼、易读;
  • 跨平台,支持 WindowsLinuxMac Os
  • 支持多种 I/O 多路复用技术;
  • 支持定时器和信号等事件;

官网上列出了使用这个库开发的各种应用:

原来大名鼎鼎的 ChromiumMemcached都用到了它,真是 excited

看来,当初前辈们选用 libevent 也是一个英明的撅腚啊。

要了解 libvent 如何牛逼,首先要知道传统的 socket 通信是如何的弱鸡。

一开始,大家使用的都是阻塞式I/O函数,如果一个函数在操作完成之前,或者在超时之前,都不会返回,那么就说这个函数是同步的。

比如当你对一个 TCP 连接调用 connect(),你的操作系统会有一个队列,一个保存发送出去的SYN 请求的队列。然后对于每个 SYN 请求,系统尝试等待 TCP 另一端返回对应的确认码,即SYN ACK。在确认码 ACK 返回之前,或者直到超时,同步的函数是不会返回,称之为阻塞I/O

下面有个使用阻塞I/O函数的例子,它打开一个连接,连接到 www.google.com,发送一个简单的HTTP请求,然后打印出返回内容到stdout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For gethostbyname */
#include <netdb.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>

int main(int c, char **v)
{
const char query[] =
"GET / HTTP/1.0\r\n"
"Host: www.google.com\r\n"
"\r\n";
const char hostname[] = "www.google.com";
struct sockaddr_in sin;
struct hostent *h;
const char *cp;
int fd;
ssize_t n_written, remaining;
char buf[1024];

/* Look up the IP address for the hostname.
Watch out;
this isn't threadsafe on most platforms. */
h = gethostbyname(hostname);
if (!h) {
fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));
return 1;
}
if (h->h_addrtype != AF_INET) {
fprintf(stderr, "No ipv6 support, sorry.");
return 1;
}

/* Allocate a new socket */
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
perror("socket");
return 1;
}

/* Connect to the remote host. */
sin.sin_family = AF_INET;
sin.sin_port = htons(80);
sin.sin_addr = *(struct in_addr*)h->h_addr;
if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {
perror("connect");
close(fd);
return 1;
}

/* Write the query. */
/* XXX Can send succeed partially? */
cp = query;
remaining = strlen(query);
while (remaining) {
n_written = send(fd, cp, remaining, 0);
if (n_written <= 0) {
perror("send");
return 1;
}
remaining -= n_written;
cp += n_written;
}

/* Get an answer back. */
while (1) {
ssize_t result = recv(fd, buf, sizeof(buf), 0);
if (result == 0) {
break;
} else if (result < 0) {
perror("recv");
close(fd);
return 1;
}
fwrite(buf, 1, result, stdout);
}

close(fd);
return 0;
}

上面使用的网络相关函数都是阻塞式的。

  • gethostbyname在成功解析域名www.google.com或超时前是不会返回的;
  • connect在成功连接后才返回;
  • recv接收数据才返回,或者对方关闭了sock也会让recv返回;
  • send也阻塞,直到把数据复制到系统内核buffer之中。

如果你在同一时间内只做一个事情,I/O阻塞函数也没有什么不好。但假若你的程序里要同时响应多个连接,比如你需要同时从 2 个连接sock中接收数据,而且你不知道哪个数据先到来,那么,你不能这样写你的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* This won't work. */
char buf[1024];
int i, n;
while (i_still_want_to_read()) {
for (i=0; i<n_sockets; ++i) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n==0)
handle_close(fd[i]);
else if (n<0)
handle_error(fd[i], errno);
else
handle_input(fd[i], buf, n);
}
}

因为如果 fd[2]的数据先到来的话,这段代码不会想当然地马上去读取fd[2]的数据,因为
I/O是阻塞的,它必须读取完 fd[0]fd[1]的数据后才能读 fd[2],而你并不能事先保证哪个 fd 上的数据先到来。

当然也可以使用多个进程/线程来处理每个sock,每个sock的数据处理互不影响,A进程阻塞了,并不影响到B进程的工作。

那么,这是最好的同时处理多个连接的方案吗?

当然不是!

首先,在一些平台上,创建一个进程/线程的代价是很昂贵的。实际开发中你会使用一个线程池,而不是创建一个新进程。不过,假若你需要处理数以千万个连接,维护这么多线程,性能也许没有你期待的那么美好。

使用线程不是最好的答案。在Unix下,你可以设置sock为非阻塞,使用函数fcntl

1
2
3
4
5
6
7
8
9
10
11
fcntl(fd, F_SETFL, O_NONBLOCK); 
//fd对应于sock的文件描述符(file descriptor)

//其实一般先获取sock的flag,修改flag,再设置新的flag,如下大概
/* Set a socket as nonblocking */
int flags;
if ((flags = fcntl (fd, F_GETFL, 0)) < 0)
err_sys("F_GETFL error");
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0)
err_sys("F_SETFL error");

一旦你对sock fd设置非阻塞,那么对这个fd调用网络相关的函数,比如recv,函数会马上返回,这时你要检查返回码以及全局变量errno

从多个sock读取数据的代码段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* This will work, but the performance will be unforgivably bad. */
int i, n;
char buf[1024];
for (i=0; i < n_sockets; ++i)
fcntl(fd[i], F_SETFL, O_NONBLOCK);

while (i_still_want_to_read()) {
for (i=0; i < n_sockets; ++i) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n == 0) {
handle_close(fd[i]);
} else if (n < 0) {
if (errno == EAGAIN)
; /* The kernel didn't have any data for us to read. */
else
handle_error(fd[i], errno);
} else {
handle_input(fd[i], buf, n);
}
}
}

上面的代码也存在性能问题,2个原因:

  • 如果没有数据到来,代码不断循环,不断消耗cpu
  • 每次轮询都会调用系统调用,因为有没有数据可以读取,一般是检查内核数据buffer,这个过程由系统调用帮我们做检查。我们不断轮询,每次产生系统调用的消耗,这明显不是很环保的做法。

我们需要更为智能的方式,当数据最后可读时让内核主动告诉我们。

最古老的方式是使用 select:

1
2
3
4
5
int select(int nfds, 
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout);

select 系统调用使用了3sock fd集合, 分别对应:

  • 可读的fd集合,告诉select请检查这个集合内的fd,若其中某一个可读,请select返回,告诉我集合中有多少个fd有数据可以读了,其它两个也是类似的意思;
  • 可写的fd集合;
  • 异常的fd集合;

select 返回后使用 FD_ISSET 来测试具体是哪个 fd 有数据了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* If you only have a couple dozen fds, this version won't be awful */
fd_set readset;
int i, n;
char buf[1024];

while (i_still_want_to_read()) {
int maxfd = -1;
FD_ZERO(&readset);

/* Add all of the interesting fds to readset */
for (i=0; i < n_sockets; ++i) {
if (fd[i]>maxfd) maxfd = fd[i];
FD_SET(fd[i], &readset);
}

/* Wait until one or more fds are ready to read */
select(maxfd+1, &readset, NULL, NULL, NULL);

/* Process all of the fds that are still set in readset */
for (i=0; i < n_sockets; ++i) {
if (FD_ISSET(fd[i], &readset)) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n == 0) {
handle_close(fd[i]);
} else if (n < 0) {
if (errno == EAGAIN)
; /* The kernel didn't have any data for us to read. */
else
handle_error(fd[i], errno);
} else {
handle_input(fd[i], buf, n);
}
}
}
}

但是,随着每个fd集合中fd数量的增多,每次检查也相应要花费更多时间。

另外,由于每个系统中可以监控的fd数目有限,FD_SET 其实是一个位数组,linux 默认是 1024 bit,而 FD_SET 只是简单的把 fd 当作一个序号按位向位数组写数据。所以当 fd 大于 1024时,将导致写越界,这是一个很容易被程序员忽视的坑,具体案例参考云风写的《一起 select 引起的崩溃》(http://t.cn/8FW0zXv)。

鉴于此,不同的系统提供不同的优化方案,包括poll, epoll, kqueue, evports, /dev/poll

所有这些优化都能获得更好的性能,而且除了poll,其他的函数,增加、删除一个fd,或者测试sock是否可读写,这些操作都是O(1)的效率。

可惜这些优化的方案,都不是标准。

linux使用epollBSDs(包括苹果内核)使用kqueueSolaris使用evports/dev/poll, 致命的是,同个系统只使用他们的优化方案,不包括其他,比如linux上就没有使用kqueue

所以,如果你想要写一个高性能异步I/O的程序,若考虑移植和跨平台,你还需要做一些额外的包装。

可喜的是,libevent 帮程序员做了上面提到的这些工作。

libevent 提供了一个比 epoll更为友好的操作接口,将我等程序员从网络I/O处理的细节中解放出来,让我们可以专注于具体业务的处理上。

看懂了吗?识得唔识得呀?这就是 libvent 的由来。

彦祖老师 wechat