tags:
- OS
异步——Asynchronous这个单词由否定前缀 "A-" ,表示一起、同的前缀 "SYN-" 和表示时间的希腊词根 "CHRON" 构成。字面意思就是不同时/不同步的意思,也就是我们所说的异步。当然,你也可以理解为随机。
由于I/O完成的时间是随机的,我们总会担心I/O操作完成后的中断被错过,我们可能就会想到阻塞/自旋等待I/O操作的完成,这种等待I/O操作完成的操作就被称为同步I/O。但这段阻塞/自旋的时间就会被白白浪费,这不是我们想要的。
而通过异步I/O,CPU就可以在I/O操作完成前继续执行其他任务,不需要阻塞等待整个I/O操作完成。其实就相当于等公交,你并不会在等公交时傻傻地盯着远处一直看,而是在等待的过程中做自己的事情,当公交车到了时,司机会提醒你上车,这时才会停下手上的活去挤公交。
学过线程,我们可以用多个线程来处理多个I/O操作。如果线程被I/O阻塞了我们就重新创建一个线程。这确实是可行的,但是额外的线程就意为着多一份资源消耗和复杂性,滥用多线程可能并不是一个好的选择。况且有的语言并不支持多线程,如JavaScript,只有一个线程,我们别无选择。
在大多数情况下,我们都希望使用一个线程来处理数个 I/O 请求。在学习真正的异步 I/O 之前,我们来学习一下同步 I/O 复用。还是公交车的例子,同步 I/O 复用就相当于是等公交车的时候每隔一段时间抬头看一看车到没到。但抬头的这段时间就会影响到玩手机的时间。
select 是一个有些过时的同步 I/O 复用 API,在大多数系统上都广泛支持。
不是所有的I/O操作都能够异步完成,在文件系统的章节中,我们会学到read()
系统调用。read()
就是一个阻塞的 I/O 操作,意味着当程序调用 read()
时,它会等待数据被读取到内存buffer后才继续执行后续的代码,当遇到阻塞的I/O操作,你只能同步等待I/O操作完成。
虽然我们还学到了许多 flags ,比如 O_NONBLOCK
的文件flag选项,也就是说用非阻塞的方式打开文件。但当设置文件的操作是O_NONBLOCK
后,read()
可能仍然会阻塞当前的线程。为什么?
#include <fcntl.h> // For open(), O_RDONLY, O_WRONLY, O_RDWR, and file access modes
#include <unistd.h>// For close(), read(), write(), fsync(), lseek(), and others.
// Non-blocking file read. It still blocks, not what we expected.
int fd = open("example.txt", O_RDONLY | O_NONBLOCK);
int bytes_read = read(fd, buffer, numbytes);
// Do something here.
close(fd);
我们所想的是使用read()
时,无论数据是否被放到缓冲区里,read()
系统调用都应当立即返回一个返回值。但其实只有当使用 O_NONBLOCK
标志打开文件且没有数据可读,read()
调用才会立即返回。然而,对于磁盘文件,数据通常是立即可用的,因为内核会通过页缓存把数据从磁盘缓存到内存中,这个过程仍然会阻塞。
不同于读文件的确定性,对于 socket 或者 pipe 这类特殊文件,系统也不能确定 socket 什么时候能有数据,数据的大小是多少,这时 O_NONBLOCK
通常是可行的。
既然对文件的读取不会再阻塞了,我们是否可以每隔一段时间就轮询一遍,这样就可以实现在一个线程里面同步复用多个 I/O 了。这确实是可行的。实际上,select 使用的就是类似的机制。
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(sockfd, F_SETFL, O_NONBLOCK);
当设置非阻塞套接字后,accept()
, recv()
和 recvfrom()
都将是非阻塞的。这些系统调用将返回 -1
并设置 errno
为 EAGAIN
或 EWOULDBLOCK
。
那设置 non-blocking 的 sockets 有什么用处?我们的服务器总是要服务许多的客户端,假设每个客户端对应一个 socket 连接,那么服务器就需要响应监听这么多的 socket 连接。而这些网络相关的系统调用往往是阻塞的,我们并不想使用多线程来监听这些连接。
通过设置非阻塞 socket,我们就不用被 socket 相关的系统调用所阻塞。进而,我们可以在主线程中轮询监听这些 sockets,也就是紧轮询。只在检测到客户端的连接时创建线程来响应服务客户端,免去浪费线程资源来处理无意义的 IO 阻塞。
Tight polling(轮询)我们在IO 阶段中了解过。我们了解了 tight polling 会不停的循环问询浪费CPU资源,尽管简单,但是无法避免地 CPU 做无用功。所以我们并不想这么做。
内核为我们提供了一种更好的办法:select。select 使得我们可以设置一个监视器来监视一组的套接字,并告诉我们组中每个套接字的状态。socket状态可以是待读、待写和发生异常情况三种状态。select()
会根据socket的状态将sockets放到可读、可写和一些异常的三种集合中进行管理。
下面是select()
系统调用的函数原型:
#include <sys/select.h> // For select() and related macros
#include <sys/time.h> // For struct timeval
#include <sys/types.h> // For data types used in some system calls
#include <unistd.h> // For close(), read(), and other unix system calls
int select( int nfds,
fd_set *_Nullable restrict readfds,
fd_set *_Nullable restrict writefds,
fd_set *_Nullable restrict exceptfds,
struct timeval *_Nullable restrict timeout);
/* Parameters:
1. nfds: The highest-numbered file descriptor in any of the three sets, plus 1.
2. readfds: Pointer to an fd_set that will be checked for readability.
3. writefds: Pointer to an fd_set that will be checked for writability.
4. exceptfds: Pointer to an fd_set that will be checked for exceptional conditions.
5. timeout: Pointer to a struct timeval that specifies the maximum interval to wait for any file descriptor to become ready. If NULL, select() will block indefinitely.
Return value:
- On success, returns the number of file descriptors contained in the three returned descriptor sets (that is, the total number of bits that are set in readfds, writefds, and exceptfds).
- On error, returns -1 and sets errno to indicate the error.
*/
nfds
我们来看第一个参数,nfds
是要监视文件描述符集合中最大文件描述符的值加 1。它用于告诉 select()
要检查的文件描述符的范围。具体来说,select()
会监控文件描述符从 0 到 nfds
- 1 的文件描述符,等待一个或多个文件描述符变为"ready"状态。
在许多老系统中,将进程打开文件表表项的最大数量设置为 1024 ,也就是每个进程最多可以打开的文件数。因此,select()
的监视集合 fd_set
能够最多监视 1024 个文件描述符。fd_set
是用固定大小的位图 bit field (bit array) 来实现的,每个文件描述符对应一个 bit。
通过设置 nfds
,在 select()
遍历时,内核遍历到最大的监视项后就会立即停止遍历。现在很多系统中,一个进程可能需要打开的文件数量可能要比 1024 个大得多。而 select()
最多也只能监视 1024 个文件描述符,这也是 select()
的缺点。在select Linux manual中,也推荐使用poll或epoll来获取更大的文件句柄监视范围。
我们用fd_set
结构来定义监视类型的结构:
#include <sys/select.h>
fd_set readfds;
fd_set writefds;
fd_set exceptfds;
fd_set
用于表示一个文件描述符的集合。在 POSIX 下,fd_set
最多能够容纳的文件描述符的数量在 FD_SETSIZE
宏中定义,我们前面提到了,这个数字在很多系统中都是 1024,可能不会改变。所以通常而言,fd_set
数据结构都是一个大小为 1024 字节的位图。
select()
提供了一些设置fd_set
的函数,如下:
#include <sys/select.h> // For select() and related macros
void FD_ZERO(fd_set *set);
/* Parameters:
1. set: Pointer to an fd_set structure that will be initialized to have zero bits set, meaning no file descriptors are part of the set.
*/
FD_ZERO(fd_set *set)
用于初始化或清除一个fd_set
结构,将其所有位清零,表示集合中没有任何文件描述符。由于 select
要循环地检查这些文件描述符,当要复用文件描述符时,你要重新进行初始化这些集合。
void FD_SET(int fd, fd_set *set);
/* Parameters:
1. fd: The file descriptor to be added to the set.
2. set: Pointer to an fd_set structure where the file descriptor will be added.
*/
FD_SET(int fd, fd_set *set)
用于将一个文件描述符添加到 fd_set
结构中。它会设置相应的位,表示该文件描述符现在是集合的一部分。
void FD_CLR(int fd, fd_set *set);
/* Parameters:
1. fd: The file descriptor to be removed from the set.
2. set: Pointer to an fd_set structure from which the file descriptor will be removed.
*/
FD_CLR(int fd, fd_set *set)
用于从 fd_set
结构中移除一个文件描述符。它会清除相应的 bit 位,表示该文件描述符不再是集合的一部分。
int FD_ISSET(int fd, fd_set *set);
/* Parameters:
1. fd: The file descriptor to be checked.
2. set: Pointer to an fd_set structure that will be checked to see if the file descriptor is part of the set.
Return value:
- Returns a non-zero value if the file descriptor is part of the set, otherwise returns 0.
*/
FD_ISSET(int fd, fd_set *set)
用于检查一个文件描述符是否在 fd_set
结构中。如果该文件描述符在集合中,则返回非零值,否则返回 0。
select
会将文件描述符归为三类,分别是可读(readfds
)、可写(writefds
)和异常(excaptfds
)。这三类参数是可选的,当你不需要某一类时,你可以将那个参数设置为NULL
。这三个参数也是select 最重要的参数。
最后一个参数是timeval
,这是一个结构体,用于指定 select()
等待文件描述符变为就绪的最大时间间隔。在这个等待时间内,select会阻塞直到(1)一个文件描述符准备好了;(2)被信号所中断;(3)时间到了。
timeval
结构体的定义如下:
#include <sys/time.h>
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
当你将两个字段都设置为0,select()
就会立刻返回。要是sockets中有数据可读,select()
就会告诉你,不然就会告诉你监视的fds都没有准备好呢。如果timeval
为 NULL
,select()
将无限期地阻塞,直到至少有一个文件描述符变为就绪。
再用等公交车的例子说明一下,设置 timeval
的作用就是控制你抬头看公交车到没到的时间。在抬头的这段时间里,你实际上是被硬控阻塞的。
无论因为何种原因导致select()
的返回,除了 nfds
之外的一些参数可能会被更新。
当我们调用 select()
时,传入的文件描述符集合(readfds
、writefds
和 exceptfds
)会被内核修改,以反映哪些文件描述符在调用期间变为就绪状态。这是因为 select()
需要告诉你哪些文件描述符可以进行I/O操作,而不会阻塞。
例如,你传入一个包含多个文件描述符的 readfds
集合,select()
会在返回时修改这个集合,只保留那些在调用期间变为可读的文件描述符。这样,你可以通过检查返回的集合来确定哪些文件描述符可以进行读取操作。
如果在调用 select()
时传入了 timeval
结构体,内核可能(不)会修改这个结构体,以反映在调用期间实际经过的时间。如果你设置了一个 5 秒的超时时间,但 select()
在 2 秒后返回,有的操作系统内核实现可能会将 timeval
结构体中的剩余时间更新为 3 秒;而有些系统则会保留原先 5 秒的超时。因此,重新使用 timeval
结构体是不安全的,应该在每次调用 select()
前重新设置。
当select()
返回时,它会将哪些可以直接进行的I/O操作保留在fd_sets里。要知道有哪些 I/O 操作,我们就需要迭代遍历检查我们之前添加过的所有文件描述符。这时,FD_ISSET
宏的作用就显现出来了。
因为 select()
调用会修改传入的参数,以反映哪些文件描述符已经准备好进行 I/O 操作,以及实际经过的时间。所以在处理完成一轮的 select()
调用之后,我们需要重新设置 fd_sets
、timeval
甚至 nfds
。
void listen_for_connections(int client_sock1, int client_sock2, int client_sock3);
{
int nfds = 1 + (client_sock1 > client_sock2
?
(client_sock1 > client_sock3 ? client_sock1 : client_sock3)
:
(client_sock2 > client_sock3 ? client_sock2 : client_sock3));
fd_set s;
struct timeval tv;
ptintf("Startup complete!\n");
while(!quit){
FD_ZERO(&s);
FD_SET(service1_sock, &s);
FD_SET(service2_sock, &s);
FD_SET(service3_sock, &s);
tv.tv_sec = 30;
tv.tv_usec = 0;
int res = select(nfds, &s, NULL, NULL, &tv);
// An Error occurred.
if(res == -1){
printf("An error occurred in selest(): %s.\n", strerror(errno));
quit = 1;
}
else if(res == 0){
printf("Still waiting events...\n");
}
else{
if(FD_ISSET(service1_sock, &s)){
service1_activate();
}
if(FD_ISSET(service2_sock, &s)){
service2_activate();
}
if(FD_ISSET(service3_sock, &s)){
service3_activate();
}
}
}
}
从上面的代码中,你实际上可以感受到 select
有多么低效。比如我们的进程打开了 560 个文件,但是只需要监视 550、540、539 号文件,这时,你传入的 nfds
是 551 (0号文件到550号文件共551个文件) 。猜猜 select
会帮你做什么?每次调用 select()
时,它都会遍历进程打开文件表中从 0
到 nfds-1
的文件描述符。即便其他 548 个描述符完全无关。
而且,如果文件打开超过 1024 个文件,那就完蛋了。
除了select()
,我们还有一个pselect()
函数。pselect()
的最后两个参数与select()
有所不同。用于定义时间间隔的结构体timespec
是一个const
类型的参数,pselect()
保证不对这个结构体的任何改变。另一个大的改变就是sigmask
参数,用于定义哪些信号在等待期间被屏蔽。
pselect()
的函数原型如下:
#include <sys/select.h> // For select() and related macros
#include <sys/time.h> // For struct timeval
#include <sys/types.h> // For data types used in some system calls
#include <unistd.h> // For close(), read(), write(), and other system calls
#include <signal.h> // For sigset_t and related functions
int pselect(int nfds,
fd_set *_Nullable restrict readfds,
fd_set *_Nullable restrict writefds,
fd_set *_Nullable restrict exceptfds,
const struct timespec *_Nullable restrict timeout,
const sigset_t *_Nullable restrict sigmask);
/* Parameters:
1. nfds: The highest-numbered file descriptor in any of the three sets, plus 1.
2. readfds: Pointer to an fd_set that will be checked for readability.
3. writefds: Pointer to an fd_set that will be checked for writability.
4. exceptfds: Pointer to an fd_set that will be checked for exceptional conditions.
5. timeout: Pointer to a struct timespec that specifies the maximum interval to wait for any file descriptor to become ready. If NULL, pselect() will block indefinitely.
6. sigmask: Pointer to a sigset_t that specifies the signal mask to be used during the wait. If NULL, the signal mask is not changed.
Return value:
- On success, returns the number of file descriptors contained in the three returned descriptor sets (that is, the total number of bits that are set in readfds, writefds, and exceptfds).
- On error, returns -1 and sets errno to indicate the error.
*/
pselect()
中的timespec
结构体容许我们设置更精细的时间间隔,它的结构体原型如下:
#include <sys/time.h>
struct timespec{
long tv_sec; /* seconds */
long tv_nsec; /* nanoseconds */
};
允许你在调用pselect()
的同时原子化地修改signal mask。在这个并发的世界中,我们对原子化的爱是毋庸置疑的。下面的一条pselect()
语句:
ready = pselect(nfds, &readfds, &writefds, &exceptfds, timeout, &sigmask);
和原子化的
sigset_t origmask;
pthread_sigmask(SIG_SETMASK, &sigmask, &origmask); // Mask the signals in sigmask.
ready = select(nfds, &readfds, &writefds, &exceptfds, timeout);
pthread_sigmask(SIG_SETMASK, &origmask, NULL); // Back to origmask stage.
是等价的。
另一个与 select 相近的 API 叫poll()
,它也是一种同步 IO 复用的 API 。相比于select()
,poll()
要求的参数更少。现在,你不再需要计算最大的nfds
是多少了,也不用再提供三种集合了。但由于它们实现上的相近,作为select()
的表亲,并没有带来性能上的加成,它们都很慢。
poll()
的函数原型如下:
#include <poll.h> // glibc library for poll() and related macros
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
/* Parameters:
1. fds: Pointer to an array of struct pollfd, which specifies the file descriptors to be monitored.
2. nfds: The number of items in the fds array.
3. timeout: The maximum number of milliseconds that poll() will block. A negative value means an infinite timeout, while zero means poll() will return immediately.
Return value:
- On success, returns the number of file descriptors with events, which may be zero if the timeout expired.
- On error, returns -1 and sets errno to indicate the error.
*/
poll()
将所有的监视项都放在了一个pollfd
结构体类型的数组中。在使用时,你还需要提供数组的项数(你想让它监视多少项),这比select()
方便了不少。你想指定的事件需要在pollfd
结构体中说明,这个结构体的原型如下:
#include <poll.h>
struct pollfd{
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
/*
events/revents:
- POLLIN: Look for if there is data to read.
- POLLOUT: Writing is now possible without blocking.
- POLLRDHUP: Stream socket peer closed connection, or shut down writing half of connection.
- POLLPRI: There is urgent data to read.
revents specific(be ignored in events):
- POLLERR: Error condition.
- POLLHUP: Hang up, otherside closed socket.
- POLLNVAL: Invalid request: fd not open.
compiling with _XOPEN_SOURCE(additional bits):
- POLLRDNORM: equivalent to POLLIN
- POLLRDBAND
- POLLWRNORM
- POLLWRBAND
*/
fd
表示要监视的文件描述符。events
是一个bit mask用于指定要监视何种事件,这是一个输入变量。revents
是内核所设置的return events,用于标识当poll()
返回时,实际发生的事件。如果fd
是一个负值,那么字段events
就会被忽略,而且revents
会返回0
值。
poll()
void listen_for_connections(int client_sock1, int client_sock2, int client_sock3)
{
struct pollfd pollfds[3];
pollfds[0].fd = client_sock1;
pollfds[0].events = POLLIN;
pollfds[1].fd = client_sock2;
pollfds[1].events = POLLIN;
pollfds[2].fd = client_sock3;
pollfds[2].events = POLLIN;
int timeout = 30 * 1000;
printf( "Going to start listening for socket events.\n" );
while(!quit){
int res = poll(&pollfds, 3, timeout);
// Error checking.
if(res == -1){
printf
quit = 1;
}
// 0 sockets had events occur
else if(res == 0){
printf("Still waiting for events...\n");
}
// things happened
else{
if(pollfds[0].revents & POLLIN){
service0_activate();
}
if(pollfds[1].revents & POLLIN){
service1_activate();
}
if(pollfds[2].revents & POLLIN){
service2_activate();
}
}
}
}
poll和ppoll的关系就如同select和pselect一样。同pselect一样,ppoll允许应用safely wait直到其中一个文件描述符准备好了或者有信号被捕获。
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <poll.h>
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *_Nullable tmo_p,
const sigset_t *_Nullable sigmask);
epoll()
是一个 Linux-specific API,功能和poll()
差不多,都通过监视一组文件描述符来看 I/O 是否可用。但 epoll API 和 poll 却有本质上的区别。epoll API 提供了边沿触发和电平触发两种使用接口。在高并发情境下,epoll API 的性能要好得多。
对于 epoll API 而言,最重要的就是 epoll instance 。epoll instance 是一个内核数据结构。在用户程序眼中,他就是一个包含两个列表的容器。这两个列表是 interest list(兴趣列表) 和 ready list(就绪列表)。从名字中你就能大致知道这两个列表的作用。
Interest list 也叫 epoll set,是一个包含一些文件描述符的集合。在 interest list 中的文件描述符是用户进程所感兴趣的一些监视项。通过将这些感兴趣的文件描述符添加到 interest list 中,我们就能用 epoll API 让操作系统帮我们监视这些文件描述符的状态。(红黑树)
Ready list 是第二个列表。这个列表包含了那些准备好了进行 I/O 操作的文件描述符,ready list 是 interest list 的一个子集。一旦 interest list 中的文件描述符有 I/O 活动,内核就会动态地把相应的文件描述符添加到 ready list 中。(双向链表)
为什么 epoll 这么快呢?我们先从 epoll instance 谈起。 epoll instance 中维护两个 "list" ,其中,interest list 是我们想让 epoll 帮我们监视的文件描述符集合,这是一个红黑树。使得 epoll 在插入、删除、查找文件描述符的时间复杂度均为
此外,我们前面学习的 select 和 poll 模型需要不断地遍历所有监听的文件描述符(时间复杂度 epoll_wait
时直接返回就绪的文件描述符(时间复杂度
epoll 使用回调机制就是当某个文件描述符(如 socket)发生 I/O 事件(如可读、可写)时,内核会主动通知 epoll,而不是由 epoll 主动轮询检查所有 fd。和 select/poll 那样时不时地看一眼公交车有没有到相比,让司机通知你肯定更好一些。
现在,我们先了解了解用来创建和管理 epoll instances 的系统调用。
epoll_create
and epoll_create1
要创建一个 epoll instance,我们有两种方式:epoll_create
和 epoll_create1
。epoll_create
会创建一个 epoll instance。在 Linux 2.6.8 之后,内核会动态地检查 epoll instance,所以这个系统调用参数 size
会被忽略,但为了兼容新版本,size
必须比 0 大。
它的系统调用原型如下:
#include <sys/epoll.h>
int epoll_create(int size);
/* Parameters:
1. size: initial size (ignored in modern implementations, but must be > 0)
Returns a file descriptor for the new epoll instance, or -1 on error.
*/
在 Linux 2.6.27 后,内核提供了另一种创建 epoll instance 的系统调用。我们不再需要担心 size
是否大于 0 的问题了,而且还引入了 flags 参数。flags 可以是 0 或 EPOLL_CLOEXEC
,用于在执行 exec
系列函数时自动关闭 epoll 文件描述符。
#include <sys/epoll.h>
int epoll_create1(int flags); // create a new epoll instance with flags
/* Parameters:
1. flags: epoll instance flags (0 or EPOLL_CLOEXEC)
Returns a file descriptor for the new epoll instance, or -1 on error.
*/
当不再需要这些文件描述符后,记得用 close()
系统调用关闭掉相关的文件描述符。epoll 使用引用计数来管理 instance 的生命周期,当 epoll instance 中的所有文件描述符都关闭后,内核就会销毁 instance 并释放相关联的资源。
在 epoll instance 创建好之后,我们就可以通过创建返回的 epfd
往 interest list 中添加、修改或删除 list entries(也就是文件描述符和 event
字段中特化的事件)。
以下是 epoll_ctl
系统调用的原型:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/* Parameters:
1. epfd: file descriptor returned by epoll_create or epoll_create1
2. op: operation to be performed
- EPOLL_CTL_ADD: Add an entry to the interest list of the epfd.
- EPOLL_CTL_MOD: Change the settings associated with fd in the interest list
to the new settings specified in event.
- EPOLL_CTL_DEL: Remove the target file descriptor fd from the interest list.
3. fd: file descriptor to be added, modified, or removed
4. event: pointer to epoll_event structure (can be NULL for EPOLL_CTL_DEL)
Returns 0 on success, or -1 on error.
*/
除了我们刚才提到的 epfd
参数外,我们还有三个参数,分别是 op
, fd
, 和一个指针指向结构体 epoll_event
。最开始的时候,epoll instance 为空,我们就需要在 op
上填入 EPOLL_CTL_ADD
来向 interest list 中添加相关的文件描述符,同时,在 epoll_event
结构体中选择让内核按何种方式来监视这些文件描述符。
op
字段提供将某个文件描述符加入到 interest list 中进行监视、fd
字段指示监视项、event
主要是一些监视选项,它是 epoll 机制中用于事件监视和传递用户数据的核心结构体。 struct epoll_event
的原型如下:
#include <sys/epoll.h>
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
/*
events/revents:
- EPOLLIN: Look for if there is data to read.
- EPOLLOUT: Writing is now possible without blocking.
- EPOLLRDHUP: Stream socket peer closed connection, or shut down writing half of connection.
- EPOLLPRI: There is urgent data to read.
revent specific:
- EPOLLERR: Error condition happened.
- EPOLLHUP: Hang up, otherside closed socket.
- EPOLLNVAL: Invalid request: fd not open.
additional epoll-specific flags:
- EPOLLET: Requests edge-triggered notification.
- EPOLLONESHOT: Requests one-shot notification.
- EPOLLWAKEUP: Prevent system suspend while event is pending.
- EPOLLEXCLUSIVE: Sets exclusive wakeup mode.
*/
union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
};
这里的联合体 epoll_data
是用户的自定义数据,当事件触发时,内核就会将此数据返回给应用。
在上面,我们了解到用户感兴趣的 instances 会被添加到兴趣列表中提供给 epoll 来监视,如果有事件发生,epoll 会将可用的文件描述符放到 ready list 中。epoll API 给我们提供了下面的函数来监视 I/O 事件发生:epoll_wait
, epoll_pwait
和 epoll_pwait2
。下面是这三个函数的原型:
#include <sys/epoll.h>
/* Waits for events on an epoll file descriptor. */
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
/*
Parameters:
1. epfd: The epoll file descriptor obtained from epoll_create or epoll_create1.
2. events: Pointer to an array of epoll_event structures where the ready events will be stored.
3. maxevents: The maximum number of events that can be returned (must be > 0).
4. timeout: Timeout in milliseconds:
- -1: Block indefinitely until an event occurs.
- 0: Return immediately.
- > 0: Maximum wait time in milliseconds.
Return value:
- Returns the number of ready file descriptors.
- On failure, returns -1 and sets errno.
*/
/* Waits for events with the ability to temporarily block signals. */
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout,
const sigset_t *_Nullable sigmask);
/*
Parameters:
1. epfd, events, maxevents, timeout: Same as epoll_wait.
2. sigmask: Signal mask to be temporarily applied during the wait. Can be NULL to behave like epoll_wait.
Return value:
- Same as epoll_wait.
*/
/* Waits for events with nanosecond-resolution timeout and signal mask support. */
int epoll_pwait2(int epfd, struct epoll_event *events, int maxevents,
const struct timespec *_Nullable timeout,
const sigset_t *_Nullable sigmask);
/*
Parameters:
1. epfd, events, maxevents: Same as epoll_wait.
2. timeout: High-resolution timeout using timespec structure (supports nanosecond precision). NULL indicates indefinite blocking.
3. sigmask: Signal mask to be temporarily applied during the wait. Can be NULL to behave like epoll_wait.
Return value:
- Same as epoll_wait.
*/
我们以 epoll_wait
为例,解释一下每个字段的含义:
epfd
:这是需要被监视的 epoll instance 。events
:这是用户分配的 epoll_event
数组指针,用于接收就绪事件。maxevents
:events
数组的最大容量(必须大于0)timeout
:超时时间。epoll_wait
就会阻塞这么长时间后返回(毫秒)。你可能会好奇,epoll instance 中明明有一个 ready list,为什么还需要人为设置一个 events
数组?这时因为 ready list 是一个内核中的数据结构,我们在用户态是访问不到的,所以我们要提供一个存放文件描述符的数组,让内核在事件发生后将 ready list 中的 fd 拷贝到 events
数组中。拷贝完成后, ready list 将会被清空。
epoll 有两种触发方式:水平触发 (LT) 和边沿触发 (ET) 。水平触发是一种状态驱动的模式,也就是说只要文件描述符满足某种状态条件,epoll_wait
就会持续的返回该 fd 的时间。这就相当于水位报警器,只要水位高于警戒线,蜂鸣器就会一种报警。默认情况下 epoll 都是水平触发的。
而边沿触发是事件触发的模式,只在fd 的状态发生变化时触发事件通知(无数据到有数据)。比如缓冲区中有 500MB 的数据,我们只读取 250MB,剩余 250MB。但这时你再用 epoll_wait(...)
就会阻塞,直到有新数据到达。
第一节课的时候,我们学习了三个用于服务器上的异步I/O处理方式。本节课,我们来了解一些如何通过Client URL来处理网络客户端上的异步I/O。
cURL 是对 socket 接口的封装,它是一个用户程序,可能需要你自己下载安装相关的开发包。随后,通过包含头文件 curl/curl.h
并链接 -lcurl
,你就能在你的程序中使用libcurl
库来请求HTTP服务、下载文件和一些其他的网络服务。
由于大多数的资源在服务器的手中,客户端想要某些资源就需要向服务器发送响应的请求。同样,当你使用 cURL 进行请求时,默认的 cURL API(curl_easy_perform()
)会阻塞我们的程序。而通过上节课,我们知道,这种阻塞并不是我们想要的。我们想不浪费资源的同时提高程序的效率。
幸运的是,libcurl
库中有能够让我们同时handle多个异步IO的API。curl_multi
允许客户端同时进行多个网络操作请求,而且不会阻塞用户程序。但再此之前,我们先来看看默认的 cURL API。
在使用这些libcurl
的库函数之前,我们需要调用curl_global_init()
进行全局初始化,并在程序结束时调用curl_global_cleanup()
进行全局清理。
下面是它们的函数原型:
#include <curl/curl.h>
CURLcode curl_global_init(long flags);
/* Initializes the cURL library globally.
Parameters:
1. flags: Bitmask of options to initialize. Commonly used values include:
- CURL_GLOBAL_DEFAULT: Initialize all supported features (equivalent to CURL_GLOBAL_SSL | CURL_GLOBAL_WIN32).
- CURL_GLOBAL_SSL: Initialize SSL.
- CURL_GLOBAL_WIN32: Initialize Windows-specific features.
- CURL_GLOBAL_ALL: Equivalent to CURL_GLOBAL_DEFAULT.
- CURL_GLOBAL_NOTHING: Initialize no features.
Return value:
- On success, returns CURLE_OK.
- On failure, returns a CURLcode error value. Those error values include:
- CURLE_UNSUPPORTED_PROTOCOL
- CURLE_FAILED_INIT
- CURLE_URL_MALFORMAT
- CURLE_COULDNT_RESOLVE_HOST
- CURLE_COULDNT_CONNECT
- CURLE_OPERATION_TIMEDOUT
- CURLE_SSL_CONNECT_ERROR
- CURLE_PEER_FAILED_VERIFICATION
*/
curl_global_init(long flags);
为我们提供了一些可选项。一般我们会使用在flags字段中填入CURL_GLOBAL_DEFAULT
,表示初始化所有cURL支持的功能,它和CURL_GLOBAL_ALL
是等价的。在初始化完成之后,用户会得到CURLcode
类型的返回值,用于反馈操作结果。
在程序结束或我们不再需要cURL库的服务时,我们就可以使用curl_global_cleanup()
全局清理cURL库。这个函数不需要任何参数,也不会有返回值。
void curl_global_cleanup(void);
/* Cleans up the cURL library globally.
Parameters: None.
Return value: None.
*/
libcurl
库为我们提供了一些库函数用于与cURL进行交互,这些函数使得我们可以方便地进行网络请求和数据传输。通过使用这些库函数,我们可以实现HTTP、HTTPS、FTP等多种协议的支持。在cURL Easy API Interface中,我们会使用cURL easy handle来获取cURL的服务。
在全局初始化完成之后,我们就可以获取cURL handle并设置相应的选项来获取cURL的服务。其中我们用curl_easy_init()
初始化并获取一个cURL easy handle,用curl_easy_setopt()
设置handle的属性。
下面是这两个函数的原型:
CURL *curl_easy_init(void);
/* Initializes a CURL easy handle.
Return value:
- On success, returns a pointer to a CURL easy handle.
- On failure, returns NULL.
*/
curl_easy_init()
并不需要什么参数,如果初始化成功完成,它会返回一个指向初始化handle的指针,如果失败,它会返回NULL
。
CURLcode curl_easy_setopt(CURL *handle, CURLoption option, ...);
/* Sets options for a CURL easy handle.
Parameters:
1. handle: The CURL easy handle.
2. option: The option to set. Common options include:
- CURLOPT_URL: The URL to fetch.
- CURLOPT_POSTFIELDS: The data to send in a POST request.
- CURLOPT_HTTPHEADER: A linked list of HTTP headers to include in the request.
- CURLOPT_WRITEFUNCTION: A callback function to handle data received from the server.
- CURLOPT_WRITEDATA: A pointer to pass to the write callback function.
- CURLOPT_READFUNCTION: A callback function to handle data sent to the server.
- CURLOPT_READDATA: A pointer to pass to the read callback function.
- CURLOPT_TIMEOUT: The maximum time in seconds that the request is allowed to take.
3. ...: The value to set for the option, depends on the option being set.
Return value:
- On success, returns CURLE_OK.
- On failure, returns a CURLcode error value.
*/
curl_easy_setopt()
要复杂的多。这个库函数会根据不同的设置项来配置 cURL easy handle 的行为。而且每个选项后面的参数类型会根据不同的选项而有所不同。
所有的所有都完成之后,我们就可以使用curl_easy_perform()
来执行请求了。
CURLcode curl_easy_perform(CURL *handle);
/* Performs the file transfer.
Parameters:
1. handle: The CURL easy handle.
Return value:
- On success, returns CURLE_OK.
- On failure, returns a CURLcode error value.
*/
完成请求后,我们使用curl_easy_cleanup()
来清理handle。
void curl_easy_cleanup(CURL *handle);
/* Cleans up a CURL easy handle.
Parameters:
1. handle: The CURL easy handle to clean up.
Return value: None.
*/
在上面,我们看到好几个函数都会返回的CURLcode
用于指示函数是否正常运行。我们可以通过库函数curl_easy_strerror()
来查看具体的错误信息。这个函数会返回一个描述错误的字符串,帮助我们理解问题之所在。
以下是 curl_easy_strerror()
的函数原型:
const char *curl_easy_strerror(CURLcode errornum);
/* Returns a string describing the CURLcode error.
Parameters:
1. errornum: The CURLcode error value.
Return value:
- Returns a pointer to a null-terminated string describing the error.
*/
在下面的例子中,我们通过设置 curl_easy_setopt(curl, CURLOPT_URL, "https://congzhi.wiki/");
来请求网站的 index.html
。
#include <stdio.h>
#include <curl/curl.h>
int main(int argc, char** argv){
CURL *curl;
CURLcode res;
curl_global_init(CURL_GLOBAL_DEFAULT);
curl = curl_easy_init();
if(curl){
curl_easy_setopt(curl, CURLOPT_URL, "https://congzhi.wiki/");
res = curl_easy_perform(curl);
}
if(res != CURLE_OK){
fprintf(stderr, "curl_easy_perform() failed: %s\n",
curl_easy_strerror(res));
curl_easy_cleanup(curl);
}
curl_easy_cleanup(curl);
curl_global_cleanup();
return 0;
}
cURL easy 什么都好,但和其他请求 IO 的函数一样,res = curl_easy_perform(curl);
这一步会阻塞程序,这就是 easy perform 最大的缺点。
为了确保一个线程能够操作多个异步I/O,libcurl
库提供了 cURL multi API 。cURL multi 用于管理一组easy handles,通过将多个easy handles放到一个队列中,并检查它们的状态来实现异步I/O。我们将包含多个easy handles队列的这样一个结构叫做multi handle。
因为 cURL multi 是在 cURL easy 的基础上建立起来的,所以在使用cURL multi时,我们仍然需要进行cURL全局上的初始化、清理等。此外,由于我们引入了新的结构,我们需要一个新的类型来表示multi handle。在libcurl
中,这个结构叫CURLM
。
curl_multi
内部维护了两个队列,分别是正在进行的传输队列和已经完成的传输队列。当curl multi handle初始化完成后,两个队列也随之初始化。
此外,curl_multi_init
还会初始化内部状态和数据结构、分配必要的资源和内存等。
函数原型如下:
#include <curl/multi.h> // Included <curl/curl.h>
CURLM *curl_multi_init(void);
/* Initializes a CURL multi handle.
Return value:
- On success, returns a pointer to a CURL multi handle.
- On failure, returns NULL.
*/
我们有了新的结构后,我们可以往里面添加任意数量的 easy handle 。我们所添加的 easy handles 就会被放到正在进行的传输队列中。
CURLMcode curl_multi_add_handle(CURLM *multi_handle, CURL *easy_handle);
/* Adds a CURL easy handle to a CURL multi handle.
Parameters:
1. multi_handle: The CURL multi handle.
2. easy_handle: The CURL easy handle to add.
Return value:
- On success, returns CURLM_OK.
- On failure, returns a CURLMcode error value.
*/
在添加完需要添加的 easy handles 之后,我们就可以使用 curl_multi_perform()
来启动多个并发的网络请求(第一次使用),这个函数会立即返回正在进行传输队列中的 easy handles 的数量,所以这些请求不会阻塞当前程序。
其函数原型如下:
CURLMcode curl_multi_perform(CURLM *multi_handle, int *still_running_handles);
/* Performs the transfers for all added handles.
Parameters:
1. multi_handle: The CURL multi handle.
2. still_running_handles: Pointer to an integer that will be set to the number of running handles.
Return value:
- On success, returns CURLM_OK.
- On failure, returns a CURLMcode error value.
*/
启用后,当我们使用curl_multi_perform()
时,它会更新参数still_running_handles
用于指示multi handle中仍在进行传输的easy handles的数量。当这个参数指向的数字变成0
,你就知道IO全部完成了。而这就意味着我们需要多次调用curl_multi_perform()
,这是否意味着我们在轮询呢?
尽管我们实现了在一个线程中处理多个IO,我们仍不希望无意义的轮询占用太多CPU时间。为了解决这个问题,libcurl
提供了 curl_multi_wait()
函数,它可以在有数据可读或可写之前阻塞一段时间,让紧轮询变成松轮询。
下面是它的函数原型:
CURLMcode curl_multi_wait(CURLM *multi_handle, struct curl_waitfd extra_fds[],
unsigned int extra_nfds, int timeout_ms, int *numfds);
/* Waits for activity on any of the curl easy handles within a multi handle.
Parameters:
1. multi_handle: The CURL multi handle.
2. extra_fds: An array of extra fds to wait on.(NULL)
3. extra_nfds: The number of extra file descriptors.(0)
4. timeout_ms: The maximum time to wait in milliseconds.
5. numfds: Pointer to an integer number of "interesting" events occurred.
Return value:
- On success, returns CURLM_OK.
- On failure, returns a CURLMcode error value.
*/
同样的,我们可以用curl_mulri_strerror()
来查看发生了什么错误。
const char *curl_multi_strerror(CURLMcode errornum);
/* Returns a string describing the CURLMcode error.
Parameters:
1. errornum: The CURLMcode error value.
Return value:
- Returns a pointer to a null-terminated string describing the error.
*/
当加入cURL multi的cURL easy handle完成传输任务后,curl multi就会将其添加到一个已完成传输的队列中。之后我们就可以通过curl_multi_info_read()
读取easy handle的状态了。
curl_multi_info_read()
的函数原型如下:
CURLMsg *curl_multi_info_read(CURLM *multi_handle, int *msgs_left);
/* Reads information about completed transfers.
Parameters:
1. multi_handle: The CURL multi handle.
2. msgs_left: Pointer to an integer that will be set to the number of messages left in the queue.
Return value:
- On success, returns a pointer to a CURLMsg structure.
- On failure, returns NULL.
*/
当读取成功时,curl_multi_info_read()
会返回一个结构体并将所读取的curl easy从完成传输队列中移除。CURLMsg
是一个结构体,用于提供有关传输完成的详细信息。它的原型如下:
typedef struct {
CURLMSG msg; /* What this message means */
CURL *easy_handle; /* The handle it concerns */
union {
void *whatever; /* Message-specific data */
CURLcode result; /* Return code for transfer */
} data;
} CURLMsg;
这个结构体的第一个字段CURLMSG
表示消息的类型。这是一个枚举类型,可能的值有CURLMSG_DONE
表示传输完成和CURLMSG_ERR
表示有发生错误。
第二个字段表示与当前消息相关联的easy handle。
第三个字段是一个联合体data
,用于存储不同的类型,其中CURLcode result;
表示curl easy传输的结果,我们前面见过。
当一个easy handle完成后,我们就可以将它从multi handle中移除出去了。我们可以通过msg->easy_handle
在结构体CURLMsg
中获取到确切的easy_handle。然后通过相关的库函数将其移除并进行清理。(curl_easy_cleanup(CURL* eh)
)
curl_multi_remove_handle()
的函数原型如下:
CURLMcode curl_multi_remove_handle(CURLM *multi_handle, CURL *easy_handle);
/* Removes an easy handle from a multi handle.
Parameters:
1. multi_handle: The CURL multi handle.
2. easy_handle: The CURL easy handle to remove.
Return value:
- On success, returns CURLM_OK.
- On failure, returns a CURLMcode error value, such as:
- CURLM_BAD_HANDLE: The provided multi handle is invalid.
- CURLM_BAD_EASY_HANDLE: The provided easy handle is invalid.
- CURLM_OUT_OF_MEMORY: Memory allocation failed.
*/
当我们不再需要curl multi时,就可以通过curl_multi_cleanup()
将其清理掉,其函数原型如下:
CURLMcode curl_multi_cleanup(CURLM *multi_handle);
/* Cleans up a CURL multi handle.
Parameters:
1. multi_handle: The CURL multi handle to clean up.
Return value:
- On success, returns CURLM_OK.
- On failure, returns a CURLMcode error value.
*/
Reuse the curl easy handles all the time.
在学习异步 I/O 的过程中,我们希望 I/O 操作能够避免阻塞当前线程。然而,通过之前的学习,我们发现,尽管文件 fd 的读取涉及磁盘 I/O,与 socket 或 pipe 这类 fd 操作不同,文件操作通常能够确保数据始终可读。因此,即使将文件的 fd 设置为 O_NONBLOCK
,它实际上仍然会导致阻塞。
我们想要的是让文件加载到内存中之后再读,而不是阻塞硬控线程 5ms,该怎么办?POSIX AIO 闪亮登场!在发起文件 I/O 请求后,AIO 可以让线程立即返回,不被阻塞。并且当 I/O 操作完成时,通过回调或通知机制处理数据。完美!
AIO 会创建一个 aiocb
(AIO Control Block) 控制块,带有可选回调,这是一个操作描述符,管理 AIO 的具体操作(告诉操作系统你想要干什么)。
#include <aio.h>
struct aiocb {
int aio_fildes; /* File descriptor */
off_t aio_offset; /* File offset */
volatile void *aio_buf; /* Buffer for data */
size_t aio_nbytes; /* Number of bytes for I/O */
int aio_reqprio; /* Request priority */
struct sigevent aio_sigevent; /* Signal or function to call when I/O completes */
int aio_lio_opcode; /* Operation to be performed (LIO_READ, LIO_WRITE, etc.) */
};
#include <signal.h>
struct sigevent {
int sigev_notify; /* Notification type */
int sigev_signo; /* Signal number (for signal-based notification) */
union sigval sigev_value; /* Data passed with notification */
void (*sigev_notify_function)(union sigval); /* Callback function (if applicable) */
void *sigev_notify_attributes; /* Attributes for the thread (if using thread notification) */
};
union sigval {
int sival_int;
void* sival_ptr;
};
io_uring