IO 多路复用的介绍
IO 多路转接也称为 IO 多路复用,它是一种网络通信的手段(机制),通过这种方式可以同时监测多个文件描述符并且这个过程是阻塞的,一旦检测到有文件描述符就绪(读数据/写数据)程序的阻塞就会被解除,之后就可以基于这些(一个或多个)就绪的文件描述符进行通信。
通过这种方式在单进程/线程的场景下也可以在服务器端实现并发。常见的 IO 多路转接方式有:select、poll、epoll。
下面对多进程/多线程并发和 IO 多路转接的并发处理流程进行对比(服务器端):
1、多进程/多线程并发
主进程/父线程:调用 accept() 监测客户端连接请求
如果没有新的客户端的连接请求,当前进程/线程会阻塞
如果有新的客户端连接请求解除阻塞,建立连接
子进程/子线程:和建立连接的客户端通信
调用 read()/recv() 接收客户端发送的通信数据,如果没有通信数据,当前进程/线程会阻塞,数据到达之后阻塞自动解除
调用 write()/send() 给客户端发送数据,如果写缓冲区已满,当前进程/线程会阻塞;否则,将待发送数据写入写缓冲区
2、IO 多路转接并发
使用 IO 多路转接函数委托内核检测服务器端所有的文件描述符(通信和监听两类),这个检测过程会导致进程/线程的阻塞。
如果检测到已就绪的文件描述符阻塞解除,并将这些已就绪的文件描述符传出。
根据类型对传出的所有已就绪文件描述符进行判断,并做出不同的处理。
对这些文件描述符继续进行下一轮的检测(循环往复…)。
(1)监听的文件描述符:和客户端建立连接
此时调用 accept() 是不会导致程序阻塞的,因为监听的文件描述符是已就绪的(有新请求)
(2)通信的文件描述符:调用通信函数和已建立连接的客户端通信
调用 read()/recv() 不会阻塞程序,因为通信的文件描述符是就绪的,读缓冲区内已有数据
调用 write()/send() 不会阻塞程序,因为通信的文件描述符是就绪的,写缓冲区不满,可以往里面写数据
与多进程和多线程技术相比,I/O 多路复用技术的优势是系统开销小,系统不必创建和维护进程/线程,从而大大减小了系统的开销。
select
函数原型
使用 select 这种 IO 多路转接方式需要调用一个同名函数 select(),这个函数是跨平台的,Linux、Mac、Windows 都是支持的。
通过调用这个函数可以委托内核检测若干个文件描述符的状态,其实就是检测这些文件描述符对应的读写缓冲区的状态:
(1)读缓冲区
检测有没有数据,如果有数据该缓冲区对应的文件描述符就绪。
(2)写缓冲区
检测写缓冲区是否可以写,如果有容量可以写,缓冲区对应的文件描述符就绪。
(3)读写异常
检测读写缓冲区是否有异常,如果有该缓冲区对应的文件描述符就绪。
委托检测的文件描述符被遍历检测完毕之后,已就绪的这些满足条件的文件描述符会通过 select() 的参数分 3 个集合传出,
得到这几个集合之后就可以分情况依次处理。
这个函数的函数原型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #include <sys/select.h> struct timeval { time_t tv_sec; suseconds_t tv_usec; };
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval * timeout);
|
初始化 fd_set 类型的参数需要使用相关的一些操作函数,具体如下:
1 2 3 4 5 6 7 8
| void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
|
补充说明
select() 函数中第 2、3、4 个参数都是 fd_set 类型,它表示一个文件描述符的集合,类似于信号集 sigset_t,
这个类型的数据有 128 个字节,也就是 1024 个标志位,和内核中文件描述符表中的文件描述符个数是一样的。
这块内存中的每一个 bit 和文件描述符表中的每一个文件描述符是一一对应的关系,这样就可以使用最小的存储空间将要表达的意思描述出来。
下图中的 fd_set 中存储了要委托内核检测读缓冲区的文件描述符集合。
如果集合中的标志位为 0,表示不检测这个文件描述符状态。
如果集合中的标志位为 1,表示检测这个文件描述符状态。

内核在遍历这个读集合的过程中,如果被检测的文件描述符对应的读缓冲区中没有数据,内核将修改这个文件描述符在读集合 fd_set 中对应的标志位改为 0;如果有数据,那么这个标志位的值不变(还是 1)。

