epoll技术简介

epoll概述

(1)I/O多路复用:epoll就是一种典型的I/O多路复用技术:epoll技术的最大特点是支持高并发;
传统多路复用技术select,poll,在并发量达到1000-2000,性能就会明显下降;
epoll,kqueue(freebsd)
epoll,从linux内核2.6引入的,2.6之前是没有的;
(2)epoll和kquene技术类似:单独一台计算机支撑少则数万,多则数十上百万并发连接的核心技术;
epoll技术完全没有这种性能会随着并发量提高而出现明显下降的问题。但是并发没增加一个,必定要消耗一定的内存去保存这个连接相关的数据;
/并发量总还是有限制的,不可能是无限的;
(3)10万个连接同一时刻,可能只有几十上百个客户端给你发送数据,epoll只处理这几十上百个客户端;
(4)很多服务器程序用多进程,每一个进程对应一个连接;也有用多线程做的,每一个线程对应 一个连接;
epoll事件驱动机制,在单独的进程或者单独的线程里运行,收集/处理事件;没有进程/线程之间切换的消耗,高效
(5)适合高并发,融合epoll技术到项目中,作为大家将来从事服务器开发工作的立身之本;
写小demo非常简单,难度只有1-10,但是要把epoll技术融合到商业的环境中,那么难度就会骤然增加10倍;

学习epoll要达到的效果及一些说明

  1. 理解epoll的工作原理;面试考epoll技术的工作原理;
  2. 开始写代码
  3. 认可nginx epoll部分源码;并且能复用的尽量复用;
  4. 继续贯彻用啥讲啥的原则;少就是多;

epoll原理与函数介绍

概览

  • epoll_create: 创建一个epoll实例,文件描述符
  • epoll_ctl: 将监听的文件描述符添加到epoll实例中,实例代码为将标准输入文件描述符添加到epoll中
  • epoll_wait: 等待epoll事件从epoll实例中发生, 并返回事件以及对应文件描述符l

epoll_create()函数

格式:int epoll_create(int size);

功能:创建一个epoll对象,返回该对象的描述符【文件描述符】,这个描述符就代表这个epoll对象,后续会用到;

这个epoll对象最终要用close(),因为文件描述符/句柄 总是关闭的;

参数说明 :从 Linux 内核 2.6.8 版本起,size 这个参数就被忽略了,只要求 size 大于 0 即可。

原理

  1. struct eventpoll *ep = (struct eventpoll*)calloc(1, sizeof(struct eventpoll));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1·//调用epoll_create()的时候我们会创建这个结构的对象
struct eventpoll {
ep_rb_tree rbr; //ep_rb_tree是个结构,所以rbr是结构变量,这里代表红黑树的根;
int rbcnt;

LIST_HEAD( ,epitem) rdlist; //rdlist是结构变量,这里代表双向链表的根;
/* 这个LIST_HEAD等价于下边这个
struct {
struct epitem *lh_first;
}rdlist;
*/
int rdnum; //双向链表里边的节点数量(也就是有多少个TCP连接来事件了)

int waiting;

pthread_mutex_t mtx; //rbtree update
pthread_spinlock_t lock; //rdlist update

pthread_cond_t cond; //block for event
pthread_mutex_t cdmtx; //mutex for cond

};

image-20220209114400358

  1. rbr结构成员:代表一颗红黑树的根节点[刚开始指向空],把rbr理解成红黑树的根节点的指针;
    红黑树,用来保存 键【数字】/值【结构】,能够快速的通过你给key,把整个的键/值取出来;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//这是个节点相关的结构
//作为红黑树的一个节点
struct epitem {
RB_ENTRY(epitem) rbn;
/* RB_ENTRY相当如定义了如下的一个结构成员变量
struct {
struct type *rbe_left; //指向左子树
struct type *rbe_right; //指向右子树
struct type *rbe_parent; //指向父节点
int rbe_color; //该红黑树节点颜色
} rbn*/

LIST_ENTRY(epitem) rdlink;
/*
struct {
struct type *le_next; //指向下个元素
struct type **le_prev; //前一个元素的地址
}*/

int rdy; //exist in list 是否这个节点是同时在双向链表中【这个节点刚开始是在红黑树中】

int sockfd;
struct epoll_event event;
};

  1. rdlist结构成员:代表 一个双向链表的表头指针;

