一、概述
非阻塞网络编程无疑成了高并发、高性能编程的代名词,但现实应用编程中并不是每种应用都需要采用非阻塞编程模式,因为这将大大增加编程的复杂性、开发周期以及出错率,所以我们写的绝大部分网络程序程序都是阻塞的,一般是一个进程一个网络连接或一个线程一个网络连接。即然非阻塞模式可以实现高并发网络连接,阻塞模式可以实现复杂的业务逻辑,那是否有办法将二者结合起来呢?答案是肯定的,其中在 acl_cpp 库中,ipc 目录下的模块就是为了满足这种需求而设计的。
二、接口设计
在 rpc.hpp 头文件中有两个类:rpc_service 和 rpc_request,其中 rpc_request 是一个纯虚类,用户需要继承此类并实现该类中规定的纯虚接口,从而实现自己的阻塞操作功能;rpc_service 是阻塞与非阻塞结合的粘合类,通过将 rpc_request 子类实例传递给 rpc_service 类实例的 rpc_fork 方法,实现将阻塞请求过程交给子线程处理的目的。
1、rpc_service 类
1.1、构造函数:
1 2 3 4 5 6 7 8 9
|
rpc_service(int nthread, bool win32_gui = false)
|
从参数 win32_gui 可以看出,acl_cpp 的阻塞/非阻塞结合模式不仅可以用在通常的网络编程中,同时还可以用在阻塞过程与 WINDOWS 界面消息相结合的方面,这无非是经常用 MFC 进行界面编程的福音(例如,用户 VC 写了一个界面程序—当然这个界面窗口是基于 WIN32 消息的,但如果想进行一些数据库操作或文件下载操作,对用户而言阻塞式方法是非常容易实现这两类需求的,但 WIN32 的界面过程不能堵在任何一个数据库操作或文件下载过程,原来 VC 程序员通常的做法也是创建单独的线程进行阻塞操作,然后通过给主窗口传递消息将操作结果通知至主线程,幸运的是 acl_cpp 的 rpc 相关类可以使这一过程更为方便快捷;再如,你写的一个网络服务器程序的主线程是非阻塞的,但其中你不得不调用别人提供的库以实现用户身份验证的功能,同时这个用于用户认证的库又恰恰是阻塞的—一般也是如此,固然,你也许可以费很周折实现这一过程,同样,acl_cpp 的 rpc 相关类可以帮你解决这类问题)。
1.2、将阻塞处理过程的对象交由子线程处理:
1 2 3 4 5 6 7 8
|
void rpc_fork(rpc_request* req);
|
通过此接口,可以将阻塞请求过程交给子线程处理,子线程处理完后再通知主线程。
2、rpc_request 类
在类 rpc_request 中有三个虚接口,用户子类必须实现其中的两个纯虚接口:rpc_run 和 rpc_onover,同时用户可以根据需要实现另一非纯虚接口:rpc_wakeup。是当用户调完 rpc_service::rpc_fork 且子线程接收到此请求任务后 rpc_request::rpc_run 方法会被调用(一定切记:rpc_fork 是在主线程中被调用的,而 rpc_run 是在子线程中被调用的);当 rpc_request::rpc_run 函数返回后,rpc_reuqest::rpc_onover 会在主线程中被调用以表示子线程已经处理完毕(同样需要严重注意:rpc_request::rpc_onover 方法又回到主线程中被调用),这样,通过这两个过程就实现了将阻塞过程放在子线程中处理,主线程的非阻塞过程(非阻塞网络事件或非阻塞的 WIN32 消息过程)异步地等待子线程完成任务。
1 2 3 4 5 6 7 8 9 10 11
|
virtual void rpc_run(void) = 0;
virtual void rpc_onover(void) = 0;
|
另外,在 rpc_request 类中还有一个非纯虚方法:rpc_wakeup,这是做什么用的呢?可以假设这种应用场景,在子线程中调用 rpc_request::rpc_run 方法内部过程中,用户如果需要通知主线程一些中间状态(比如,文件下载的进度)该怎么办?那就在 rpc_run 方法内先调用 rpc_request::rpc_signal 通知主线程子线程处理的中间状态,则在主线程中用户实现的 rpc_request::rpc_wakeup 虚接口就会被调用。下面是 rpc_signal 和 rpc_wakeup 的接口说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
void rpc_signal(void* ctx);
virtual void rpc_wakeup(void* ctx) { (void) ctx; }
|
紧接着这个应用场景,假设在子线程调用 rpc_run 的内部通过 rpc_signal 通知主线程的中间状态后,希望主线程能收到此通知消息并且希望得到主线程下一步希望执行的指令才会进一步继续执行。于是便有了 rpc_request::cond_wait 和 rpc_request::cond_signal 两个方法的产生,即子线程通过 cond_wait 阻塞地等待主线程的下一步操作指令,主线程则调用 cond_signal 通知子线程下步指令,下面是这两个方法的说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
bool cond_wait(int timeout = -1);
void rpc_signal(void* ctx);
|
三、示例
如果您能大体明白上面有关 rpc_service 和 rpc_request 类的功能说明,相信下面的例子您也一定能看明白:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
#include "stdafx.h" #include <assert.h> #include "lib_acl.hpp"
using namespace acl;
typedef enum { CTX_T_CONTENT_LENGTH, CTX_T_PARTIAL_LENGTH, CTX_T_END } ctx_t;
struct DOWN_CTX { ctx_t type; int length; };
static int __download_count = 0;
class http_download : public rpc_request { public: http_download(aio_handle& handle, const char* addr, const char* url) : handle_(handle) , addr_(addr) , url_(url) , error_(false) , total_read_(0) , content_length_(0) {} ~http_download() {} protected:
void rpc_run() { http_request req(addr_);
req.request_header().set_url(url_.c_str()) .set_content_type("text/html") .set_host(addr_.c_str()) .set_method(HTTP_METHOD_GET);
string header; req.request_header().build_request(header); printf("request: %s\r\n", header.c_str());
if (req.request(NULL, 0) == false) { printf("send request error\r\n"); error_ = false; return; }
http_client* conn = req.get_client(); assert(conn); DOWN_CTX* ctx = new DOWN_CTX; ctx->type = CTX_T_CONTENT_LENGTH;
ctx->length = (int) conn->body_length(); content_length_ = ctx->length;
rpc_signal(ctx);
char buf[8192]; while (true) { int ret = req.get_body(buf, sizeof(buf)); if (ret <= 0) { ctx = new DOWN_CTX; ctx->type = CTX_T_END; ctx->length = ret; rpc_signal(ctx); break; } ctx = new DOWN_CTX; ctx->type = CTX_T_PARTIAL_LENGTH; ctx->length = ret; rpc_signal(ctx); } }
void rpc_onover() { printf("%s: read over now, total read: %d, content-length: %d\r\n", addr_.c_str(), total_read_, content_length_);
__download_count--; if (__download_count == 0) handle_.stop(); }
void rpc_wakeup(void* ctx) { DOWN_CTX* down_ctx = (DOWN_CTX*) ctx; switch (down_ctx->type) { case CTX_T_CONTENT_LENGTH: printf("%s: content-length: %d\r\n", addr_.c_str(), down_ctx->length); break; case CTX_T_PARTIAL_LENGTH: total_read_ += down_ctx->length; printf("%s: partial-length: %d, total read: %d\r\n", addr_.c_str(), down_ctx->length, total_read_); break; case CTX_T_END: printf("%s: read over\r\n", addr_.c_str()); break; default: printf("%s: ERROR\r\n", addr_.c_str()); break; } delete down_ctx; } private: aio_handle& handle_; string addr_; string url_; bool error_; int total_read_; int content_length_; };
static void run(void) { aio_handle handle; rpc_service* service = new rpc_service(10);
if (service->open(&handle) == false) { printf("open service error: %s\r\n", last_serror()); return; }
http_download down1(handle, "www.sina.com.cn:80", "http://www.sina.com.cn/"); service->rpc_fork(&down1); __download_count++;
http_download down2(handle, "www.hexun.com:80", "/"); service->rpc_fork(&down2); __download_count++;
while (true) { if (handle.check() == false) break; }
delete service; handle.check(); }
int main(void) { #ifdef WIN32 acl_cpp_init(); #endif
run(); printf("Enter any key to continue\r\n"); getchar(); return 0; }
|
acl 下载:
github:https://github.com/acl-dev/acl
gitee:https://gitee.com/acl-dev/acl