当 OS 平台支持异步操作时,一种高效而方便的实现高性能 Web 服务器的方法是使用前摄式事件分派。使用前摄式事件分派模型设计的 Web 服务器通过一或多个线程控制来处理异步操作的完成。这样,通过集成完成事件多路分离(completion event demultiplexing)和事件处理器分派,前摄器模式简化了异步的 Web 服务器。
异步的 Web 服务器将这样来利用前摄器模式:首先让 Web 服务器向 OS 发出异步操作,并将回调方法登记到 Completion Dispatcher(完成分派器),后者将在操作完成时通知 Web 服务器。于是 OS 代表 Web 服务器执行操作,并随即在一个周知的地方将结果排队。Completion Dispatcher 负责使完成通知出队,并执行适当的、含有应用特有的 Web 服务器代码的回调。
使用前摄器模式的主要优点是可以启动多个并发操作,并可并行运行,而不要求应用必须拥有多个线程。操作被应用异步地启动,它们在 OS 的 I/O 子系统中运行直到完成。发起操作的线程现在可以服务 另外的请求了。
在ACE中,可以通过ACE_Proactor实现前摄器模式。实现方式如下。
1。创建服务处理器:
Proactor框架中服务处理器均派生自ACE_Service_Handler,它和Reactor框架的事件处理器非常类似。当发生IO操作完成事件时,会触发相应的事件完成会调函数。
2。实现服务处理器IO操作
Proactor框架中所有的IO操作都由相应的异步操作类来完成,这些异步操作类都继承自ACE_Asynch_Operation。常用的有以下几种。
-
ACE_Asynch_Read_Stream, 提供从TCP/IP socket连接中进行异步读操作.
-
ACE_Asynch_Write_Stream, 提供从TCP/IP socket连接中进行异步写操作.
使用这些操作类的一般方式如下:
- 初始化
将相关的操作注册到服务处理器中,一般可通过调用其open方法实现。
- 发出IO操作
发出异步IO操作请求,该操作不会阻塞,具体的IO操作过程由操作系统异步完成。
- IO操作完成回调处理
异步IO操作完成后,OS会触发服务处理器中的相应回调函数,可通过该函数的ACE_Asynch_Result参数获取相应的返回值。
3。使用连接器或接受器和远端进行连接
ACE为Proactor框架提供了两个工厂类来建立TCP/IP连接。
-
ACE_Asynch_Acceptor, 用于被动地建立连接
-
ACE_Asynch_Connector 用于主动地建立连接
当远端连接建立时,连接器或接受器便会创建相应的服务处理器,从而可以实现服务处理。
4。启动Proactor事件分发处理
启动事件分发处理只需如下调用:
while(true)
ACE_Proactor::instance ()->handle_events ();
5。程序示例
服务器端:
服务器端简单的实现了一个EchoServer,流程如下:
当客户端建立连接时,首先发出一个异步读的异步请求,当读完成时,将所读的数据打印出来,并发出一个新的异步请求。
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
}
virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
this->handle (h);
if (this->reader_.open (*this) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("HA_Proactive_Service open")));
delete this;
return;
}
ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024);
if (this->reader_.read (*mb, mb->space ()) != 0)
{
ACE_OS::printf("Begin read fail\n");
delete this;
return;
}
return;
}
//异步读完成后会调用此函数
virtual void handle_read_stream
(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
mb.release ();
delete this;
return;
}
mb.copy("");//为字符串添加结束标记'\0'
ACE_OS::printf("rev:\t%s\n",mb.rd_ptr());
mb.release();
ACE_Message_Block *nmb = new ACE_Message_Block(buffer,1024);
if (this->reader_.read (*nmb, nmb->space ()) != 0)
return;
}
private:
ACE_Asynch_Read_Stream reader_;
char buffer[1024];
};
int main(int argc, char *argv[])
{
int port=3000;
ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
if (acceptor.open (ACE_INET_Addr (port)) == -1)
return -1;
while(true)
ACE_Proactor::instance ()->handle_events ();
return 0;
}
客户端:
客户端代码比较简单,就是每隔1秒钟将当前的系统时间转换为字符串形式通过异步形式发送给服务器,发送完成后,释放时间字符的内存空间。
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
}
virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
this->handle (h);
if (this->writer_.open (*this) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("HA_Proactive_Service open")));
delete this;
return;
}
ACE_OS::printf("connceted");
for(int i=0;i<10;i++)//每隔秒中发送时间至服务器
{
ACE_OS::sleep(1);
time_t now = ACE_OS::gettimeofday().sec();
char *time = ctime(&now);//获取当前时间的字符串格式
ACE_Message_Block *mb = new ACE_Message_Block(100);
mb->copy(time);
if (this->writer_.write(*mb,mb->length()) !=0)
{
ACE_OS::printf("Begin read fail\n");
delete this;
return;
}
}
return;
}
//异步写完成后会调用此函数
virtual void handle_write_dgram
(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
mb.release();
return;
}
private:
ACE_Asynch_Write_Stream writer_;
};
int main(int argc, char *argv[])
{
ACE_INET_Addr addr(3000,"192.168.1.142");
HA_Proactive_Service *client = new HA_Proactive_Service();
ACE_Asynch_Connector<HA_Proactive_Service> connector;
connector.open();
if (connector.connect(addr) == -1)
return -1;
while(true)
ACE_Proactor::instance ()->handle_events ();
return 0;
}
相关推荐
ACE proactor 与 Reactor 模式的详解
使用ACE_Proactor进行TCP通信的示例代码。
本论文中介绍的前摄器(Proactor)模式描述怎样构造应用和系统,以有效地利用操作系统支持的异步机制。 当应用调用异步操作时,OS代表应用执行此操作。这使得应用可以让多个操作同时运行,而又不需要应用拥有相 应...
本人学习ACE 时候写的关于两种通信方式的实例代码,我把三个连写都写在了一个文件,根据需要注释掉其他的就行。 ACE用于开发通信程序非常方便,尤其相对于WinSock 来说,可以省很多代码,普通程序员也更容易理解
对ACE的Proactor通讯模式的全面封装,可以在完全不知道ACE的情况下进行异步通信
ACE_Proactor TCP协议通信示例代码
ACE的Proactor框架在Windows底层下是采用IOCP来实现的,这里采用IOCP模仿实现了ACE的Proactor框架,对于学习和研究ACE的Proactor框架很有帮助.
2. 注册EventHandler到InitiationDispatcher中,每个EventHandler包含对相应Handle的引用,从而建立Handle到
C++高性能并发编程之proactor模式应用实例,应用于网络编程领域,包含一下几个部分: 1 服务器端程序:一个server代码展示如何使用现代C++11编程高并发代码,一个Makefile文件 2 客户端程序:一个客户端代码展示如何...
很全面的ACE服务器,包括proactor、reactor模式
第 8 章 前摄器(Proactor):用于为异步事件多路分离和分派处理器的对象行为模式 第 9 章 接受器-连接器(Acceptor-Connector):用于连接和初始化通信服务的对象创建模式 第 10 章 服务配置器(Service ...
Proactor和Reactor模式_继续并发系统设计的扫盲[参照].pdf
《ace技术内幕:深入解析ace架构设计与实现原理》从构架模式、编程示例和源代码3个维度系统地对经典网络框架ace(adaptivemunicationenvironment)的架构设计和实现原理进行了深入分析,它能解决4个方面的问题:,...
reactor和proactor的区别
Proactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handlers for Asynchronous Events.pdf
基于ace的服务器架构,包含了reactor、proactor架构。
ACE技术论文集,ACE程序员教程,ACE应用实例,对于理解Proactor框架帮助甚大。
ACE在服务端开发中的应用,ACE_Proactor应用于异步I/O操作
引擎内部实现参考了 ACE 的种种概念,比如 proactor(预先操作,前摄器),task(任务),主动对象(Active Object),message queue(消息队列),lock(锁),guard(锁守护). 在内部需要缓存的发送和接收的数据会被放入内存池...