总结:

  1. 创建了一个eventpoll结构对象,被系统保存起来;
  2. rbr成员被初始化成指向一颗红黑树的根【有了一个红黑树】;
  3. rdlist成员被初始化成指向一个双向链表的根【有了双向链表】;
  4. epitem结构即用于红黑树节点,也用于双向链表节点!!!

epitem结构设计的高明之处:既能够作为红黑树中的节点,又能够作为双向链表中的节点;

image-20220209161410167

epoll_ctl()函数

格式:int epoll_ctl(int efpd,int op,int sockid,struct epoll_event *event);

功能:把一个socket以及这个socket相关的事件添加到这个epoll对象描述符中去,目的就是通过这个epoll对象来监视这个socket【客户端的TCP连接】上数据的来往情况。操作控制 epoll 对象,主要涉及 epoll 红黑树上节点的一些操作,比如添加节点,删除节点,修改节点事件。

参数说明

efpd:epoll_create()返回的epoll对象描述符;

op:动作,添加/删除/修改 ,对应数字是1,2,3, EPOLL_CTL_ADD, EPOLL_CTL_DEL ,EPOLL_CTL_MOD

  1. EPOLL_CTL_ADD添加事件:等于你往红黑树上添加一个节点,每个客户端连入服务器后,服务器都会产生 一个对应的socket,每个连接这个socket值都不重复,所以,这个socket就是红黑树中的key,把这个节点添加到红黑树上去;
  2. EPOLL_CTL_DEL:是从红黑树上把这个节点干掉;这会导致这个socket【这个tcp链接】上无法收到任何系统通知事件;
  3. EPOLL_CTL_MOD:修改事件;你 用了EPOLL_CTL_ADD把节点添加到红黑树上之后,才存在修改;

sockid:表示客户端连接,就是你从accept();这个是红黑树里边的key;

event:事件信息,这里包括的是 一些事件信息;EPOLL_CTL_ADD和EPOLL_CTL_MOD都要用到这个event参数里边的事件信息;

源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
int epoll_ctl(int epid, int op, int sockid, struct epoll_event *event) {

nty_tcp_manager *tcp = nty_get_tcp_manager();
if (!tcp) return -1;

nty_trace_epoll(" epoll_ctl --> 1111111:%d, sockid:%d\n", epid, sockid);
struct _nty_socket *epsocket = tcp->fdtable->sockfds[epid];
//struct _nty_socket *socket = tcp->fdtable->sockfds[sockid];

//nty_trace_epoll(" epoll_ctl --> 1111111:%d, sockid:%d\n", epsocket->id, sockid);
if (epsocket->socktype == NTY_TCP_SOCK_UNUSED) {
errno = -EBADF;
return -1;
}

if (epsocket->socktype != NTY_TCP_SOCK_EPOLL) {
errno = -EINVAL;
return -1;
}

nty_trace_epoll(" epoll_ctl --> eventpoll\n");

struct eventpoll *ep = (struct eventpoll*)epsocket->ep;
if (!ep || (!event && op != EPOLL_CTL_DEL)) {
errno = -EINVAL;
return -1;
}

if (op == EPOLL_CTL_ADD) {
//添加sockfd上关联的事件
pthread_mutex_lock(&ep->mtx);

struct epitem tmp;
tmp.sockfd = sockid;
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); //先在红黑树上找,根据key来找,也就是这个sockid,找的速度会非常快
if (epi) {
//原来有这个节点,不能再次插入
nty_trace_epoll("rbtree is exist\n");
pthread_mutex_unlock(&ep->mtx);
return -1;
}

//只有红黑树上没有该节点【没有用过EPOLL_CTL_ADD的tcp连接才能走到这里】;

//(1)生成了一个epitem对象,大家注意这个结构epitem,这个结构对象,其实就是红黑的一个节点,也就是说,红黑树的每个节点都是 一个epitem对象;
epi = (struct epitem*)calloc(1, sizeof(struct epitem));
if (!epi) {
pthread_mutex_unlock(&ep->mtx);
errno = -ENOMEM;
return -1;
}

//(2)把socket(TCP连接)保存到节点中;
epi->sockfd = sockid; //作为红黑树节点的key,保存在红黑树中

//(3)我们要增加的事件也保存到节点中;
memcpy(&epi->event, event, sizeof(struct epoll_event));

//(4)把这个节点插入到红黑树中去
epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi); //实际上这个时候epi的rbn成员就会发挥作用,如果这个红黑树中有多个节点,那么RB_INSERT就会epi->rbi相应的值:可以参考图来理解
assert(epi == NULL);
ep->rbcnt ++;

