开始基于事件编程
有了poll
和epoll
提供的事件通知机制,我们可以开始基于事件驱动编程了!基于事件驱动,我们可以构建出一个简单的网络编程框架,常见的设计模式包括Reactor模式。
Reactor模式
意会一下Reactor模式的名字,我们可以感受到这个设计模式的核心是“对事件作出反应”。例如,当有读事件(文件描述符可读)产生时,我们开始读取;有写事件产生时,开始写入。进程在没有事件发生时处于挂起状态,只有事件产生后才根据业务逻辑作出相应的反应,这就是事件驱动的Reactor模式,可以由下图表示:
单线程的Reactor模式1
图中的acceptor专门用于查询连接事件的产生,在有连接事件时创建新的连接,并通知dispatcher监听该连接上的读写事件。dispatcher负责检查并分发事件,实现时可以用poll
或epoll
查询事件产生。
小试牛刀
让我们来改写之前实现的echo_epoll_server
,实现一个基于epoll
的简单reactor。
回调函数
既然要对事件作出“反应”,我们就需要编写相应的函数,以便在被通知事件发生时及时处理,这样的函数成为回调函数(callback function)。对于我们的echo server,我们只需要处理两种事件即可:连接建立事件,以及读事件。在acceptor检测到有来自客户端的连接时,调用accept_callback
处理新连接。在检测到客户端向我们发送数据时,调用read_callback
,将数据读出并返回给客户端。
具体而言,accept_callback
要与epoll_ctl
交互,让epoll
帮我们检查新来的连接conn_fd
上是否有读写事件。read_callback
中可编写业务逻辑,当检测到连接关闭时,通知epoll
实例之后不要在检测这个连接上的事件了。
Channel
有了回调函数,我们可以通过epoll_wait
查询发生的事件,并调用相关回调函数作出“反应”。用户可以在epoll_event
的data
字段中存储fd
,获取事件对应的文件描述符。然而,文件描述符多种多样,可能是listen_fd
,也可能是conn_fd
,如何通过文件描述符,找到与之关联的数据处理函数,并在事件到来时进行处理?
只知道文件描述符的数值是不够的,我们还需要额外信息维护文件描述符的类型及其回调函数。因此,我们引入Channel
类,记录这些信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
struct Channel {
int fd;
function<void(int, int)> read_callback;
void on_event(uint32_t events, int efd) {
if ((events & EPOLLIN) && (read_callback != nullptr)) {
read_callback(fd, efd);
}
}
};
void read_callback(int conn_fd, int efd) {
TcpConnection conn{conn_fd};
string msg;
if (!(msg = conn.blocking_receive_line()).empty()) {
cout << "server received: " << msg;
conn.blocking_send(msg);
} else {
struct epoll_event event{};
epoll_ctl(efd, EPOLL_CTL_DEL, conn_fd, &event);
conn.close();
cout << "server closed conn_fd: " << conn_fd << endl;
}
}
void accept_callback(int listen_fd, int efd) {
TcpListener listener{listen_fd};
TcpConnection conn = listener.accept();
conn.set_nonblocking();
int conn_fd = conn.conn_fd();
struct epoll_event event{};
event.events = EPOLLIN | EPOLLET;
struct Channel *channel = new Channel{conn_fd, read_callback};
event.data.ptr = channel;
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_fd, &event) == -1) {
cerr << "epoll_ctl add failed: " << strerror(errno) << endl;
exit(1);
}
}
由于data
可以存储指针,我们直接在里面存放Channel
结构体的指针,这样就可以在事件返回时重新找到事件对应的Channel
,然后用on_event
处理事件。当然,直接操作指针是危险的,上面的程序已经有内存泄漏了,体现在read_callback
中关闭连接时,并没有同时释放连接对应的Channel
。在这里我们只是用偷懒的方式演示一个简单的reactor模式的实现。
main
函数中的循环如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const int MAX_EVENTS = 128;
int main() {
int efd;
struct epoll_event event{};
int n;
vector<struct epoll_event> events(MAX_EVENTS);
TcpListener listener;
listener.listen(8888, true);
efd = epoll_create1(EPOLL_CLOEXEC);
if (efd == -1) {
cerr << "epoll create failed: " << strerror(errno) << endl;
exit(1);
}
event.events = EPOLLIN | EPOLLET;
struct Channel *acceptor = new Channel{listener.listen_fd(), accept_callback};
event.data.ptr = acceptor;
if (epoll_ctl(efd, EPOLL_CTL_ADD, listener.listen_fd(), &event) == -1) {
cerr << "epoll_ctl add failed: " << strerror(errno) << endl;
exit(1);
}
while (true) {
n = epoll_wait(efd, events.data(), MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & (EPOLLERR | EPOLLHUP) ||
!(events[i].events & EPOLLIN)) {
cerr << "epoll error" << endl;
struct Channel *channel = (Channel *) events[i].data.ptr;
epoll_ctl(efd, EPOLL_CTL_DEL, channel->fd, &events[i]);
close(channel->fd);
delete channel;
} else {
if (events[i].events & EPOLLIN) {
cout << "epoll in\n";
}
struct Channel *channel = (Channel *) events[i].data.ptr;
channel->on_event(events[i].events, efd);
}
}
}
}
simple_epoll_reactor
与echo_epoll_server
的主要不同在于epoll_event
的data
字段存放的是Channel
指针,用Channel
将文件描述符与回调函数关联,用on_event
判断事件类型并选择相应的回调函数执行。在这个例子中,只有读事件的回调函数。之后更复杂的reactor实现中我们还会加入写事件的回调函数。
作为拓展,有兴趣的读者可以思考一下如何实现一个基于poll
的简单reactor。
Dispatcher
如果你已经思考了如何用poll
实现一个reactor,你会发现它的代码和我们用epoll
写的echo server很不一样。为了屏蔽poll
和epoll
的差别,我们可以封装一个Dispatcher
类,作为不同事件通知机制的统一交互接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct DispatcherEvent {
int fd{-1};
int revents{0};
};
class Dispatcher {
public:
virtual bool add(const Channel *channel) = 0;
virtual bool remove(const Channel *channel) = 0;
virtual bool update(const Channel *channel) = 0;
virtual std::vector<struct DispatcherEvent> dispatch() = 0;
const int DISPATCHER_MAX_EVENTS{128};
};
class EpollDispatcher : public Dispatcher { ... };
class PollDispatcher : public Dispatcher { ... };
Dispatcher
的操作即对事件的“增删改查”,其输入为Channel
类,从中获取要监听的文件描述符fd
以及事件(读/写);输出为DispatcherEvent
类。为了屏蔽poll
和epoll
的revents
中事件类型定义的不同,我们重新封装了下面的事件类型,作为DispatcherEvent
中revents
返回:
1
2
3
#define EVENT_ERR 0x01
#define EVENT_READ 0x02
#define EVENT_WRITE 0x04
这样一来,用户就可以根据需要,在实现时自行选择使用哪种dispatcher。
控制反转
在上文简单版Reactor实现中,我们只用了一个Channel
结构体同时负责listen_fd
和conn_fd
两种套接字,并用on_event
调用它们各自的回调函数。在框架实现时,Channel
所需要的状态可能更加复杂。例如,我们希望管理listen_fd
的Channel
同时保存相应的TcpListener
,而管理conn_fd
的Channel
能保存相应的TcpConnection
,并新增与连接事件有关的回调函数。
对于框架而言,它所知道的只是事件发生时,要作出“反应”,除此之外它都不知道。因此,框架只会傻傻地调用on_event
。我们需要在实现不同的Channel
时进一步细化on_event
的逻辑,这其实就是依赖反转的体现。
为了实现清晰,我们在Channel
的基础上新增AcceptorChannel
和ConnectionChannel
,管理各自更为复杂的状态和功能,并使用std::bind
将自己内部的函数与Channel
的回调函数绑定。
至此,我们可以明确这些类的不同职责。Channel
是我们的网络编程框架使用的基本类,在框架的实现中,我们规定了何时调用on_event
,并且规定了on_event
函数的固定流程,即判断传入的事件是否可读可写,并根据Channel
实例是否有读写相关的回调函数,对这些函数进行调用。而用户可以在Channel
基础上扩展,细化其要管理的状态以及回调函数的行为,以顾及基本框架照顾不到的细节。最终,系统在调用on_event
时,表现出用户扩展的Channel
的子类的行为,实现控制反转。
AcceptorChannel
我们在AcceptorChannel
内部管理一个TcpListener
,利用之前封装好的接口与网络套接字交互。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class AcceptorChannel : public Channel {
public:
AcceptorChannel() = delete;
explicit AcceptorChannel(int port) {
cout << "listening port: " << port << endl;
listener.listen(port, true);
_fd = listener.listen_fd();
_read_callback = std::bind(&AcceptorChannel::handle_connection_established, this);
}
private:
vector<ChannelOp> handle_connection_established() {
TcpConnection conn = listener.accept();
conn.set_nonblocking();
vector<ChannelOp> ret;
ret.push_back({CHANNEL_ADD, CHANNEL_CONN, conn.conn_fd()});
cout << "AcceptorChannel handle_connection_established: new conn fd=" << conn.conn_fd() << endl;
return ret;
}
TcpListener listener;
};
ConnectionChannel
在ConnectionChannel
中,我们加入与连接有关的回调函数connection_closed_callback
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ConnectionChannel : public Channel {
public:
ConnectionChannel() = delete;
explicit ConnectionChannel(int fd) {
cout << "new connection with fd: " << fd << endl;
conn.set_fd(fd);
_fd = fd;
_read_callback = bind(&ConnectionChannel::handle_connection_read, this);
_write_callback = bind(&ConnectionChannel::handle_connection_write, this);
}
std::function<void(TcpConnection&)> message_callback{nullptr};
std::function<void(TcpConnection&)> write_completed_callback{nullptr};
std::function<void(TcpConnection&)> connection_closed_callback{nullptr};
private:
vector<ChannelOp> handle_connection_read() {
cout << "ConnectionChannel: handle_connection_read\n";
if (conn.buffer_receive() > 0) {
if (message_callback != nullptr) {
message_callback(conn);
}
} else {
conn.buffer_out.end_input();
return handle_connection_closed();
}
return {};
}
vector<ChannelOp> handle_connection_write() {
cout << "ConnectionChannel: handle_connection_write\n";
conn.buffer_send();
if (conn.buffer_in.eof()) {
if (write_completed_callback != nullptr) {
write_completed_callback(conn);
}
}
return {};
}
vector<ChannelOp> handle_connection_closed() {
cout << "ConnectionChannel: connection closed\n";
vector<ChannelOp> ret;
if (connection_closed_callback != nullptr) {
connection_closed_callback(conn);
}
ret.push_back({CHANNEL_REMOVE, CHANNEL_CONN, _fd});
return ret;
}
TcpConnection conn{-1};
};
在上文simple_epoll_reactor
的实现中,我们直接在Channel
的回调函数里与Dispatcher
交互,实现事件的增删。为了将Channel
与dispatcher解耦,我们让Channel
的on_event
返回ChannelOp
,用ChannelOp
表达这次处理完成后是否要向dispatcher中新增要监听的事件或是别的操作。在main
函数的循环中,对ChannelOp
进行处理。
示例:使用Dispatcher
在有了多层封装后,main
函数的循环进一步简化。我们新增了channel_map
,用于将DispatcherEvent
返回的文件描述符编号与具体的Channel
对应起来。此外,针对on_event
返回的ChannelOp
,也要在循环中处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void on_message(TcpConnection& conn) {
if (!conn.buffer_out.eof()) {
string msg = conn.buffer_read_line();
cout << "server received: " << msg;
conn.buffer_write(msg);
}
}
int main() {
unordered_map<int, unique_ptr<Channel>> channel_map;
unique_ptr<Dispatcher> dispatcher{new EpollDispatcher()};
unique_ptr<Channel> acceptor{new AcceptorChannel(8888)};
dispatcher->add(acceptor.get());
channel_map[acceptor->fd()] = move(acceptor);
while (true) {
vector<DispatcherEvent> events{dispatcher->dispatch()};
for (auto &event : events) {
if (!channel_map.count(event.fd)) {
continue;
}
Channel *channel = channel_map[event.fd].get();
vector<ChannelOp> channel_ops{channel->on_event(event.revents)};
for (auto &op : channel_ops) {
if (op.op == CHANNEL_ADD) {
if (op.channel_type == CHANNEL_CONN) {
ConnectionChannel *channel = new ConnectionChannel(op.fd);
channel->message_callback = on_message;
dispatcher->add(channel);
channel_map[op.fd] = unique_ptr<Channel>(channel);
cout << "new connection established: fd=" << op.fd << endl;
}
} else if (op.op == CHANNEL_REMOVE) {
Channel *channel = channel_map[op.fd].get();
dispatcher->remove(channel);
channel_map.erase(op.fd);
close(op.fd);
}
}
}
}
return 0;
}
总结
本文实现了一个简单的Reactor模式编程框架,基于事件驱动编程,可以获得较高的效率,和较好的可扩展性。我们引入了Channel
层抽象,用于管理不同的文件描述符和它们的事件处理逻辑之间的关系。随后,引入了Dispatcher
层抽象,用于隔离不同的事件通知机制之间的区别。为了将文件描述符与对应的Channel关联,我们引入了channel_map
。最后,我们介绍了控制反转是如何影响框架的具体行为的。通过引入新的抽象层,可以使代码在实现时更清晰简洁,更具有扩展性。
完整代码仓库
「网络编程101」系列全部代码可在我的GitHub代码仓库中查看:Captor: An easy-to-understand Reactor in C++
欢迎提出各类宝贵的修改意见和issues,指出其中的错误和不足!
最后,感谢你读到这里,希望我们都有所收获!