当 select() 函数解除阻塞之后,被内核修改过的读集合通过参数传出,此时集合中只要标志位的值为 1,那么它对应的文件描述符就是就绪的,就可以基于这个文件描述符和客户端建立新连接或者通信。
select并发处理

服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>
int main() { int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); saddr.sin_addr.s_addr = INADDR_ANY; int ret = bind(lfd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("bind"); exit(-1); }
ret = listen(lfd, 128); if (ret == -1) { perror("listen"); exit(-1); }
fd_set rdset; fd_set rdtmp; FD_ZERO(&rdset); FD_SET(lfd, &rdset); int maxfd = lfd; while (1) { rdtmp = rdset; int ret = select(maxfd + 1, &rdtmp, NULL, NULL, NULL); if (FD_ISSET(lfd, &rdtmp)) { struct sockaddr_in caddr; int len = sizeof(caddr); int cfd = accept(lfd, (struct sockaddr*) &caddr, &len); FD_SET(cfd, &rdset); maxfd = cfd > maxfd ? cfd : maxfd;
char ip[32]; inet_ntop(AF_INET, &caddr.sin_addr.s_addr, ip, sizeof(ip)); unsigned short port = ntohs(caddr.sin_port); printf("client ip: %s, port: %d\n", ip, port); } for (int i = 0; i < maxfd + 1; ++i) { if (i != lfd && FD_ISSET(i, &rdtmp)) { char buf[1024]; int len = recv(i, buf, sizeof(buf), 0); if (len == -1) { perror("recv error"); exit(-1); } else if (len == 0) { printf("client closed...\n"); FD_CLR(i, &rdset); close(i); break; } printf("recv buf = %s\n", buf);
ret = send(i, buf, strlen(buf) + 1, 0); if (ret == -1) { perror("send error"); exit(-1); } } } }
close(lfd);
return 0; }
|
在上面的代码中,创建了两个 fd_set 变量,用于保存要检测的读集合。
rdset 用于保存要检测的原始数据,这个变量不能作为参数传递给 select(),因为在函数内部这个变量中的值会被内核修改。
因此需要通过 rdtmp 变量将原始数据传递给内核,select() 调用完毕之后再将内核数据传出,这两个变量的功能是不一样的。
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); inet_pton(AF_INET, "192.168.3.194", &saddr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("connect"); exit(-1); }
while (1) { char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); write(fd, recvBuf, strlen(recvBuf) + 1); read(fd, recvBuf, sizeof(recvBuf)); printf("recv buf: %s\n", recvBuf);
sleep(1); }
close(fd);
return 0; }
|
客户端不需要使用 IO 多路转接进行处理,因为客户端和服务器的对应关系是 1:N,也就是说客户端只能和一个连接成功的服务器通信。
虽然使用 select 这种 IO 多路转接技术可以降低系统开销,提高程序效率,但是它也有局限性:
(1)待检测集合(第 2、3、4 个参数)频繁地在用户区和内核区之间进行数据的拷贝,效率低
(2)内核对于 select 传递进来的待检测集合的检测方式是线性的
如果集合内待检测的文件描述符很多,检测效率会比较低
如果集合内待检测的文件描述符相对较少,检测效率会比较高
(3)使用 select 能够检测的文件描述符个数有上限,默认是1024,这是在内核中被写死的。
多线程select并发
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <pthread.h> #include <sys/select.h>
pthread_mutex_t mutex;
typedef struct fdinfo { int fd; int* maxfd; fd_set* rdset; } FdInfo;
void* acceptConn(void* arg) { printf("child thread id: %ld\n", pthread_self()); FdInfo* info = (FdInfo*)arg;
struct sockaddr_in caddr; int len = sizeof(caddr); int cfd = accept(info->fd, (struct sockaddr*) &caddr, &len); pthread_mutex_lock(&mutex); FD_SET(cfd, info->rdset); *info->maxfd = cfd > *info->maxfd ? cfd : *info->maxfd; pthread_mutex_unlock(&mutex);
char ip[32]; inet_ntop(AF_INET, &caddr.sin_addr.s_addr, ip, sizeof(ip)); unsigned short port = ntohs(caddr.sin_port); printf("client ip: %s, port: %d\n", ip, port);
free(info);
return NULL; }
void* communication(void* arg) { printf("child thread id: %ld\n", pthread_self()); FdInfo* info = (FdInfo*)arg;
char buf[1024]; int len = recv(info->fd, buf, sizeof(buf), 0); if (len == -1) { perror("recv error"); free(info); return NULL; } else if (len == 0) { printf("client closed...\n"); pthread_mutex_lock(&mutex); FD_CLR(info->fd, info->rdset); pthread_mutex_unlock(&mutex); close(info->fd); free(info); return NULL; }
printf("recv buf = %s\n", buf); int ret = send(info->fd, buf, strlen(buf) + 1, 0); if (ret == -1) { perror("send error"); } free(info);
return NULL; }
int main() { pthread_mutex_init(&mutex, NULL);
int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); saddr.sin_addr.s_addr = INADDR_ANY; int ret = bind(lfd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("bind"); exit(-1); }
ret = listen(lfd, 128); if (ret == -1) { perror("listen"); exit(-1); }
fd_set rdset; fd_set rdtmp; FD_ZERO(&rdset); FD_SET(lfd, &rdset); int maxfd = lfd; while (1) { pthread_mutex_lock(&mutex); rdtmp = rdset; pthread_mutex_unlock(&mutex); int ret = select(maxfd + 1, &rdtmp, NULL, NULL, NULL); if (FD_ISSET(lfd, &rdtmp)) { pthread_t tid; FdInfo* info = (FdInfo*)malloc(sizeof(FdInfo)); info->fd = lfd; info->maxfd = &maxfd; info->rdset = &rdset; pthread_create(&tid, NULL, acceptConn, info); pthread_detach(tid); } for (int i = 0; i < maxfd + 1; ++i) { if (i != lfd && FD_ISSET(i, &rdtmp)) { pthread_t tid; FdInfo* info = (FdInfo*)malloc(sizeof(FdInfo)); info->fd = i; info->rdset = &rdset; pthread_create(&tid, NULL, communication, info); pthread_detach(tid); } } }
close(lfd); pthread_mutex_destroy(&mutex);
return 0; }
|
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); inet_pton(AF_INET, "192.168.3.194", &saddr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("connect"); exit(-1); }
int num = 0; while (1) { char buf[1024]; sprintf(buf, "hello world, %d\n", num ++); write(fd, buf, strlen(buf) + 1);
int len = read(fd, buf, sizeof(buf)); if (len == -1) { perror("read error"); exit(1); } printf("recv buf: %s\n", buf); sleep(1); }
close(fd);
return 0; }
|
poll
poll 的机制与 select 类似,与 select 在本质上没有多大差别,使用方法也类似,二者的对比如下:
相同点:
内核对应文件描述符的检测也是以线性的方式进行轮询,根据描述符的状态进行处理。
poll 和 select 检测的文件描述符集合在检测过程中频繁地进行用户区和内核区的拷贝,它的开销随着文件描述符数量的增加而线性增长,效率不高。
不同点:
select 检测的文件描述符个数上限是 1024,poll 没有最大文件描述符数量的限制。
select 可以跨平台使用,poll 只能在 Linux 平台使用。
函数原型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #include <poll.h>
struct pollfd { int fd; short events; short revents; };
struct pollfd myfd[100]; int poll(struct pollfd *fds, nfds_t nfds, int timeout);
|
示例代码
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <poll.h>
int main() { int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); saddr.sin_addr.s_addr = INADDR_ANY; int ret = bind(lfd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("bind"); exit(-1); }
ret = listen(lfd, 100); if (ret == -1) { perror("listen"); exit(-1); }
struct pollfd fds[1024]; for (int i = 0; i < 1024; ++i) { fds[i].fd = -1; fds[i].events = POLLIN; } fds[0].fd = lfd;
int maxfd = 0; while (1) { ret = poll(fds, maxfd + 1, -1); if (ret == -1) { perror("poll error"); exit(-1); }
if (fds[0].revents & POLLIN) { struct sockaddr_in caddr; int len = sizeof(caddr); int cfd = accept(lfd, (struct sockaddr*) &caddr, &len); int i; for (i = 0; i < 1024; ++i) { if (fds[i].fd == -1) { fds[i].fd = cfd; break; } } maxfd = i > maxfd ? i : maxfd; }
for (int i = 1; i < maxfd + 1; ++i) { if (fds[i].revents & POLLIN) { char buf[128]; int ret = read(fds[i].fd, buf, sizeof(buf)); if (ret == -1) { perror("read error"); exit(-1); } else if (ret == 0) { printf("client closed...\n"); close(fds[i].fd); fds[i].fd = -1; } else { printf("read client data: %s\n", buf); write(fds[i].fd, buf, strlen(buf) + 1); } } } }
close(lfd);
return 0; }
|
从上面的测试代码可知,使用 poll 和 select 进行 IO 多路转接的处理思路是完全相同的,使用 poll 编写的代码看起来更直观。
select 使用的位图的方式来标记要委托内核检测的文件描述符(每个bit对应一个唯一的文件描述符),对这个 fd_set 类型的位图变量进行读写还需要使用一系列的宏函数,操作比较麻烦;poll 将要检测的文件描述符的相关信息封装到了一个结构体 struct pollfd,可以直接读写这个结构体变量。
另外 poll 的第二个参数有两种赋值方式,都和第一个参数的数组有关:
(1)使用参数 1 数组的元素个数
(2)使用参数 1 数组中存储的最后一个有效元素对应的下标值 + 1
内核会根据第二个参数传递的值对参数 1 数组中的文件描述符进行线性遍历,这一点和 select 类似。
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); inet_pton(AF_INET, "192.168.3.194", &saddr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("connect"); exit(-1); }
while (1) { char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); write(fd, recvBuf, strlen(recvBuf) + 1); read(fd, recvBuf, sizeof(recvBuf)); printf("recv buf: %s\n", recvBuf);
sleep(1); }
close(fd);
return 0; }
|
epoll
epoll 全称 eventpoll,是 linux 内核实现 IO 多路转接/复用(IO multiplexing)的一个实现。
IO 多路转接的意思是在一个操作里同时监听多个输入输出源,在其中一个或多个输入输出源可用的时候返回,然后对其的进行读写操作。
epoll 是 select 和 poll 的升级版,epoll 改进了工作方式,因此它更加高效。
(1)对于待检测集合 select 和 poll 是基于线性方式处理的;epoll 是基于红黑树来管理待检测集合的。
(2)select 和 poll 每次都会线性扫描整个待检测集合,集合越大速度越慢;epoll 使用的是回调机制,处理效率高,不会随着检测集合的变大而下降。
(3)select 和 poll 工作过程中存在内核/用户空间数据的频繁拷贝问题;在 epoll 中内核和用户区使用的是共享内存(基于mmap内存映射区实现),省去了不必要的内存拷贝。
(4)对 select 和 poll 返回的集合进行判断才能知道哪些文件描述符是就绪的;通过 epoll 可以直接得到已就绪的文件描述符集合,无需再次检测。
(5)使用 epoll 没有最大文件描述符的限制,仅受系统中进程能打开的最大文件数目限制。
当多路复用的文件数量庞大、IO 流量频繁的时候,这种情况下 select() 和 poll() 表现较差,推荐使用 epoll()。
函数原型
epoll 中一共提供是三个 API 函数,分别处理不同的操作,函数原型如下:
1 2 3 4 5 6 7
| #include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
|
select/poll 低效的原因之一是将“添加/维护待检测任务”和“阻塞进程/线程”两个步骤合二为一。
每次调用 select 都需要这两步操作,然而大多数应用场景中,需要监视的 socket 个数相对固定,并不需要每次都修改。
epoll 将这两个操作分开,先用 epoll_ctl() 维护等待队列,再调用 epoll_wait() 阻塞进程。
通过下图的对比明显看出,epoll 的效率得到了提升。