pthread_mutex_unlock(&ep->mtx);

} else if (op == EPOLL_CTL_DEL) {
//把红黑树节点从红黑树上删除
pthread_mutex_lock(&ep->mtx);

struct epitem tmp;
tmp.sockfd = sockid;

struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);//先在红黑树上找,根据key来找,也就是这个sockid,找的速度会非常快
if (!epi) {
nty_trace_epoll("rbtree no exist\n");
pthread_mutex_unlock(&ep->mtx);
return -1;
}

//只有在红黑树上找到该节点【用过EPOLL_CTL_ADD的tcp连接才能走到这里】;

//(1)从红黑树上把这个节点干掉
epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi);
if (!epi) {
nty_trace_epoll("rbtree is no exist\n");
pthread_mutex_unlock(&ep->mtx);
return -1;
}

ep->rbcnt --;
free(epi);

pthread_mutex_unlock(&ep->mtx);

} else if (op == EPOLL_CTL_MOD) {
//修改红黑树某个节点的内容
struct epitem tmp;
tmp.sockfd = sockid;
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); //先在红黑树上找,根据key来找,也就是这个sockid,找的速度会非常快
if (epi) {
//(1)红黑树上有该节点,则修改对应的事件
epi->event.events = event->events;
epi->event.events |= EPOLLERR | EPOLLHUP;
} else {
errno = -ENOENT;
return -1;
}

} else {
nty_trace_epoll("op is no exist\n");
assert(0);
}

return 0;
}
关于event参数的讲解

该参数是 struct epoll_event 类型的指针变量,结构体定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef union epoll_data 
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event
{
uint32_t events; /* Epoll 事件 */
epoll_data_t data; /* 用户数据 */
};

events成员

​ 成员 events 代表要监听的 epoll 事件类型,有读事件,写事件,有如下取值。

events参数取值 含义
EPOLLIN 监听 fd 的读事件。举例:如果客户端发送消息过来,代表服务器收到了可读事件。
EPOLLOUT 监听 fd 的写事件。如果 fd 对应的发数据内核缓冲区不为满,只要监听了写事件,就会触发可写事件。
EPOLLRDHUP 监听套接字关闭或半关闭事件,Linux 内核 2.6.17 后可用。
EPOLLPRI 监听紧急数据可读事件。

举例:假设现在我们的服务器通过调用 accept 函数成功与客户端建立连接并得到了通讯套接字 connfd,如果我们需要关心这个客户端是否给服务器发送数据过来(读事件),我们需要将 event 参数的 events 成员的 EPOLLIN 置位为1,相当于 event.events |= EPOLLIN,如果我们并不关心服务器是否可以往客户端写数据(写事件),我们可以将 event 参数的 events 成员的 EPOLLOUT 置位为 0,相当于 event.events &= ~EPOLLOUT。同理,如果你还关心某个事件或者不关心某个事件,就将该位置为 1 或者 0 即可,再通过 epoll_ctl 函数来修改。这样,当我们调用 epoll_wait 函数等待事件时,一旦发生了我们希望监听的事件时,epoll_wait 函数就会返回并通知我们哪个描述符发生了对应的事件。(本文后续会讲到 epoll_wait 函数)

