技术答疑.消息队列.DotNetMQ

栏目:云星空知识作者:金蝶来源:金蝶云社区发布:2024-09-23浏览:1

技术答疑.消息队列.DotNetMQ

【DotNetMQ】

基于.NET的开源消息队列框架。

https://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-Message-Queue-System-for-NET

https://github.com/hikalkan/dotnetmq



【工作原理示意图】



【工作原理介绍】

DotNetMQ提供了3种角色:

客户端(MDSClient):负责客户端的消息发送和接收。

服务端(MDSServer:负责服务端的Socket连接管理,消息的接收、发送、分发。

管理端(MDSController:分别提供了客户端控制器和服务端控制器。客户端控制器可以实时获取服务端的相关运行数据,也可以发送控制指令给服务端。服务端控制器负责接收控制指令,执行控制指令。



MDSServer

功能说明:DotNetMQ的服务端,用于管理客户端连接,消息接收、持久化,分发等。

工作原理:

MDSServer一经实例化,就会根据配置文件MDSSettings.xml中配置的Applications,创建MDSClientApplication实例。同时还会创建一个MDSController实例,用于实现MDS管理功能。


MDSServer的启动过程如下(MDSServer.Start):

<1>使用StorageManager连接并打开数据库。


<2>CommunicationLayer启动通讯层,实现客户端连接的监听及管理(创建连接,注册连接,分配应用程序,监听数据接收)。

客户端与服务端建立Socket长连接的全过程如下:

(1)服务端TCPCommunicationManager启动监听Socket,开启线程监听并接收(TCPConnectionListener.DoListenAsThread)连入到指定端口上的客户端长连接(TCPCommunicator)。

(2)客户端实例化MDSClient,并执行MDSClient.Connect()。

(3)服务端TCPConnectionListener监听到新连接进来后,自动分配一个唯一的ComminicatorId,同时触发TCPConnectionListener的TCPClientConnected事件,在该事件中创建TCPCommunicator,再触发TCPCommunicationManager的CommunicatorConnected事件,该事件由CommunicationLayer的Manager_CommunicatorConnected托管。

在CommunicatorConnected事件中,将新TCPCommunicator加入到通讯层连接集合_communicators中,并立即启动该TCPCommunicator。开启线程,监听消息接收。

(4)客户端连接成功后,立即发送注册消息(MDSRegisterMessage),并等待服务端应答,如果超过30秒无应答,视之为注册失败,连接不成功。

(5)服务端TCPCommunicator接收到数据后,触发消息接收事件MessageReceived,该事件由CommunicationLayer的Communicator_MessageReceived方法托管,此事件只处理客户端的注册消息MDSRegisterMessage,收到客户端注册消息后,将客户端连接加入到客户端应用程序MDSClientApplication的_communicators中,并将该连接的ComminicatorId回发给客户端MDSClient。

(6)客户端收到服务端注册成功的消息后,更新本地TCPChannel的ComminicatorId。

至此客户端和服务端的Socket长连接正式完成握手对接,且都分配了唯一识别码ComminicatorId。


<3>OrganizationLayer启动组织层,实现消息的处理、响应和分发。


<3.1>MDSClientApplication启动客户端应用程序

根据配置文件MDSSettings.xml中配置的Applications,分别启动客户端应用程序,最后再启动一个MDSController程序。

MDSRemoteApplication自行管理TCPCommunicator的接入和关闭。

MDSClientApplication基于MDSRemoteApplication实现,MDSClientApplication的MessageReceived事件由OrganizationLayer的RemoteApplication_MessageReceived方法托管。

具体实现过程如下:

(a)MDSClientApplication的TCPCommunicator接收消息,通过MDSRemoteApplication的Communicator_MessageReceived方法,将消息塞入QueueProcessorThread的队列中。

(b)QueueProcessorThread监听到消息入队后,调用MDSRemoteApplication的IncomingMessageQueue_ProcessItem方法,在该方法中,触发MDSRemoteApplication的MessageReceived事件,该事件由OrganizationLayer的RemoteApplication_MessageReceived方法托管。

(c)RemoteApplication_MessageReceived方法实现对消息的数据持久化和发送回应消息(MDSOperationResultMessage)给客户端。

MDSController的MessageReceived事件由他自己的MDSController_MessageReceived方法托管。

MDSClientApplication实现以下功能:

消息发送者角色(Send_App):

(a)服务端应用程序等待接收客户端MDSClient发过来的消息。

(b)服务端应用程序将消息转给指定的接收应用程序。

(c)服务端接收应用程序完成消息的数据持久化和内存队列入列。

(d)消息入列后,服务端应用程序发送MDSOperationResultMessage消息通知客户端MDSClient,表示该消息已发送成功。

(e)客户端MDSClient收到服务端回发的消息发送成功的消息后,即表示发送操作完成,否则表示发送操作失败(操作超时、连线被关闭、消息被拒绝、其它未知异常)。


消息接收者角色(Receive_App):

(a)服务端MDSPersistentRemoteApplicationBase启动线程,通过ProcessWaitingMessageRecordsAsThread方法,从数据库message表或者内存队列_waitingMessages中提取消息,通过MessageDeliverer挑选一个订阅了消息的客户端MDSClient,并将此消息发送给此客户端。

(b)客户端消费完消息后,回发MDSOperationResultMessage消息给服务端。

(c)服务端MDSRemoteApplication通过Communicator_MessageReceived方法接收客户端MDSClient回发的消息已处理的响应消息(MDSOperationResultMessage)后,将该消息从数据库message表和内存队列_waitingMessages中移除。


<3.2>MDSServerGraph启用消息分发服务(未使用,不做详细介绍)


<3.2>MDSController启动MDS管理服务

系统提供了两个MDSController:

服务端MDSController:MDS.Organization.MDSController(继承自MDSRemoteApplication)

客户端MDSController:MDS.Management.MDSController(内部实现类似于MDSClient)

MDSController主要提供MQ的管理和控制服务。

客户端MDSController发送指令,获取MQ的运行时数据。

服务端MDSController响应指令,把MQ的运行时数据发送给客户端,或者执行客户端的控制指令,例如:删除消息。

提供的主要服务有:

(a)MessageTypeIdGetApplicationListMessage:获取客户端程序列表

(b)MessageTypeIdGetApplicationWaitingMessage:获取客户端当前待处理消息列表

(c)MessageTypeIdRemoveMsgMessage移除消息



MDSClient

功能说明:访问MDSServer的代理类,可实现消息的发送和接收。

工作原理:

<1>客户端和服务端自动连接管理

建立连接:

MDSClient实例化后,启动定时器用于管理客户端与服务端的长连接。即自动与服务端建立长连接(TCPChannel),并通过定时器,实现断开自动重连。

定时器工作原理:

如果未连接到服务端:创建连接到服务端的长连接(TCPChannel),向服务端注册该连接(发送MDSRegisterMessage消息),获取用于身份识别的连接ID。

如果已连接到服务端:每隔60秒往远程服务器发送ping消息(MDSPingMessage),如果发送失败,断开当前Socket连接,尝试自动重连直至再次连接成功。

关闭连接:

如果客户端发送ping消息失败,将关闭此连接,并触发连接状态改变事件。客户端通过TCPChannel的StateChanged事件,捕获当前客户端Socket连接的状态变化。当收到连接关闭的事件时,清理当前正在发送的数据包,使之发送失败。


<2>消息接收

<2.1>客户端通过TCPChannel的MessageReceived事件,接收来自服务端的全部数据包,数据包有以下几种类型:

MessageTypeIdMDSDataTransferMessage(1):数据消息

MessageTypeIdMDSOperationResultMessage(2):操作结果消息

MessageTypeIdMDSPingMessage(3):ping消息

MessageTypeIdMDSRegisterMessage(4):注册消息

MessageTypeIdMDSChangeCommunicationWayMessage(5):变更通讯方式消息

MessageTypeIdMDSControllerMessage(6):控制消息

MessageTypeIdMDSDataTransferResponseMessage(7):数据响应消息


<2.2>客户端通过QueueProcessorThread的ProcessItem事件,捕获服务端发过来的【消息】数据包。

可通过MDSClient的MessageReceived事件,接收消息,并处理该消息,如果不处理消息,消息将一直存储在消息队列中,处理该消息后,消息将从消息队列中移除。


<3>消息发送

默认设置:

发送的消息数据包不能超过50M(52428800字节)。

发送消息超时时间默认是5分钟。

一次完整的消息发送包含以下3个步骤:

客户端发送消息数据包给服务端。

服务端接收消息,存入数据库,然后发送确认消息数据包给客户端。

客户端等待服务端的确认消息,如果在指定的时间内(默认5分钟)等到了确认消息,即表示此次发送操作成功。如果未等到确认消息,即表示此次发送操作失败(操作超时、连线被关闭、消息被拒绝、其它未知异常)。


---------------------------------------------------------------------------------------------------------













【金蝶云星空BOS二次开发案例演示】https://vip.kingdee.com/article/94751030918525696

技术答疑.消息队列.DotNetMQ

【DotNetMQ】基于.NET的开源消息队列框架。https://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-Message-Queue-System-for-N...
点击下载文档
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息