进阶TCP服务器
该模块涉及TCP服务器常见点:并发量、IO模型
IO网络模型
阻塞IO (Blocking IO)
阻塞IO的情况下,我们如果需要更多的并发,只能使用多线程,一个IO占用一个线程,资源浪费很大但是在并发量小的情况下性能很强。
非阻塞IO (Non-Blocking IO)
在非阻塞状态下,recv() 接口在被调用后立即返回,返回值代表了不同的含义。如在本例中,
- recv() 返回值大于 0,表示接受数据完毕,返回值即是接受到的字节数;
- recv() 返回 0,表示连接已经正常断开;
- recv() 返回 -1,且 errno 等于 EAGAIN,表示 recv 操作还没执行完成;
- recv() 返回 -1,且 errno 不等于 EAGAIN,表示 recv 操作遇到系统错误 errno。
非阻塞的接口相比于阻塞型接口的显著差异在于,在被调用之后立即返回。使用如下的函数可以将某句柄 fd 设为非阻塞状态:
fcntl( fd, F_SETFL, O_NONBLOCK );
多路复用IO (IO Multiplexing)
多路复用IO,select/poll、epoll。
select/poll
select和poll很相似,在检测IO时间的时候都需要遍历整个FD存储结构,只是select使用数组存储FD,其具有最大值限制,而poll使用链表无最大值限制(与内存大小相关)。
先来分析select的优缺点,这样就知道epoll相比select的优势等。
select 本质上是通过设置或检查存放fd标志位的数据结构进行下一步处理。 这带来缺点: 单个进程可监视的fd数量被限制,即能监听端口的数量有限 单个进程所能打开的最大连接数有FD_SETSIZE宏定义,其大小是32个整数的大小(在32位的机器上,大小就是3232,同理64位机器上FD_SETSIZE为3264),当然我们可以对进行修改,然后重新编译内核,但是性能可能会受到影响,这需要进一步的测试 一般该数和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认1024个,64位默认2048。
当socket较多时,每次select都要通过遍历FD_SETSIZE个socket,不管是否活跃,这会浪费很多CPU时间。如果能给 socket 注册某个回调函数,当他们活跃时,自动完成相关操作,即可避免轮询,这就是epoll与kqueue。
select 调用流程
-
使用copy_from_user从用户空间拷贝fd_set到内核空间
-
注册回调函数__pollwait
-
遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
-
以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
-
__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
-
poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
-
如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
-
把fd_set从内核空间拷贝到用户空间。
-
内核需要将消息传递到用户空间,都需要内核拷贝动作。需要维护一个用来存放大量fd的数据结构,使得用户空间和内核空间在传递该结构时复制开销大。
-
每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
-
同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
-
select支持的文件描述符数量太小了,默认是1024
epoll
- 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)
- 效率提升,不是轮询,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数 即Epoll最大的优点就在于它只关心“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll
- 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。
- epoll通过内核和用户空间共享一块内存来实现的
异步 IO(Asynchronous I/O)
异步IO工作流程可以看图
信号驱动 IO(signal driven I/O, SIGIO)
服务器模型
Reactor
首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将 结果和控制权返回给程序,程序继续处理。Reactor 释义“反应堆”,是一种事件驱动机制。 和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰 相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上, 如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函 数”。
Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要 处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事 先注册的相应 I/O 事件分发到对应的处理器中。
Reactor 模型有三个重要的组件:
- 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
- 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
- 事件处理器:负责处理特定事件的处理函数。
Proactor
Proactor 最大的特点是 使用异步 I/O。所有的 I/O 操作都交由系统提供的异步 I/O 接口去执行。工作线程仅仅负责业务逻辑。在 Proactor 中,用户函数启动一个异步的文件操作。同时将这个操作注册到多路复用器上。多路复用器并不关心文件是否可读或可写而是关心这个异步读操作是否完成。异步操作是操作系统完成,用户程序不需要关心。多路复用器等待直到有完成通知到来。当操作系统完成了读文件操作——将读到的数据复制到了用户先前提供的缓冲区之后,通知多路复用器相关操作已完成。多路复用器再调用相应的处理程序,处理数据。
Proactor 增加了编程的复杂度,但给工作线程带来了更高的效率。Proactor 可以在 系统态将读写优化,利用 I/O 并行能力,提供一个高性能单线程模型。在 windows 上,由于没有 epoll 这样的机制,因此提供了 IOCP 来支持高并发, 由于操作系统做了较好的优化,windows 较常采用 Proactor 的模型利用完成端口来实现服务器。在 linux 上,在2.6 内核出现了 aio 接口,但 aio 实际效果并不理想,它的出现,主要是解决 poll 性能不佳的问题,但实际上经过测试,epoll 的性能高于 poll+aio,并且 aio 不能处理 accept,因此 linux 主要还是以 Reactor 模型为主。在不使用操作系统提供的异步 I/O 接口的情况下,还可以使用 Reactor 来模拟 Proactor, 差别是:使用异步接口可以利用系统提供的读写并行能力,而在模拟的情况下,这需要在用户态实现。具体的做法只需要这样:
- 注册读事件(同时再提供一段缓冲区)
- 事件分离器等待可读事件
- 事件到来,激活分离器,分离器(立即读数据,写缓冲区)调用事件处理器
- 事件处理器处理数据,删除事件(需要再用异步接口注册)
我们知道,Boost.asio 库采用的即为 Proactor 模型。不过 Boost.asio 库在 Linux 平台采用 epoll 实现的 Reactor 来模拟 Proactor,并且另外开了一个线程来完成读写调度。
并发量
通过linux系统的配置、使用多路复用的IO就可以很容易达到百万的并发量这个数量级。
配置:
net.ipv4.tcp_mem = 252144 524288 786432
net.ipv4.tcp_wmem = 2048 2048 4096
net.ipv4.tcp_rmem = 2048 2048 4096
fs.file-max = 1048576
net.nf_conntrack_max = 1048576
net.netfilter.nf_conntrack_tcp_timeout_established = 1200
fs.file-max为fd可以达到的最大值
net.nf_conntrack_max为防火墙允许的最大连接数
net.ipv4.tcp_mem 三道台阶,协议栈占有空间大于这三道台阶时采取不同的优化方案
net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem 为写和收buffer大小阶梯,默认为中间大小,动态变化的最大值为第三个值。
上面的buffer根据传输的文件类型可以采用不同的值。
ulimit -a 可以看文件描述符数量限制,调整到1048576(ulimit -n)
IO:
使用epoll,监听多个端口(100就够)
实现(Reactor-Epoll)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <errno.h>
#include <sys/epoll.h>
#define BUFFER_LENGTH 1024
struct sockitem { //
int sockfd;
int (*callback)(int fd, int events, void *arg);
char recvbuffer[BUFFER_LENGTH]; //
char sendbuffer[BUFFER_LENGTH];
int rlength;
int slength;
};
// mainloop / eventloop --> epoll -->
struct reactor {
int epfd;
struct epoll_event events[512];
};
struct reactor *eventloop = NULL;
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg) {
struct sockitem *si = (struct sockitem*)arg;
send(fd, si->sendbuffer, si->slength, 0); //
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
//ev.data.fd = clientfd;
si->sockfd = fd;
si->callback = recv_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}
// ./epoll 8080
int recv_cb(int fd, int events, void *arg) {
//int clientfd = events[i].data.fd;
struct sockitem *si = (struct sockitem*)arg;
struct epoll_event ev;
//char buffer[1024] = {0};
int ret = recv(fd, si->recvbuffer, BUFFER_LENGTH, 0);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { //
return -1;
} else {
}
ev.events = EPOLLIN;
//ev.data.fd = fd;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(si);
} else if (ret == 0) { //
//
printf("disconnect %d\n", fd);
ev.events = EPOLLIN;
//ev.data.fd = fd;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(si);
} else {
printf("Recv: %s, %d Bytes\n", si->recvbuffer, ret);
si->rlength = ret;
memcpy(si->sendbuffer, si->recvbuffer, si->rlength);
si->slength = si->rlength;
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
//ev.data.fd = clientfd;
si->sockfd = fd;
si->callback = send_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int accept_cb(int fd, int events, void *arg) {
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(fd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0) return -1;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
ntohs(client_addr.sin_port));
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
//ev.data.fd = clientfd;
struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = clientfd;
si->callback = recv_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, clientfd, &ev);
return clientfd;
}
int main(int argc, char *argv[]) {
if (argc < 2) {
return -1;
}
int port = atoi(argv[1]);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
return -2;
}
if (listen(sockfd, 5) < 0) {
return -3;
}
eventloop = (struct reactor*)malloc(sizeof(struct reactor));
// epoll opera
eventloop->epfd = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = sockfd;
si->callback = accept_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);
while (1) {
int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
if (nready < -1) {
break;
}
int i = 0;
for (i = 0;i < nready;i ++) {
if (eventloop->events[i].events & EPOLLIN) {
//printf("sockitem\n");
struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
si->callback(si->sockfd, eventloop->events[i].events, si);
}
if (eventloop->events[i].events & EPOLLOUT) {
struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
si->callback(si->sockfd, eventloop->events[i].events, si);
}
}
}
}
实现(Reactor-Epoll的百万并发)
前面有reactor-epoll自己实现的了,这里直接使用netty的reactor实现就可以。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENTS 1024*1024 //connection
#define MAX_EPOLL_ITEM 102400 //con
#define SERVER_PORT 8888
#define LISTEN_PORT_COUNT 100
typedef int NCALLBACK(int ,int, void*);
struct ntyevent {
int fd;
int events;
void *arg;
int (*callback)(int fd, int events, void *arg);
int status;
char buffer[BUFFER_LENGTH];
int length;
long last_active;
};
struct ntyreactor {
int epfd;
struct ntyevent *events; // 1024 * 1024
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {
ev->fd = fd;
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
ev->last_active = time(NULL);
return ;
}
int nty_event_add(int epfd, int events, struct ntyevent *ev) {
struct epoll_event ep_ev = {0, {0}};
ep_ev.data.ptr = ev;
ep_ev.events = ev->events = events;
int op;
if (ev->status == 1) {
op = EPOLL_CTL_MOD;
} else {
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
return -1;
}
return 0;
}
int nty_event_del(int epfd, struct ntyevent *ev) {
struct epoll_event ep_ev = {0, {0}};
if (ev->status != 1) {
return -1;
}
ep_ev.data.ptr = ev;
ev->status = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
return 0;
}
int recv_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor*)arg;
struct ntyevent *ev = reactor->events+fd;
int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
nty_event_del(reactor->epfd, ev);
if (len > 0) {
ev->length = len;
ev->buffer[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buffer);
nty_event_set(ev, fd, send_cb, reactor);
nty_event_add(reactor->epfd, EPOLLOUT, ev);
} else if (len == 0) {
close(ev->fd);
printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);
} else {
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
return len;
}
int send_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor*)arg;
struct ntyevent *ev = reactor->events+fd;
int len = send(fd, ev->buffer, ev->length, 0);
if (len > 0) {
printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
nty_event_del(reactor->epfd, ev);
nty_event_set(ev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ev);
} else {
close(ev->fd);
nty_event_del(reactor->epfd, ev);
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
return len;
}
int accept_cb(int fd, int events, void *arg) {
struct ntyreactor *reactor = (struct ntyreactor*)arg;
if (reactor == NULL) return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;
if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {
if (errno != EAGAIN && errno != EINTR) {
}
printf("accept: %s\n", strerror(errno));
return -1;
}
int i = 0;
do {
#if 0
for (i = 0;i < MAX_EPOLL_EVENTS;i ++) {
if (reactor->events[i].status == 0) {
break;
}
}
if (i == MAX_EPOLL_EVENTS) {
printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
break;
}
#endif
int flag = 0;
if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
break;
}
nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
} while (0);
printf("new connect [%s:%d][time:%ld], pos[%d]\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);
return 0;
}
int init_sock(short port) {
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (listen(fd, 20) < 0) {
printf("listen failed : %s\n", strerror(errno));
}
printf("listen port : %d\n", port);
return fd;
}
int ntyreactor_init(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
memset(reactor, 0, sizeof(struct ntyreactor));
reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0) {
printf("create epfd in %s err %s\n", __func__, strerror(errno));
return -2;
}
reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (reactor->events == NULL) {
printf("create epfd in %s err %s\n", __func__, strerror(errno));
close(reactor->epfd);
return -3;
}
}
int ntyreactor_destory(struct ntyreactor *reactor) {
close(reactor->epfd);
free(reactor->events);
}
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
if (reactor == NULL) return -1;
if (reactor->events == NULL) return -1;
nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
return 0;
}
int ntyreactor_run(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->epfd < 0) return -1;
if (reactor->events == NULL) return -1;
struct epoll_event events[MAX_EPOLL_ITEM];
int checkpos = 0, i;
while (1) {
#if 0
long now = time(NULL);
for (i = 0;i < 100;i ++, checkpos ++) {
if (checkpos == MAX_EPOLL_EVENTS) {
checkpos = 0;
}
if (reactor->events[checkpos].status != 1) {
continue;
}
long duration = now - reactor->events[checkpos].last_active;
if (duration >= 60) {
close(reactor->events[checkpos].fd);
printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
nty_event_del(reactor->epfd, &reactor->events[checkpos]);
}
}
#endif
int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_ITEM, 1000);
if (nready < 0) {
printf("epoll_wait error, exit\n");
continue;
}
for (i = 0;i < nready;i ++) {
struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;
if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
ev->callback(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}
int main(int argc, char *argv[]) {
unsigned short port = SERVER_PORT;
if (argc == 2) {
port = atoi(argv[1]);
}
struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
ntyreactor_init(reactor);
int listenfd[LISTEN_PORT_COUNT] = {0};
int i = 0;
for (i = 0;i < LISTEN_PORT_COUNT;i ++) {
listenfd[i] = init_sock(port+i);
ntyreactor_addlistener(reactor, listenfd[i], accept_cb);
}
ntyreactor_run(reactor);
ntyreactor_destory(reactor);
for (i = 0;i < LISTEN_PORT_COUNT;i ++) {
close(listenfd[i]);
}
return 0;
}