data成员

​ data 成员时一个联合体类型,它可以在我们调用 epoll_ctl 给 fd 添加/修改描述符监听的事件时顺带一些数据。最典型的用法就是每个通讯套接字会对应内存中的一块数据区,这块数据区一般存放着一些连接相关的信息,比如对端的 IP,端口等。当我们要添加该通讯套接字监听事件时就可以把这块内存的地址赋值给 ptr,这样当我们调用 epoll_wait 时也可以取出这些信息。

结构:

a)epi = (struct epitem*)calloc(1, sizeof(struct epitem));

b)epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi); 【EPOLL_CTL_ADD】增加节点到红黑树中

//epitem.rbn ,代表三个指针,分别指向红黑树的左子树,右子树,父亲;

//epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi);【EPOLL_CTL_DEL】,从红黑树中把节点干掉

//EPOLL_CTL_MOD,找到红黑树节点,修改这个节点中的内容;

红黑树的节点是调用epoll_ctl[EPOLL_CTL_ADD]往里增加的节点;面试可能考

红黑树的节点是epoll_ctl[EPOLL_CTL_DEL]删除的;

总结:

EPOLL_CTL_ADD:等价于往红黑树中增加节点

EPOLL_CTL_DEL:等价于从红黑树中删除节点

EPOLL_CTL_MOD:等价于修改已有的红黑树的节点

一百万个并发连接到来,都调用epoll_ctl()函数,一百万个sockid加入红黑树中。

当事件发生,我们如何拿到操作系统的通知;

epoll_wait()函数

函数原型 :int epoll_wait(int epid, struct epoll_event *events, int maxevents, int timeout);

功能说明 :阻塞一段时间并等待事件发生,返回事件集合,也就是获取内核的事件通知。说白了就是遍历双向链表,把双向链表里的节点数据拷贝出来,拷贝完毕后就从双向链表移除。

参数说明

epid:epoll_create 返回的 epoll 对象描述符。

events:存放就绪的事件集合,这个是传出参数。

maxevents:代表可以存放的事件个数,也就是 events 数组的大小。

timeout:阻塞等待的时间长短,以毫秒为单位,如果传入 -1 代表阻塞等待。

返回值说明

返回值 含义
>0 代表有几个我们希望监听的事件发生了
=0 timeout 超时时间到了
<0 出错,可以通过 errno 值获取出错原因
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//到双向链表中去取相关的事件通知
int epoll_wait(int epid, struct epoll_event *events, int maxevents, int timeout) {

nty_tcp_manager *tcp = nty_get_tcp_manager();
if (!tcp) return -1;

//nty_socket_map *epsocket = &tcp->smap[epid];
struct _nty_socket *epsocket = tcp->fdtable->sockfds[epid];
if (epsocket == NULL) return -1;

if (epsocket->socktype == NTY_TCP_SOCK_UNUSED) {
errno = -EBADF;
return -1;
}

if (epsocket->socktype != NTY_TCP_SOCK_EPOLL) {
errno = -EINVAL;
return -1;
}

struct eventpoll *ep = (struct eventpoll*)epsocket->ep;
if (!ep || !events || maxevents <= 0) {
errno = -EINVAL;
return -1;
}

if (pthread_mutex_lock(&ep->cdmtx)) {
if (errno == EDEADLK) {
nty_trace_epoll("epoll lock blocked\n");
}
assert(0);
}

//(1)这个while用来等待一定的时间【在这段时间内,发生事件的TCP连接,相关的节点,会被操作系统扔到双向链表去【当然这个节点同时也在红黑树中呢】】
while (ep->rdnum == 0 && timeout != 0) {

ep->waiting = 1;
if (timeout > 0) {

struct timespec deadline;

clock_gettime(CLOCK_REALTIME, &deadline);
if (timeout >= 1000) {
int sec;
sec = timeout / 1000;
deadline.tv_sec += sec;
timeout -= sec * 1000;
}

deadline.tv_nsec += timeout * 1000000;

if (deadline.tv_nsec >= 1000000000) {
deadline.tv_sec++;
deadline.tv_nsec -= 1000000000;
}

int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline);
if (ret && ret != ETIMEDOUT) {
nty_trace_epoll("pthread_cond_timewait\n");

pthread_mutex_unlock(&ep->cdmtx);

return -1;
}
timeout = 0;
} else if (timeout < 0) {

int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx);
if (ret) {
nty_trace_epoll("pthread_cond_wait\n");
pthread_mutex_unlock(&ep->cdmtx);

return -1;
}
}
ep->waiting = 0;

}

