MQ服务

栏目:云苍穹知识作者:金蝶来源:金蝶云社区发布:2024-09-23浏览:1

MQ服务

# 1 简介 苍穹提供对消息中间件MQ的支持(目前支持rabbitmq),消息中间件是企业IT管理系统内部通讯的重要核心系统之一,具有系统解耦,异步化,削峰填谷等作用。 ***特性*** 1. 发布/订阅者模式 + 1->1,单个订阅者 + 1->N,广播,所有订阅者 2. 可靠性 + 消息可靠持久化 + 消息确认机制 + 消息重新发送 3. 消息发送事务支持 4. 消息隔离机制 + vhost 按集群隔离,一般可配置为集群名 + 消息按照region-queue进行隔离 + 区域region:可以用appid或领域名 + 队列queue:按功能 5. Region可以独立部署消息服务器 + mq.server=xxx.xxx.xxx.xxx 公共的 + mq.server.fi=xxx.xxx.xxx.xxx //region fi独立部署 # 2 应用场景 主要应用在系统解耦、异步化处理等场景。 # 3 接口说明 MQ服务相关接口定义和实现存在于bos-mq-1.0.jar中。 ## 3.1 接口列表 ***消费接口*** | 方法 | 说明 | | - | - | | onMessage | 消费消息 | ***消息确认接口*** | 方法 | 说明 | | - | - | | ack | 确认消息处理完毕 | | deny | 拒绝,返回队列,队列将再次发送给消息消费者 | | discard | 丢弃消息,队列将丢弃该条消息 | ***消息发送接口*** | 方法 | 说明 | | - | - | | publish | 发送消息 | | publishInDbTranscation | 发送消息,支持事务 | | close | 关闭资源 | ## 3.2 接口详情 ### onMessage + **功能描述** 消费消息。 + **方法** ```java void onMessage(Object message, String messageId, boolean resend, MessageAcker acker) ``` + **参数说明** | 参数 | 类型 | 说明 | | - | - | - | | message | Object | 消息内容 | | messageId | String | 消息id | | resend | boolean | 是否重新发送的,即前面发送的没有确认成功,mq服务器会重复发送过来 | | acker | MessageAcker | 消息确认器 | + **返回值** 无 + **示例代码** ```java public class DemoConsumer implements MessageConsumer{ public void onMessage(Object body, String messageId, boolean resend, MessageAcker acker) { try { String str = (String)body; //new String(bytes, "UTF-8"); logger.info(str); RequestContext rc = RequestContext.get(); //获取上下文 acker.ack(messageId); } catch (Throwable e) { boolean discard = e instanceof IOException; //是否废弃这条消息,根据具体场景判断 if (discard){ acker.discard(messageId);//废弃 // 记录废弃原因,并写业务日志 } else{ acker.deny(messageId);//告诉mq重发这条消息 // 记录异常原因,并写业务日志 } } } //当事务敏感,发送方和消费方不在同一个库时,需要重写设置消费方的routekey String getRouteKey() { return "consumerDbRouteKey”; } } ``` ​在具体消息消费者类的onMessage方法中,如果没有调用MessageAcker的方法,默认行为:当onMessage正常执行,该消息正常消费(acker.ack(messageId));当onMessage方法抛出异常,则记录日志并丢弃该消息(acker.discard(messageId))。 ​在业务消费者类onMessage方法的实现中,如示例代码所示,需要捕获所有异常,判断具体异常原因,可恢复异常调用 acker.deny(messageId)告诉mq重发这条消息,不可恢复异常调用acker.discard(messageId)丢弃该消息。记录异常原因,并写入业务日志记录。 ### publish + **功能描述** 发送消息。 + **方法** ```java void publish(byte[] message) void publish(String message) void publish(Object message) ``` + **参数说明** | 参数 | 类型 | 说明 | | - | - | - | | message | byte[] | 字节消息 | | message | String | 字符消息 | | message | Object | Object类型消息 | + **返回值** 无 ### publishInDbTranscation + **功能描述** 发送消息,支持事务。 + **方法** ```java void publishInDbTranscation(String routKey,Object message) ``` + **参数说明** | 参数 | 类型 | 说明 | | - | - | - | | routKey | String | 参数已废弃 | | message | Object | Object类型消息 | + **返回值** 无 + **示例代码** 支持事务(当前代码在事务环境中): ```java // Trascation begin MessagePublisher pub = MQFactory.get().createSimplePublisher("demo", "demo_queue"); try { //第一个参数现在不用了,不需要传 pub.publishInDbTranscation(null, "hello world"); } finally { //用完关闭 pub.close(); } // Transcation end ``` 不支持事务: ```java MessagePublisher pub = MQFactory.get().createSimplePublisher("demo", "demo_queue"); try { pub.publish(“hello world”); } finally{ //用完关闭 pub.close(); } ``` # 4 注意事项 ## 4.1 高级特性 1. 在Consumer中获取RequestContext,该RequestContext对应了发送者的RequestContext,可以在其中获取租户id,用户id等等。 2. 如果消息发送方没有RequestContext,消息将不能被消费,所以如果自建线程中发送mq消息,需要传递或创建RequestContext。 3. 在消费者代码中,消费者消费完后,应该调用ack方法,如果消费异常,需要判断当前消息需要重新消费,还是需要被丢弃。 4. 任何情况下,只要消息没有确认成功,如网络异常等,MQ服务器会重新发送消息,resend为true所以,消费者必须处理幂等情况,及支持重复消费消息。 ## 4.2 调试及常见问题 ### 调试 1. 默认情况下,消息会发送给任意一个消费者节点,在开发调试阶段无法保证自己发送的message被自己java进程消费到。 2. 在DebugServer中加一个参数: ```java System.setProperty("mq.debug.queue.tag", "lzx"); ``` 该参数必须全局唯一,才能确保被自己的java进程接收到。有时为分析问题需要连接部署环境进行调试,为了不影响部署环境的数据,需要添加参数: ```java System.setProperty("dubbo.registry.register", "false"); System.setProperty(" mq.consumer.register”, "false"); ``` 如果就是要调试mq消费的代码块,需要设置 ```java System.setProperty("dubbo.registry.register", "true"); System.setProperty("mq.debug.queue.tag", "随机字符串"); ``` ### 常见问题 1. 现场开发环境收不到mq消息,检查下面两项配置。 ```java System.setProperty("mq.consumer.register", "true");//该参数为false,本节点将不会消费mq消息 System.setProperty("mq.debug.queue.tag", "每个人的专属tag"); ``` 2. 现场测试环境或其它线上环境偶尔收不到mq消息: + 开发环境为调试问题连接到了测试环境,而又没合适的配置开发环境,导致测试环境的mq消息被开发人员的环境消费,此时开发环境需要设置(如开发人员太多,有必要禁止开发人员直连测试环境)。 ```java System.setProperty(“mq.consumer.register”, “true”);//该参数为false,本节点将不会消费mq消息 System.setProperty(“mq.debug.queue.tag”, “每个人的专属tag”); ``` + 测试环境和其它环境共用vhost,导致测试环境的消息被其它环境消费,此时需要检查所有环境的vhost设置,保证每个环境使用独立的vhost(一般可用集群名)。 ```java 在配置中心查询mq.server 检查vhost设置,若环境太多检查不过来,将当前环境vhost设置一个新的值,并在rabbitmq管理中心创建该vhost。 ``` # 5 公众号文章 [快速上手苍穹消息队列开发](https://mp.weixin.qq.com/s/rVR9_zOanCbgOkivnt1VnA)

MQ服务

# 1 简介苍穹提供对消息中间件MQ的支持(目前支持rabbitmq),消息中间件是企业IT管理系统内部通讯的重要核心系统之一,具有系统解耦,异...
点击下载文档
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息