文章

反应堆模型

反应堆模型

Reactor 是一个基于epoll的高性能、低延迟、高并发的模型。

设计思路

  • Reactor模式核心:使用单线程事件循环处理所有I/O事件,通过epoll管理文件描述符。
  • 非阻塞IO:所有socket设置为非阻塞模式,配合epoll的边沿触发(ET)模式,确保高效事件处理。
  • 事件分发:每个文件描述符关联一个事件处理器(EventHandler),由Reactor统一管理事件注册和回调。
  • 资源管理:使用RAII确保资源自动释放,避免泄漏。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <unordered_map>
#include <memory>
#include <stdexcept>
#include <iostream>
#include <vector>
#include <string>
#include <cerrno>

// 设置文件描述符为非阻塞模式
void set_non_blocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        throw std::runtime_error("fcntl F_GETFL failed");
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        throw std::runtime_error("fcntl F_SETFL failed");
    }
}

// 事件处理器接口
class EventHandler {
public:
    virtual ~EventHandler() = default;
    virtual void handle_event(uint32_t events) = 0;
};

class Reactor;

// Acceptor类,处理新连接
class Acceptor : public EventHandler {
public:
    Acceptor(Reactor& reactor, int port);
    ~Acceptor();
    void handle_event(uint32_t events) override;

private:
    Reactor& reactor_;
    int server_fd_;
};

// Connection类,处理客户端连接
class Connection : public EventHandler {
public:
    Connection(Reactor& reactor, int fd);
    ~Connection();
    void handle_event(uint32_t events) override;

private:
    Reactor& reactor_;
    int fd_;
    std::string output_buffer_;
    std::string input_buffer_;
};

// Reactor核心类
class Reactor {
public:
    Reactor();
    ~Reactor();

    void add_fd(int fd, uint32_t events, EventHandler* handler);
    void update_fd(int fd, uint32_t events);
    void del_fd(int fd);
    void run();
    void stop();

private:
    int epoll_fd_;
    std::unordered_map<int, EventHandler*> handlers_;
    bool running_ = true;

    friend class Acceptor;
    friend class Connection;
};

// Acceptor实现
Acceptor::Acceptor(Reactor& reactor, int port) : reactor_(reactor), server_fd_(-1) {
    server_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (server_fd_ == -1) {
        throw std::runtime_error("socket creation failed");
    }

    int opt = 1;
    if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        close(server_fd_);
        throw std::runtime_error("setsockopt failed");
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(server_fd_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        close(server_fd_);
        throw std::runtime_error("bind failed");
    }

    if (listen(server_fd_, SOMAXCONN) == -1) {
        close(server_fd_);
        throw std::runtime_error("listen failed");
    }

    reactor_.add_fd(server_fd_, EPOLLIN | EPOLLET, this);
}

Acceptor::~Acceptor() {
    if (server_fd_ != -1) {
        close(server_fd_);
    }
}

void Acceptor::handle_event(uint32_t events) {
    if (events & EPOLLIN) {
        while (true) {
            struct sockaddr_in client_addr;
            socklen_t len = sizeof(client_addr);
            int client_fd = accept(server_fd_, (struct sockaddr*)&client_addr, &len);
            if (client_fd == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break; // 所有新连接已处理
                } else {
                    std::cerr << "accept error: " << strerror(errno) << std::endl;
                    break;
                }
            }

            try {
                set_non_blocking(client_fd);
                reactor_.add_fd(client_fd, EPOLLIN | EPOLLET | EPOLLRDHUP, new Connection(reactor_, client_fd));
            } catch (const std::exception& e) {
                std::cerr << "Failed to add client: " << e.what() << std::endl;
                close(client_fd);
            }
        }
    }
}

// Connection实现
Connection::Connection(Reactor& reactor, int fd) : reactor_(reactor), fd_(fd) {}

Connection::~Connection() {
    if (fd_ != -1) {
        close(fd_);
    }
}

void Connection::handle_event(uint32_t events) {
    if (events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
        reactor_.del_fd(fd_);
        return;
    }

    if (events & EPOLLIN) {
        char buffer[4096];
        ssize_t nread;
        while ((nread = read(fd_, buffer, sizeof(buffer))) > 0) {
            input_buffer_.append(buffer, nread);
        }

        if (nread == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                reactor_.del_fd(fd_);
                return;
            }
        } else if (nread == 0) {
            reactor_.del_fd(fd_);
            return;
        }

        // 业务逻辑:将输入直接作为输出返回
        output_buffer_ = input_buffer_;
        input_buffer_.clear();
        reactor_.update_fd(fd_, EPOLLOUT | EPOLLET);
    }

    if (events & EPOLLOUT) {
        size_t total = output_buffer_.size();
        ssize_t nwritten = write(fd_, output_buffer_.data(), total);
        if (nwritten == -1) {
            if (errno != EAGAIN) {
                reactor_.del_fd(fd_);
            }
            return;
        }
        output_buffer_.erase(0, nwritten);
        if (output_buffer_.empty()) {
            reactor_.update_fd(fd_, EPOLLIN | EPOLLET);
        }
    }
}

// Reactor实现
Reactor::Reactor() {
    epoll_fd_ = epoll_create1(0);
    if (epoll_fd_ == -1) {
        throw std::runtime_error("epoll_create1 failed");
    }
}

Reactor::~Reactor() {
    stop();
    for (auto& pair : handlers_) {
        close(pair.first);
        delete pair.second;
    }
    close(epoll_fd_);
}

void Reactor::add_fd(int fd, uint32_t events, EventHandler* handler) {
    epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;
    if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) == -1) {
        delete handler;
        throw std::runtime_error("epoll_ctl add failed: " + std::string(strerror(errno)));
    }
    handlers_[fd] = handler;
}

void Reactor::update_fd(int fd, uint32_t events) {
    epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;
    if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
        throw std::runtime_error("epoll_ctl mod failed: " + std::string(strerror(errno)));
    }
}

void Reactor::del_fd(int fd) {
    auto it = handlers_.find(fd);
    if (it == handlers_.end()) return;

    delete it->second;
    handlers_.erase(it);
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
    close(fd);
}

void Reactor::run() {
    const int MAX_EVENTS = 64;
    epoll_event events[MAX_EVENTS];

    while (running_) {
        int nfds = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            if (errno == EINTR) continue;
            throw std::runtime_error("epoll_wait failed");
        }

        for (int i = 0; i < nfds; ++i) {
            int fd = events[i].data.fd;
            auto it = handlers_.find(fd);
            if (it != handlers_.end()) {
                it->second->handle_event(events[i].events);
            }
        }
    }
}

void Reactor::stop() {
    running_ = false;
}

int main() {
    try {
        Reactor reactor;
        Acceptor acceptor(reactor, 8080);
        reactor.run();
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
本文由作者按照 CC BY 4.0 进行授权

© xiongyi. 保留部分权利。

|

岂堪久蔽苍苍色,须放三光照九州