pthread_mutex_unlock(&ep->cdmtx);

//等一小段时间,等时间到达后,流程来到这里。。。。。。。。。。。。。。

pthread_spin_lock(&ep->lock);

int cnt = 0;

//(1)取得事件的数量
//ep->rdnum:代表双向链表里边的节点数量(也就是有多少个TCP连接来事件了)
//maxevents:此次调用最多可以收集到maxevents个已经就绪【已经准备好】的读写事件
int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum); //哪个数量少,就取得少的数字作为要取的事件数量
int i = 0;

while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET

//(2)每次都从双向链表头取得 一个一个的节点
struct epitem *epi = LIST_FIRST(&ep->rdlist);

//(3)把这个节点从双向链表中删除【但这并不影响这个节点依旧在红黑树中】
LIST_REMOVE(epi, rdlink);

//(4)这是个标记,标记这个节点【这个节点本身是已经在红黑树中】已经不在双向链表中;
epi->rdy = 0; //当这个节点被操作系统 加入到 双向链表中时,这个标记会设置为1。

//(5)把事件标记信息拷贝出来;拷贝到提供的events参数中
memcpy(&events[i++], &epi->event, sizeof(struct epoll_event));

num --;
cnt ++; //拷贝 出来的 双向链表 中节点数目累加
ep->rdnum --; //双向链表里边的节点数量减1
}

pthread_spin_unlock(&ep->lock);

//(5)返回 实际 发生事件的 tcp连接的数目;
return cnt;
}

image-20220209162601731

epoll_wait 做的事情不复杂,当它被调用时它观察 eventpoll->rdllist 链表里有没有数据即可。有数据就返回,没有数据就创建一个等待队列项,将其添加到 eventpoll (eventpoll本身也占用一个文件描述符,拥有等待队列)的等待队列上,然后把自己(阻塞掉当前进程)阻塞掉就完事。

内核向双向链表增加节点

一般有四种情况,会使操作系统把节点插入到双向链表中;

  1. 客户端完成三路握手;服务器要accept();
  2. 当客户端关闭连接,服务器也要调用close()关闭;
  3. 客户端发送数据来的;服务器要调用read(),recv()函数来收数据;
  4. 当可以发送数据时;服务器可以调用send(),write();
  5. 其他情况;写实战代码再说;

总结

epoll使用RB-Tree红黑树去监听并维护所有文件描述符,RB-Tree的根节点

调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个 红黑树 用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件.

epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已.

那么,这个准备就绪list链表是怎么维护的呢?

当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。

epoll相比于select并不是在所有情况下都要高效,例如在如果有少于1024个文件描述符监听,且大多数socket都是出于活跃繁忙的状态,这种情况下,select要比epoll更为高效,因为epoll会有更多次的系统调用,用户态和内核态会有更加频繁的切换。

epoll监听全过程

epoll_ctl()将当前socket

为了简单,我们只考虑使用 EPOLL_CTL_ADD 添加 socket,先忽略删除和更新。

假设我们现在和客户端们的多个连接的 socket 都创建好了,也创建好了 epoll 内核对象。在使用 epoll_ctl 注册每一个 socket 的时候,内核会做如下三件事情

