MQ服务
# 1 简介
苍穹提供对消息中间件MQ的支持(目前支持rabbitmq),消息中间件是企业IT管理系统内部通讯的重要核心系统之一,具有系统解耦,异步化,削峰填谷等作用。
***特性***
1. 发布/订阅者模式
+ 1->1,单个订阅者
+ 1->N,广播,所有订阅者
2. 可靠性
+ 消息可靠持久化
+ 消息确认机制
+ 消息重新发送
3. 消息发送事务支持
4. 消息隔离机制
+ vhost 按集群隔离,一般可配置为集群名
+ 消息按照region-queue进行隔离
+ 区域region:可以用appid或领域名
+ 队列queue:按功能
5. Region可以独立部署消息服务器
+ mq.server=xxx.xxx.xxx.xxx 公共的
+ mq.server.fi=xxx.xxx.xxx.xxx //region fi独立部署
# 2 应用场景
主要应用在系统解耦、异步化处理等场景。
# 3 接口说明
MQ服务相关接口定义和实现存在于bos-mq-1.0.jar中。
## 3.1 接口列表
***消费接口***
| 方法 | 说明 |
| - | - |
| onMessage | 消费消息 |
***消息确认接口***
| 方法 | 说明 |
| - | - |
| ack | 确认消息处理完毕 |
| deny | 拒绝,返回队列,队列将再次发送给消息消费者 |
| discard | 丢弃消息,队列将丢弃该条消息 |
***消息发送接口***
| 方法 | 说明 |
| - | - |
| publish | 发送消息 |
| publishInDbTranscation | 发送消息,支持事务 |
| close | 关闭资源 |
## 3.2 接口详情
### onMessage
+ **功能描述**
消费消息。
+ **方法**
```java
void onMessage(Object message, String messageId, boolean resend, MessageAcker acker)
```
+ **参数说明**
| 参数 | 类型 | 说明 |
| - | - | - |
| message | Object | 消息内容 |
| messageId | String | 消息id |
| resend | boolean | 是否重新发送的,即前面发送的没有确认成功,mq服务器会重复发送过来 |
| acker | MessageAcker | 消息确认器 |
+ **返回值**
无
+ **示例代码**
```java
public class DemoConsumer implements MessageConsumer{
public void onMessage(Object body, String messageId, boolean resend, MessageAcker acker) {
try {
String str = (String)body; //new String(bytes, "UTF-8");
logger.info(str);
RequestContext rc = RequestContext.get(); //获取上下文
acker.ack(messageId);
} catch (Throwable e) {
boolean discard = e instanceof IOException; //是否废弃这条消息,根据具体场景判断
if (discard){
acker.discard(messageId);//废弃
// 记录废弃原因,并写业务日志
} else{
acker.deny(messageId);//告诉mq重发这条消息
// 记录异常原因,并写业务日志
}
}
}
//当事务敏感,发送方和消费方不在同一个库时,需要重写设置消费方的routekey
String getRouteKey() {
return "consumerDbRouteKey”;
}
}
```
在具体消息消费者类的onMessage方法中,如果没有调用MessageAcker的方法,默认行为:当onMessage正常执行,该消息正常消费(acker.ack(messageId));当onMessage方法抛出异常,则记录日志并丢弃该消息(acker.discard(messageId))。
在业务消费者类onMessage方法的实现中,如示例代码所示,需要捕获所有异常,判断具体异常原因,可恢复异常调用 acker.deny(messageId)告诉mq重发这条消息,不可恢复异常调用acker.discard(messageId)丢弃该消息。记录异常原因,并写入业务日志记录。
### publish
+ **功能描述**
发送消息。
+ **方法**
```java
void publish(byte[] message)
void publish(String message)
void publish(Object message)
```
+ **参数说明**
| 参数 | 类型 | 说明 |
| - | - | - |
| message | byte[] | 字节消息 |
| message | String | 字符消息 |
| message | Object | Object类型消息 |
+ **返回值**
无
### publishInDbTranscation
+ **功能描述**
发送消息,支持事务。
+ **方法**
```java
void publishInDbTranscation(String routKey,Object message)
```
+ **参数说明**
| 参数 | 类型 | 说明 |
| - | - | - |
| routKey | String | 参数已废弃 |
| message | Object | Object类型消息 |
+ **返回值**
无
+ **示例代码**
支持事务(当前代码在事务环境中):
```java
// Trascation begin
MessagePublisher pub = MQFactory.get().createSimplePublisher("demo", "demo_queue");
try {
//第一个参数现在不用了,不需要传
pub.publishInDbTranscation(null, "hello world");
} finally {
//用完关闭
pub.close();
}
// Transcation end
```
不支持事务:
```java
MessagePublisher pub = MQFactory.get().createSimplePublisher("demo", "demo_queue");
try {
pub.publish(“hello world”);
} finally{
//用完关闭
pub.close();
}
```
# 4 注意事项
## 4.1 高级特性
1. 在Consumer中获取RequestContext,该RequestContext对应了发送者的RequestContext,可以在其中获取租户id,用户id等等。
2. 如果消息发送方没有RequestContext,消息将不能被消费,所以如果自建线程中发送mq消息,需要传递或创建RequestContext。
3. 在消费者代码中,消费者消费完后,应该调用ack方法,如果消费异常,需要判断当前消息需要重新消费,还是需要被丢弃。
4. 任何情况下,只要消息没有确认成功,如网络异常等,MQ服务器会重新发送消息,resend为true所以,消费者必须处理幂等情况,及支持重复消费消息。
## 4.2 调试及常见问题
### 调试
1. 默认情况下,消息会发送给任意一个消费者节点,在开发调试阶段无法保证自己发送的message被自己java进程消费到。
2. 在DebugServer中加一个参数:
```java
System.setProperty("mq.debug.queue.tag", "lzx");
```
该参数必须全局唯一,才能确保被自己的java进程接收到。有时为分析问题需要连接部署环境进行调试,为了不影响部署环境的数据,需要添加参数:
```java
System.setProperty("dubbo.registry.register", "false");
System.setProperty(" mq.consumer.register”, "false");
```
如果就是要调试mq消费的代码块,需要设置
```java
System.setProperty("dubbo.registry.register", "true");
System.setProperty("mq.debug.queue.tag", "随机字符串");
```
### 常见问题
1. 现场开发环境收不到mq消息,检查下面两项配置。
```java
System.setProperty("mq.consumer.register", "true");//该参数为false,本节点将不会消费mq消息
System.setProperty("mq.debug.queue.tag", "每个人的专属tag");
```
2. 现场测试环境或其它线上环境偶尔收不到mq消息:
+ 开发环境为调试问题连接到了测试环境,而又没合适的配置开发环境,导致测试环境的mq消息被开发人员的环境消费,此时开发环境需要设置(如开发人员太多,有必要禁止开发人员直连测试环境)。
```java
System.setProperty(“mq.consumer.register”, “true”);//该参数为false,本节点将不会消费mq消息
System.setProperty(“mq.debug.queue.tag”, “每个人的专属tag”);
```
+ 测试环境和其它环境共用vhost,导致测试环境的消息被其它环境消费,此时需要检查所有环境的vhost设置,保证每个环境使用独立的vhost(一般可用集群名)。
```java
在配置中心查询mq.server 检查vhost设置,若环境太多检查不过来,将当前环境vhost设置一个新的值,并在rabbitmq管理中心创建该vhost。
```
# 5 公众号文章
[快速上手苍穹消息队列开发](https://mp.weixin.qq.com/s/rVR9_zOanCbgOkivnt1VnA)
MQ服务
# 1 简介苍穹提供对消息中间件MQ的支持(目前支持rabbitmq),消息中间件是企业IT管理系统内部通讯的重要核心系统之一,具有系统解耦,异...
点击下载文档
上一篇:Algo-内存数据库计算引擎接口说明下一篇:异常处理
本文2024-09-23 00:27:53发表“云苍穹知识”栏目。
本文链接:https://wenku.my7c.com/article/kingdee-cangqiong-139630.html
您需要登录后才可以发表评论, 登录登录 或者 注册
最新文档
热门文章