三组I/O复用对比
概述
- 上述select,poll和epoll都能同时监听多个文件描述符。它们将等待由timeout参数指定的超时时间,直到一个或多个文件描述符上有事件发生时,返回值是就绪的文件描述符的数量。返回0表示没有事件发生。
- 这3个函数都通过某种结构体变量来告诉内核监听哪些文件描述符上的哪些事件,并使用该结构体类型的参数来获取内核处理的结果。
select
- select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select还需要提供3个这种类型的参数来分别传入和输出可读、可写或异常等事件。
- 这一方面使得select不能处理更多类型的事件,另一方面由于内核对fd_set集合的在线修改,应用程序下次调用select前不得不重置这3个fd_set集合。
poll
- poll的参数类型pollfd更“聪明”一些。因为它把文件描述符和事件都定义在结构体当中了,任何事件都被统一处理,从而使得编程接口简洁得多。
- 并且内核每次修改的都是pollfd结构体的revents成员,而events成员保持不变,因此下次调用poll时应用程序无须重置pollfd类型的事件集参数。
- 由于每次select和poll调用都返回整个用户注册的事件集合(其中包括就绪的未就绪的),所以应用程序索引就绪文件描述符的时间复杂度为O(n)。
epoll
- epoll则采用了与select和poll完全不同的方式来管理用户注册的事件。它在内核中维护一个事件表,并提供了一个独立的系统调用epoll_ctl来控制往其中添加、删除、修改事件。
- 这样,每次epoll_wait调用都直接从该内核事件表中取得用户注册的事件,而无须反复从用户空间读入这些事件。
- epoll_wait系统调用的events参数仅用来返回就绪的事件,这使得应用程序索引就绪文件描述符的时间复杂度达到了O(1)。
综合
- poll和epoll_wait分别用nfds和maxevents参数指定最多监听多少个文件描述符和事件。
这两个数值都能达到系统允许打开的最大文件描述符数目,即65535个(cat/proc/sys/filemax
)。
而select允许监听的最大文件描述符数量通常有限制,虽然用户可以修改这个限制,但可能会导致不可预期的后果。 - select和poll都只能工作在效率相对较低的LT模式,而epoll可以工作在效率较高的ET模式。
并且epoll还支持EPOLLONESHOT事件,该事件能进一步减少可读、可写和异常等事件被触发的次数。 - 从实现原理上来讲:
- select和epoll都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并且将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是O(n)。
- epoll_wait则不同,它采用的是回调的方式。内核检测到就绪的文件描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪事件队列。
内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。
因此epoll_wait无须轮询整个文件描述符集合来检测哪些事件已就绪,其算法时间复杂度是O(1)。
- 需要注意的是,当活动连接比较多时,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发得过于频繁。
所以epoll_wait适用于连接数量多,但活动连接较少的情况。
I/O复用应用:非阻塞connect
- connect系统调用的手册中描述了connect出错时的一种errno值:EINPROGRESS。
这种错误发生在对非阻塞的socket调用connect,而连接又没有立即建立时。
根据文件解释,这种情况下,可以调用select、poll等函数来监听这个连接失败的socket上的可写事件。
当select、poll等函数返回后,再利用getsockopt来读取错误码并清除该socket上的错误。
如果错误码是0,表示连接成功建立,否则连接失败。 - 下面的代码存在移植性问题:
- 非阻塞的socket可能导致connect始终失败
- select对于处于EINPROGRESS状态下的socket可能不起作用
- 对于出错的socket,getsockopt在有些系统(比如Linux)上返回-1,而在有些系统(比如原子伯克利的UNIX)上则返回0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdlib.h> #include <assert.h> #include <stdio.h> #include <time.h> #include <errno.h> #include <fcntl.h> #include <sys/ioctl.h> #include <unistd.h> #include <string.h> #define BUFFER_SIZE 1023 int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } // 超时连接函数,参数分别是:服务器IP,端口号,超时时间(毫秒) // 成功时返回已经处于连接状态的socket,失败时返回-1 int unblock_connect(const char* ip, int port, int time) { int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int sockfd = socket(PF_INET, &SOCK_STREAM, 0); int fdopt = setnonblocking(sockfd); ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address)); if (ret == 0) { // 如果连接成功,则恢复socket的属性,并立即返回 printf("connect with server immediately\n"); fcntl(sockfd, F_SETFL, fdopt); return sockfd; } else { printf("unblock connect not support\n"); return -1; } fd_set readfds; fd_set writefds; struct timeval timeout; FD_ZERO(&readfds); FD_ZERO(&writefds); timeout.tv_sec = time; timeout.tv_usec = 0; ret = select(sockfd+1, NULL, &writefds, NULL, &timeout); if (ret < 0) { // select连接超时或出错,立即返回 printf("connection time out\n"); close(sockfd); return -1; } if (!FD_ISSET(sockfd, &writefds)) { printf("no events on sockfd found\n"); close(sockfd); return -1; } int error = 0; socklen_t length = sizeof(error); // 调用getsockopt来获取并清除sockfd上的错误 if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) { printf("get socket option failed\n"); close(sockfd); return -1; } // 错误号不为0表示连接出错 if (error != 0) { printf("connection failed after select with the error: %d\n", error); close(sockfd); return -1; } // 连接成功 printf("connection ready after select with the socket %d\n", sockfd); fcntl(sockfd, F_SETFL, fdopt); return sockfd; } int main(int argc, char* argv[]) { if(argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[0]); int sockfd = unblock_connect(ip, port, 10); if (sockfd < 0) { return 1; } close(sockfd); return 0; } |
I/O复用应用:聊天室
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
#define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <poll.h> #include <fcntl.h> #define BUFFER_SIZE 64 int main(int argc, char* argv[]) { if (argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(arv[2]); struct sockaddr_in server_address; bzero(&server_address, sizeof(server_address)); server_address.sin_family = AF_INET; inet_pton(AF_INET, ip, &server_address.sin_addr); server_address.sin_port = htons(port); int sockfd = socket(PF_INET, SOCK_STREAM, 0); assert(sockfd >= 0); if(connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { printf("connection failed\n"); close(sockfd); return 1; } pollfd fds[2]; // 注册文件描述符0(标准输入)和文件描述符sockfd上的可读事件 fds[0].fd = 0; fds[0].events = POLLIN; fds[0]revents = 0; fds[1].fd = sockfd; fds[1].events = POLLIN | POLLRDHUP; fds[1].revents = 0; char read_buf(BUFFER_SIZE); int pipefd[2]; int ret = pipe(pipefd); assert(ret != -1); while(1) { ret = poll(fds, 2 -1); if (ret < 0) { printf("poll failure\n"); break; } if(fds[1].revents & POLLRDHUP) { printf("server close the connection\n"); break; } else if (fds[1].revents & POLLIN) { memset(read_buf, '\0', BUFFER_SIZE-1, 0); printf("%s\n", read_buf); } if(fds[0].revents & POLLIN) { // 使用splice将用户输入的数据直接写到sockfd上(零拷贝) ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE); ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE); } } close(sockfd); return 0; } |
服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
#define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <poll.h> #include <fcntl.h> #define USER_LIMIT 5 // 最大用户数量 #define BUFFER_SIZE 64 // 读缓冲区大小 #define FD_LIMIT 65535 // 文件描述符数量限制 struct client_data { sockaddr_in address; char* write_buf; char buf[BUFFER_SIZE]; }; int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } int main(int argc, char* argv[]) { if (argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); // 创建一个users数组,分配FD_LIMIT个client_data对象,可以预期: // 每个可能的socket连接都可以获得一个这样的对象,并且socket的值可以直接 // 用来索引(作为数组的下标)socket连接对应的client_data对象, // 这是将socket和客户数据关联的简单而高效的方式 client_data* users = new client_data[FD_LIMIT]; //尽管分配了足够多的client_data对象,但为了提高poll的性能,仍然有必要限制用户数量 pollfd fds[USER_LIMIT + 1]; int user_counter = 0; for(int i = 1; i <= USER_LIMIT; ++i) { fds[i].fd = -1; fds[i].events = 0; } fds[0].fd = listenfd; fds[0].events = POLLIN | POLLERR; fds[0].revents = 0; while(1) { ret = poll(fds, user_counter + 1, -1); if (ret < 0) { printf("poll failure\n"); break; } for(int i = 0; i < user_counter+1; ++i) { if ((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); if (connfd < 0) { printf("errno is : %d\n", errno); continue; } // 如果请求太多,则关闭新的连接 if (user_counter >= USER_LIMIT) { const char* info = "too many users\n"; printf("%s", info); send(connfd, info, strlen(info), 0); close(connfd); continue; } // 对于新的连接,同时修改fds和suers数组, user_counter++; users[connfd].address = client_address; setnonblocking(connfd); fds[user_counter].fd = connfd; fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR; fds[user_counter].revents = 0; printf("comes a new user,now have %d users\n", user_counter); } else if (fds[i].revents & POLLERR) { printf("get an error from %d\n", fds[i].fd); char errors[100]; memset(errors, '\0', 100); socklen_t length = sizeof(errors); if(getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0) { printf("get socket option failed\n"); } continue; } else if (fds[i].revents & POLLRDHUP) { // 如果客户端关闭连接,则服务器也关闭对应连接,并将用户数减少1 users[fds[i].fd] = users[fds[user_counter].fd]; close(fds[i].fd); i--; user_counter--; printf("a client left\n"); } else if (fds[i].revents & POLLIN) { int connfd = fds[i].fd; memset(users[connfd].buf, '\0', BUFFER_SIZE); ret = recv(connfd, users[connfd].buf, BUFFER_SIZE-1, 0); printf("get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd); if(ret < 0) { // 如果读操作出错,关闭连接 if (errno != EAGAIN) { close(connfd); users[fds[i].fd] = users[fds[user_counter].fd]; fds[i] = fds[user_counter]; i--; user_counter--; } } else if (ret == 0) { } else { // 如果接收到客户数据,则通知其他socket连接准备写数据 for (int j = 1; j <= user_counter; ++j) { if(fds[j].fd == connfd) { continue; } fds[j].events |= ~POLLIN; fds[j].events |= POLLOUT; users[fds[j].fd].write_buff = users[connfd].buf; } } } else if(fds[i].revents & POLLOUT) { int connfd = fds[i].fd; if (!users[connfd].write_buf) { continue; } ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0); users[connfd].write_buf = NULL; // 写完整数据后需要重新注册fds[i]上的可读事件 fds[i].events |= ~POLLOUT; fds[i].events |= POLLIN; } } } delete[] users; close(listenfd); return 0; } |
I/O复用应用:同时处理TCP和UDP服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define TCP_BUFFER_SIZE 512 #define UDP_BUFFER_SIZE 1024 int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } int main(int argc, char* argv[]) { if(argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); // 创建TCP socket,并将其绑定到端口上 int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); // 创建UDP socket,并将其绑定到端口上 bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int udpfd = socket(PF_INET, SOCK_DGRAM, 0); assert(udpfd >= 0); ret = bind(udpfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); // 注册TCP socket和UDP socket上的可读事件 addfd(epollfd, listenfd); addfd(epollfd, udpfd); while(1) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if (number < 0) { printf("epoll failure\n"); break; } for(int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_length = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); add(epollfd, connfd); } else if (sockfd == udpfd) { char buf[UDP_BUFFER_SIZE]; memset(buf, '\0', UDP_BUFFER_SIZE); struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE-1, 0, (struct sockaddr*)&client_address, client_addrlength); if (ret > 0) { sendto(udpfd, buf,UDP_BUFFER_SIZE-1, 0, (struct sockaddr*)&client_address, client_addrlength); } } else if (events[i].events & EPOLLIN) { char buf[TCP_BUFFER_SIZE]; memset(buf, '\0', TCP_BUFFER_SIZE); ret = recv(sockfd, buf, TCP_BUFFER_SIZE-1, 0); if (ret < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { break; } close(sockfd); break; } else if (ret == 0) { close(sockfd); } else { send(sockfd, buf, ret, 0); } } else { printf("something else happened\n"); } } } close(listenfd); return 0; } |
超级服务xinet
- Linux因特网inetd是超级服务。它能同时管理着多个子服务,即监听多个端口。
- 现在Linux系统上使用的inetd服务程序是其升级版本xinetd。
xinetd原理与inetd相同,但增加了一些控制选项,并提高了安全性。
配置文件
- xinetd采用
/etc/xinetd.conf
主配置文件和/etc/xinetd.d
目录下的子配置文件来管理所有服务。 - 主配置文件包含的是通用选项,这些选项将被所有子配置文件继承。不过子配置文件可以覆盖这些选项。
每一个子配置文件用于设置一个子服务的参数。 /etc/xinetd.d/telnet
如下图:
xinetd工作流程
- xinetd管理的子服务中有的是标准服务,比如时间日期服务
daytime
,回射服务echo
和丢弃服务discard
。 - xinetd服务器在内部直接处理这些服务。还有的字符为则需要调用外部的服务器程序来处理。
xinetd通过调用fork和exec函数来加载运行这些服务器程序。比如telnet
,ftp
。
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ 51CTO:Linux C++网络编程五08/20
- ♥ Linux 基于文件描述符的文件操作(非缓冲)03/23
- ♥ Linux 高性能服务器编程:HTTP相关11/27
- ♥ Linux_ 命令大全 电子邮件与新闻组03/16
- ♥ Linux_命令大全 压缩备份03/16
- ♥ Linux目录的作用03/16