1.epollctl分配一个红黑树节点对象 epitem

图片

fd指向监测的文件描述符,ep指向struct epoll

2.添加等待事件到 监听socket 的等待队列中,其回调函数是 ep_poll_callback

在创建 epitem 并初始化之后,ep_insert 中第二件事情就是设置 socket 对象上的等待任务队列。并把函数 fs/eventpoll.c 文件下的 ep_poll_callback 设置为数据就绪时候的回调函数。

图片

3.将 epitem 插入到 epoll 对象的红黑树里

图片

监听的socket收到了数据

在前面 epoll_ctl 执行的时候,内核为每一个 socket 上都添加了一个等待队列项。在 epoll_wait 运行完的时候,又在 event poll 对象上添加了等待队列元素。在讨论数据开始接收之前,我们把这些队列项的内容再稍微总结一下。

图片

  • 在 socket 的等待队列项中,其回调函数是 ep_poll_callback。另外其 private 没有用了,指向的是空指针 null。
  • 在 eventpoll 的等待队列项中,回调函数是 default_wake_function。其 private 指向的是等待该事件的用户进程。

ep_poll_callback

当 socket 上数据就绪时候,内核将以 sock_def_readable 这个函数为入口,找到 epoll_ctl 添加 socket 时在其上设置的回调函数 ep_poll_callback。

  1. 首先它做的第一件事就是把自己的 epitem 添加到 epoll 的就绪队列中
  2. 接着它又会查看 eventpoll 对象上的等待队列里是否有等待项(epoll_wait 执行的时候会设置)。
  3. 如果没执行软中断的事情就做完了。如果有等待项,那就查找到等待项里设置的回调函数default_wake_function。

default_wake_function

在default_wake_function 中找到等待队列项里的进程描述符,然后唤醒之。

将epoll_wait进程推入可运行队列,等待内核重新调度进程。然后epoll_wait对应的这个进程重新运行后,就从 schedule 恢复

当进程醒来后,继续从 epoll_wait 时暂停的代码继续执行。把 rdlist 中就绪的事件返回给用户进程

总结下,epoll 相关的函数里内核运行环境分两部分:

  • 用户进程内核态。进行调用 epoll_wait 等函数时会将进程陷入内核态来执行。这部分代码负责查看接收队列,以及负责把当前进程阻塞掉,让出 CPU。
  • 硬软中断上下文。在这些组件中,将包从网卡接收过来进行处理,然后放到 socket 的接收队列。对于 epoll 来说,再找到 socket 关联的 epitem,并把它添加到 epoll 对象的就绪链表中。这个时候再捎带检查一下 epoll 上是否有被阻塞的进程,如果有唤醒之。

recv()

参数说明

  1. 第一个参数指定接收端套接字描述符;
  2. 第二个参数指明一个缓冲区,该缓冲区用来存放recv函数接收到的数据;
  3. 第三个参数指明buf的长度;
  4. 第四个参数一般置0。

同步Socket的recv函数的执行流程

当应用程序调用recv函数时:

  1. recv先等待 SOCKET s 的发送缓冲中的数据被协议传送完毕,如果协议在传送s的发送缓冲中的数据时出现网络错误,那么recv函数返回SOCKET_ERROR
  2. 如果s的发送缓冲区中没有数据或者数据被协议成功发送完毕后,recv先检查套接字s的接收缓冲区
  3. 如果s的接收缓冲区中没有数据或者协议正在接收数据,那么recv就一直等待,直到协议把数据接收完毕;
  4. 当协议把数据接收完毕,recv函数就把s的接收缓冲区中的数据copy到buf中。(注意协议接收到的数据可能大于buf的长度,所以 在这种情况下要调用几次recv函数才能把s的接收缓冲中的数据copy完。recv函数仅仅是copy数据,真正的接收数据是协议来完成的), recv函数返回其实际copy的字节数。如果recv在copy时出错,那么它返回SOCKET_ERROR
  5. 如果recv函数在等待协议接收数据时网络中断了,那么它返回0。