epoll_create() 函数的作用是创建一个红黑树模型的实例,用于管理待检测的文件描述符的集合。
1 2 3 4 5 6 7 8 9
| int epoll_create(int size);
|
epoll_ctl() 函数的作用是管理红黑树实例上的节点,可以进行添加、删除、修改操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
struct epoll_event { uint32_t events; epoll_data_t data; };
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
|
epoll_wait() 函数的作用是检测创建的 epoll 实例中有没有就绪的文件描述符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
|
epoll 的示例程序
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/epoll.h>
int main() { int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd == -1) { perror("socket error"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); saddr.sin_addr.s_addr = htonl(INADDR_ANY);
int optval = 1; setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
int ret = bind(lfd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("bind error"); exit(-1); }
ret = listen(lfd, 100); if (ret == -1) { perror("listen error"); exit(-1); }
int epfd = epoll_create(1); if (epfd == -1) { perror("epoll_create error"); exit(-1); }
struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = lfd; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev); if (ret == -1) { perror("epoll_ctl error"); exit(-1); }
struct epoll_event evs[1024]; int size = sizeof(evs) / sizeof(evs[0]); while (1) { int num = epoll_wait(epfd, evs, size, -1); printf("num = %d\n", num); for (int i = 0; i < num; i ++) { int fd = evs[i].data.fd; if (fd == lfd) { int cfd = accept(fd, NULL, NULL); ev.events = EPOLLIN; ev.data.fd = cfd; epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev); } else { char buf[1024]; int len = read(fd, buf, sizeof(buf)); if (len == -1) { perror("read error"); exit(-1); } else if (len == 0) { printf("client closed...\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); close(fd); break; } else { printf("read client data: %s\n", buf); write(fd, buf, strlen(buf) + 1); } } } }
close(lfd);
return 0; }
|
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); inet_pton(AF_INET, "192.168.3.194", &saddr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("connect"); exit(-1); }
while (1) { char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); write(fd, recvBuf, strlen(recvBuf) + 1); read(fd, recvBuf, sizeof(recvBuf)); printf("recv buf: %s\n", recvBuf);
sleep(1); }
close(fd);
return 0; }
|
epoll 的工作模式
水平模式
水平模式称为 LT(level triggered) 模式,是缺省的工作方式,并且同时支持 block 和 no-block socket。
在这种做法中,内核通知使用者哪些文件描述符已经就绪,之后就可以对这些已就绪的文件描述符进行 IO 操作。
如果不作任何操作,内核还是会继续通知使用者。
特点:
(1)读事件:如果文件描述符对应的读缓冲区还有数据,读事件就会被触发,epoll_wait() 解除阻塞。
当读事件被触发,epoll_wait() 解除阻塞,之后就可以接收数据。
如果接收数据的 buf 很小,不能全部将缓冲区数据读出,那么读事件会继续被触发,直到数据被全部读出;如果接收数据的空间相对较大,读数据的效率也会相对较高(减少了读数据的次数)。
因为读数据是被动的,必须通过读事件才能知道有数据到达,因此对于读事件的检测是必须的。
(2)写事件:如果文件描述符对应的写缓冲区可写,写事件就会被触发,epoll_wait() 解除阻塞。
当写事件被触发,epoll_wait() 解除阻塞,之后就可以将数据写入到写缓冲区。
写事件的触发在写数据之前,被写入到写缓冲区中的数据是由内核自动发送出去的。
如果写缓冲区没有被写满,写事件会一直被触发。
因为写数据是主动的,并且写缓冲区一般情况下都是可写的(缓冲区不满),因此对于写事件的检测不是必须的。
边沿模式
边沿模式称为 ET(edge-triggered) 模式,只支持 no-block socket。
在这种模式下,当文件描述符从未就绪变为就绪时,内核会通过 epoll 通知使用者。然后它会假设使用者知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知(only once)。如果我们对这个文件描述符做 IO 操作,从而导致它再次变成未就绪,当这个未就绪的文件描述符再次变成就绪状态,内核会再次进行通知,并且还是只通知一次。
ET 模式在很大程度上减少了epoll 事件被重复触发的次数,因此效率要比 LT 模式高。
特点:
(1)读事件:当读缓冲区有新的数据进入,读事件被触发一次,没有新数据不会触发该事件。
如果有新数据进入到读缓冲区,读事件被触发,epoll_wait() 解除阻塞。
读事件被触发,可以通过调用 read()/recv() 函数将缓冲区数据读出。
如果数据没有被全部读走,并且没有新数据进入,读事件不会再次触发,只通知一次。
如果数据被全部读走或者只读走一部分,此时有新数据进入,读事件被触发,并且只通知一次。
(2)写事件:当写缓冲区状态可写,写事件只会触发一次。
如果写缓冲区被检测到可写,写事件被触发,epoll_wait() 解除阻塞。
写事件被触发,就可以通过调用 write()/send() 函数将数据写入到写缓冲区。
写缓冲区从不满到被写满,期间写事件只会被触发一次。
写缓冲区从满到不满,状态变为可写,写事件只会被触发一次。
总结:epoll 的边沿模式下 epoll_wait() 检测到文件描述符有新事件才会通知,
如果不是新的事件就不通知,通知的次数比水平模式少,效率比水平模式要高。
ET 模式的设置:
边沿模式不是默认的 epoll 模式,需要额外进行设置。
epoll 管理的红黑树实例中每个节点都是 struct epoll_event 类型,只需要将 EPOLLET 添加到结构体的 events 成员中。
1 2
| struct epoll_event ev; ev.events = EPOLLIN | EPOLLET;
|
设置非阻塞:
对于写事件的触发一般情况下是不需要进行检测的,因为写缓冲区大部分情况下都是有足够的空间可以进行数据的写入。
对于读事件的触发就必须要检测了,因为服务器也不知道客户端什么时候发送数据。
如果使用 epoll 的边沿模式进行读事件的检测,有新数据达到只会通知一次,那么必须保证得到通知后将数据全部从读缓冲区中读出。
应该如何读这些数据呢?
(1)准备一块特别大的内存,用于存储从读缓冲区中读出的数据,但是这种方式有很大的弊端:
内存的大小没有办法界定,太大浪费内存,太小又不够用。
系统能够分配的最大堆内存也是有上限的,栈内存就更小了。
(2)循环接收数据
1 2 3 4 5
| int len = 0; while((len = recv(fd, buf, sizeof(buf), 0)) > 0) { }
|
这样做也是有弊端的,因为套接字操作默认是阻塞的。当读缓冲区数据被读完之后,读操作就阻塞了。
调用的 read()/recv() 函数被阻塞了,当前进程/线程被阻塞之后就无法处理其他操作了。
解决阻塞问题,需要将套接字默认的阻塞行为修改为非阻塞,需要使用 fcntl() 函数进行处理:
1 2 3 4
| int flag = fcntl(cfd, F_GETFL); flag |= O_NONBLOCK; fcntl(cfd, F_SETFL, flag);
|
总结:epoll 在边沿模式下,必须要将套接字设置为非阻塞模式。但是,在非阻塞模式下,循环地将读缓冲区数据读到内存中,当缓冲区数据被读完了,调用的 read()/recv() 函数还会继续从缓冲区中读数据,此时函数调用就失败了,返回 -1,对应的全局变量 errno 值为 EAGAIN 或者 EWOULDBLOCK,错误信息为 Resource temporarily unavailable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| int len = recv(fd, buf, sizeof(buf), 0); if(len == -1) { if(errno == EAGAIN) { printf("recv data end...\n"); } else { perror("recv"); exit(0); } }
|
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <fcntl.h> #include <errno.h>
int main() { int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd == -1) { perror("socket error"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); saddr.sin_addr.s_addr = htonl(INADDR_ANY);
int optval = 1; setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
int ret = bind(lfd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("bind error"); exit(-1); }
ret = listen(lfd, 100); if (ret == -1) { perror("listen error"); exit(-1); }
int epfd = epoll_create(1); if (epfd == -1) { perror("epoll_create error"); exit(-1); }
struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; ev.data.fd = lfd; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev); if (ret == -1) { perror("epoll_ctl error"); exit(-1); }
struct epoll_event evs[1024]; int size = sizeof(evs) / sizeof(evs[0]); while (1) { int num = epoll_wait(epfd, evs, size, -1); printf("num = %d\n", num); for (int i = 0; i < num; i ++) { int fd = evs[i].data.fd; if (fd == lfd) { int cfd = accept(fd, NULL, NULL); int flag = fcntl(cfd, F_GETFL); flag |= O_NONBLOCK; fcntl(cfd, F_SETFL, flag); ev.events = EPOLLIN | EPOLLET; ev.data.fd = cfd; epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev); } else { char buf[5]; while (1) { int len = recv(fd, buf, sizeof(buf), 0); if (len == -1) { if (errno == EAGAIN) { printf("recv data end...\n"); break; } else { perror("recv error"); exit(-1); } } else if (len == 0) { printf("client closed...\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); close(fd); break; } else { printf("recv client data: %s\n", buf); ret = send(fd, buf, len, 0); if (ret == -1) { perror("send error"); exit(-1); } } } } } }
close(lfd);
return 0; }
|
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) { perror("socket"); exit(-1); }
struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(9999); inet_pton(AF_INET, "192.168.3.194", &saddr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*) &saddr, sizeof(saddr)); if (ret == -1) { perror("connect"); exit(-1); }
while (1) { char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); write(fd, recvBuf, strlen(recvBuf) + 1); read(fd, recvBuf, sizeof(recvBuf)); printf("recv buf: %s\n", recvBuf);
sleep(1); }
close(fd);
return 0; }
|
参考资料
https://subingwen.cn/linux/select
https://subingwen.cn/linux/poll
https://subingwen.cn/linux/epoll