技术答疑.消息队列.DotNetMQ
【DotNetMQ】
基于.NET的开源消息队列框架。
https://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-Message-Queue-System-for-NET
https://github.com/hikalkan/dotnetmq
【工作原理示意图】
【工作原理介绍】
DotNetMQ提供了3种角色:
客户端(MDSClient):负责客户端的消息发送和接收。
服务端(MDSServer):负责服务端的Socket连接管理,消息的接收、发送、分发。
管理端(MDSController):分别提供了客户端控制器和服务端控制器。客户端控制器可以实时获取服务端的相关运行数据,也可以发送控制指令给服务端。服务端控制器负责接收控制指令,执行控制指令。
【MDSServer】
功能说明:DotNetMQ的服务端,用于管理客户端连接,消息接收、持久化,分发等。
工作原理:
MDSServer一经实例化,就会根据配置文件MDSSettings.xml中配置的Applications,创建MDSClientApplication实例。同时还会创建一个MDSController实例,用于实现MDS管理功能。
MDSServer的启动过程如下(MDSServer.Start):
<1>使用StorageManager连接并打开数据库。
<2>CommunicationLayer启动通讯层,实现客户端连接的监听及管理(创建连接,注册连接,分配应用程序,监听数据接收)。
客户端与服务端建立Socket长连接的全过程如下:
(1)服务端TCPCommunicationManager启动监听Socket,开启线程监听并接收(TCPConnectionListener.DoListenAsThread)连入到指定端口上的客户端长连接(TCPCommunicator)。
(2)客户端实例化MDSClient,并执行MDSClient.Connect()。
(3)服务端TCPConnectionListener监听到新连接进来后,自动分配一个唯一的ComminicatorId,同时触发TCPConnectionListener的TCPClientConnected事件,在该事件中创建TCPCommunicator,再触发TCPCommunicationManager的CommunicatorConnected事件,该事件由CommunicationLayer的Manager_CommunicatorConnected托管。
在CommunicatorConnected事件中,将新TCPCommunicator加入到通讯层连接集合_communicators中,并立即启动该TCPCommunicator。开启线程,监听消息接收。
(4)客户端连接成功后,立即发送注册消息(MDSRegisterMessage),并等待服务端应答,如果超过30秒无应答,视之为注册失败,连接不成功。
(5)服务端TCPCommunicator接收到数据后,触发消息接收事件MessageReceived,该事件由CommunicationLayer的Communicator_MessageReceived方法托管,此事件只处理客户端的注册消息MDSRegisterMessage,收到客户端注册消息后,将客户端连接加入到客户端应用程序MDSClientApplication的_communicators中,并将该连接的ComminicatorId回发给客户端MDSClient。
(6)客户端收到服务端注册成功的消息后,更新本地TCPChannel的ComminicatorId。
至此客户端和服务端的Socket长连接正式完成握手对接,且都分配了唯一识别码ComminicatorId。
<3>OrganizationLayer启动组织层,实现消息的处理、响应和分发。
<3.1>MDSClientApplication启动客户端应用程序
根据配置文件MDSSettings.xml中配置的Applications,分别启动客户端应用程序,最后再启动一个MDSController程序。
MDSRemoteApplication自行管理TCPCommunicator的接入和关闭。
MDSClientApplication基于MDSRemoteApplication实现,MDSClientApplication的MessageReceived事件由OrganizationLayer的RemoteApplication_MessageReceived方法托管。
具体实现过程如下:
(a)MDSClientApplication的TCPCommunicator接收消息,通过MDSRemoteApplication的Communicator_MessageReceived方法,将消息塞入QueueProcessorThread的队列中。
(b)QueueProcessorThread监听到消息入队后,调用MDSRemoteApplication的IncomingMessageQueue_ProcessItem方法,在该方法中,触发MDSRemoteApplication的MessageReceived事件,该事件由OrganizationLayer的RemoteApplication_MessageReceived方法托管。
(c)RemoteApplication_MessageReceived方法实现对消息的数据持久化和发送回应消息(MDSOperationResultMessage)给客户端。
MDSController的MessageReceived事件由他自己的MDSController_MessageReceived方法托管。
MDSClientApplication实现以下功能:
消息发送者角色(Send_App):
(a)服务端应用程序等待接收客户端MDSClient发过来的消息。
(b)服务端应用程序将消息转给指定的接收应用程序。
(c)服务端接收应用程序完成消息的数据持久化和内存队列入列。
(d)消息入列后,服务端应用程序发送MDSOperationResultMessage消息通知客户端MDSClient,表示该消息已发送成功。
(e)客户端MDSClient收到服务端回发的消息发送成功的消息后,即表示发送操作完成,否则表示发送操作失败(操作超时、连线被关闭、消息被拒绝、其它未知异常)。
消息接收者角色(Receive_App):
(a)服务端MDSPersistentRemoteApplicationBase启动线程,通过ProcessWaitingMessageRecordsAsThread方法,从数据库message表或者内存队列_waitingMessages中提取消息,通过MessageDeliverer挑选一个订阅了消息的客户端MDSClient,并将此消息发送给此客户端。
(b)客户端消费完消息后,回发MDSOperationResultMessage消息给服务端。
(c)服务端MDSRemoteApplication通过Communicator_MessageReceived方法接收客户端MDSClient回发的消息已处理的响应消息(MDSOperationResultMessage)后,将该消息从数据库message表和内存队列_waitingMessages中移除。
<3.2>MDSServerGraph启用消息分发服务(未使用,不做详细介绍)
<3.2>MDSController启动MDS管理服务
系统提供了两个MDSController:
服务端MDSController:MDS.Organization.MDSController(继承自MDSRemoteApplication)
客户端MDSController:MDS.Management.MDSController(内部实现类似于MDSClient)
MDSController主要提供MQ的管理和控制服务。
客户端MDSController发送指令,获取MQ的运行时数据。
服务端MDSController响应指令,把MQ的运行时数据发送给客户端,或者执行客户端的控制指令,例如:删除消息。
提供的主要服务有:
(a)MessageTypeIdGetApplicationListMessage:获取客户端程序列表
(b)MessageTypeIdGetApplicationWaitingMessage:获取客户端当前待处理消息列表
(c)MessageTypeIdRemoveMsgMessage移除消息
【MDSClient】
功能说明:访问MDSServer的代理类,可实现消息的发送和接收。
工作原理:
<1>客户端和服务端自动连接管理
建立连接:
MDSClient实例化后,启动定时器用于管理客户端与服务端的长连接。即自动与服务端建立长连接(TCPChannel),并通过定时器,实现断开自动重连。
定时器工作原理:
如果未连接到服务端:创建连接到服务端的长连接(TCPChannel),向服务端注册该连接(发送MDSRegisterMessage消息),获取用于身份识别的连接ID。
如果已连接到服务端:每隔60秒往远程服务器发送ping消息(MDSPingMessage),如果发送失败,断开当前Socket连接,尝试自动重连直至再次连接成功。
关闭连接:
如果客户端发送ping消息失败,将关闭此连接,并触发连接状态改变事件。客户端通过TCPChannel的StateChanged事件,捕获当前客户端Socket连接的状态变化。当收到连接关闭的事件时,清理当前正在发送的数据包,使之发送失败。
<2>消息接收
<2.1>客户端通过TCPChannel的MessageReceived事件,接收来自服务端的全部数据包,数据包有以下几种类型:
MessageTypeIdMDSDataTransferMessage(1):数据消息
MessageTypeIdMDSOperationResultMessage(2):操作结果消息
MessageTypeIdMDSPingMessage(3):ping消息
MessageTypeIdMDSRegisterMessage(4):注册消息
MessageTypeIdMDSChangeCommunicationWayMessage(5):变更通讯方式消息
MessageTypeIdMDSControllerMessage(6):控制消息
MessageTypeIdMDSDataTransferResponseMessage(7):数据响应消息
<2.2>客户端通过QueueProcessorThread的ProcessItem事件,捕获服务端发过来的【消息】数据包。
可通过MDSClient的MessageReceived事件,接收消息,并处理该消息,如果不处理消息,消息将一直存储在消息队列中,处理该消息后,消息将从消息队列中移除。
<3>消息发送
默认设置:
发送的消息数据包不能超过50M(52428800字节)。
发送消息超时时间默认是5分钟。
一次完整的消息发送包含以下3个步骤:
客户端发送消息数据包给服务端。
服务端接收消息,存入数据库,然后发送确认消息数据包给客户端。
客户端等待服务端的确认消息,如果在指定的时间内(默认5分钟)等到了确认消息,即表示此次发送操作成功。如果未等到确认消息,即表示此次发送操作失败(操作超时、连线被关闭、消息被拒绝、其它未知异常)。
---------------------------------------------------------------------------------------------------------
【金蝶云星空BOS二次开发案例演示】https://vip.kingdee.com/article/94751030918525696
技术答疑.消息队列.DotNetMQ
本文2024-09-23 03:57:45发表“云星空知识”栏目。
本文链接:https://wenku.my7c.com/article/kingdee-k3cloud-162217.html