默认 socket 是阻塞的,阻塞与非阻塞recv返回值没有区分,都是 <0 出错 =0 连接关闭 >0 接收到数据大小。

返回值

  1. 成功执行时,返回接收到的字节数。
  2. 另一端已关闭则返回0。
  3. 失败返回-1,errno被设为以下的某个值 :
  • EAGAIN:套接字已标记为非阻塞,而接收操作被阻塞或者接收超时
  • EBADF:sock不是有效的描述词
  • ECONNREFUSE:远程主机阻绝网络连接
  • EFAULT:内存空间访问出错
  • EINTR:操作被信号中断
  • EINVAL:参数无效
  • ENOMEM:内存不足
  • ENOTCONN:与面向连接关联的套接字尚未被连接上
  • ENOTSOCK:sock索引的不是套接字 当返回值是0时,为正常关闭连接;

特别

返回值<0时并且(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)的情况下认为连接是正常的,继续接收。

EAGAIN、EWOULDBLOCK、EINTR与非阻塞

  • EWOULDBLOCK:用于非阻塞模式,不需要重新读或者写
  • EINTR:指操作被中断唤醒,需要重新读/写

如果出现EINTR即errno为4,错误描述Interrupted system call,操作应该继续。

  • EAGAIN:Linux - 非阻塞socket编程处理EAGAIN错误

在linux进行非阻塞的socket接收数据时经常出现Resource temporarily unavailable,errno代码为11(EAGAIN)。从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。这个错误不会破坏socket的同步,不用管它,下次循环接着recv就可以。

对非阻塞socket而言,EAGAIN不是一种错误。在VxWorks和Windows上,EAGAIN的名字叫做EWOULDBLOCK。

    例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返 回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。

    又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。

ET,LT模式深入分析及测试

边缘触发ET

image-20220211141810994

如图所示,在ngx_event_accept()函数中我们设置了边缘触发模式ET,边缘触发/告诉模式,这个事件通知只会出现一次;
然后我们试着运行nginx程序,利用telnet向服务器发送数据,每次只会通知一次,只执行一次处理函数,处理函数如下所示,每次只接收两个字节。

1
2
3
4
5
6
7
8
void CSocekt::ngx_wait_request_handler(lpngx_connection_t c)
{
unsigned char buf[10]={0};
memset(buf,0,sizeof(buf));
int n = recv(c->fd,buf,2,0); //每次只收两个字节
ngx_log_stderr(0,"OK,收到的字节数为%d,内容为%s",n,buf);
return;
}

image-20220211141244654

image-20220211141301745

运行结果如图所示,客户端总共发了4次消息,而服务器只接收了仅仅8个字节,剩下字节都在接收缓冲区当中

而如果我们修改一下处理函数,变为一个while循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void CSocekt::ngx_wait_request_handler(lpngx_connection_t c)
{
unsigned char buf[10]={0};
memset(buf,0,sizeof(buf));
do{
int n = recv(c->fd,buf,2,0); //每次只收两个字节

if(n == -1 && errno==EAGAIN)
break;
else if(n == 0)
break;

ngx_log_stderr(0,"OK,收到的字节数为%d,内容为%s",n,buf);
}while(1);

return;
}

image-20220211144108304

水平触发LT

image-20220211144225304

处理函数仍然是之前的,没有while循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
void CSocekt::ngx_wait_request_handler(lpngx_connection_t c)
{
unsigned char buf[10]={0};
memset(buf,0,sizeof(buf));
int n = recv(c->fd,buf,2,0); //每次只收两个字节
if(n == 0){
ngx_free_connection(c);
close(c->fd);
c->fd = -1;
}
ngx_log_stderr(0,"OK,收到的字节数为%d,内容为%s",n,buf);
return;
}

image-20220211144642612

运行结果如图,水平触发能够接收到所有字节。

快速处理

如果是 lt 模式,epi 节点刚开始在内核被删除,然后数据从内核空间拷贝到用户空间后,内核马上将这个被删除的节点重新追加回就绪队列,这个速度很快,所以后面来的新的就绪事件很大几率会排在已经处理过的事件后面。
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

而 et 模式呢,数据从内核拷贝到用户空间后,内核不会重新将就绪事件节点添加回就绪队列,当事件在用户空间处理完后,用户空间根据需要重新将这个事件通过 epoll_ctl 添加回就绪队列(又或者这个节点因为有新的数据到来,重新触发了就绪事件而被添加)。从节点被删除到重新添加,这中间的过程是比较“漫长”的,所以新来的其它事件节点能排在旧的节点前面,能快速处理。
ET (edge-triggered) 是高速工作方式,只支持no-block socket。 在这种模式下,当描述符从未就绪变为就绪时,内核就通过epoll告诉你,然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的 就绪通知,直到你做了某些操作而导致那个文件描述符不再是就绪状态(比如 你在发送,接收或是接受请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核就不会发送更多的通知(only once)。不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

惊群

当多个进程共享同一个 “epoll fd” 时,多个进程同时在等待资源,也就是多个进程通过 epoll_wait 将自己当前进程的等待事件挂在内核 epoll 实例 eventpoll.wq 等待队列上,换句话说,eventpoll.wq 等待队列上挂着多个进程的等待事件,当某个事件触发时,等待队列上的进程会被唤醒。

如果是 lt 模式,epoll 在下一个 epoll_wait 执行前,fd 事件节点仍然会存在就绪队列中,不管事件是否处理完成,那么唤醒进程 A 处理事件时,如果 B 进程也在等待资源,那么同样的事件有可能将 B 进程也唤醒处理,然后 B 又是同样的逻辑唤醒 C —— 连环唤醒问题,这种情况可能是用户不愿意看到的。

通过阅读 ep_send_events_proc 源码,最大区别就是,事件通知。

当用户关注的 fd 事件发生时,et 模式,只通知用户一次,不管这个事件是否已经被用户处理完毕,直到该事件再次发生,或者用户通过 epoll_ctl 重新关注该 fd 对应的事件;而 lt 模式,会不停地通知用户,直到用户把事件处理完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) {
...
// 遍历处理 txlist(原 ep->rdllist 数据)就绪队列结点,获取事件拷贝到用户空间。
list_for_each_entry_safe (epi, tmp, head, rdllink) {
if (esed->res >= esed->maxevents)
break;
...
/* 先从就绪队列中删除 epi。*/
list_del_init(&epi->rdllink);

/* 获取 epi 对应 fd 的就绪事件。 */
revents = ep_item_poll(epi, &pt, 1);
if (!revents)
/* 如果没有就绪事件,说明就绪事件已经处理完了,就返回。(这时候,epi 已经从就绪队列中删除了。) */
continue;

...
else if (!(epi->event.events & EPOLLET)) {
/* lt 模式,前面删除掉了的就绪事件节点,重新追加到就绪队列尾部。*/
list_add_tail(&epi->rdllink, &ep->rdllist);
...
}
}

return 0;
}

结论

LT:水平触发/低速模式,这个事件没处理完,就会被 一直触发;

ET:边缘触发/告诉模式,这个事件通知只会出现一次;

普遍认为ET比LT效率高一些,但是 ET编程难度比LT大一些;

ET模式下,如果没有数据可接收,则recv会返回-1

思考:
为什么ET模式事件只触发一次[事件被扔到双向链表中一次,被epoll_wait取出后就干掉]
LT模式事件会触发多次呢?[事件如果没有处理完【比如缓冲区的数据没有读完】,那么事件会被多次往双向链表中扔]

如何选择ET,还是LT
如果收发数据包有固定格式【后续会讲】,那么建议采取LT:编程简单,清晰,写好了效率不见得低;本项目中采用LT这种方法【固定格式的数据收发方式来写我们的项目】
如果收发数据包没有固定格式,可以考虑采用ET模式;