
# 1 简介
苍穹提供对消息中间件MQ的支持(目前支持rabbitmq),消息中间件是企业IT管理系统内部通讯的重要核心系统之一,具有系统解耦,异步化,削峰填谷等作用。
***特性***
1. 发布/订阅者模式
+ 1->1,单个订阅者
+ 1->N,广播,所有订阅者
2. 可靠性
+ 消息可靠持久化
+ 消息确认机制
+ 消息重新发送
3. 消息发送事务支持
4. 消息隔离机制
+ vhost 按集群隔离,一般可配置为集群名
+ 消息按照region-queue进行隔离
+ 区域region:可以用appid或领域名
+ 队列queue:按功能
5. Region可以独立部署消息服务器
+ mq.server=xxx.xxx.xxx.xxx 公共的
+ mq.server.fi=xxx.xxx.xxx.xxx //region fi独立部署
# 2 应用场景
主要应用在系统解耦、异步化处理等场景。
# 3 接口说明
MQ服务相关接口定义和实现存在于bos-mq-1.0.jar中。
## 3.1 接口列表
***消费接口***
| 方法 | 说明 |
| - | - |
| onMessage | 消费消息 |
***消息确认接口***
| 方法 | 说明 |
| - | - |
| ack | 确认消息处理完毕 |
| deny | 拒绝,返回队列,队列将再次发送给消息消费者 |
| discard | 丢弃消息,队列将丢弃该条消息 |
***消息发送接口***
| 方法 | 说明 |
| - | - |
| publish | 发送消息 |
| publishInDbTranscation | 发送消息,支持事务 |
| close | 关闭资源 |
## 3.2 接口详情
### onMessage
+ **功能描述**
消费消息。
+ **方法**
```java
void onMessage(Object message, String messageId, boolean resend, MessageAcker acker)
```
+ **参数说明**
| 参数 | 类型 | 说明 |
| - | - | - |
| message | Object | 消息内容 |
| messageId | String | 消息id |
| resend | boolean | 是否重新发送的,即前面发送的没有确认成功,mq服务器会重复发送过来 |
| acker | MessageAcker | 消息确认器 |
+ **返回值**
无
+ **示例代码**
```java
public class DemoConsumer implements MessageConsumer{
public void onMessage(Object body, String messageId, boolean resend, MessageAcker acker) {
try {
String str = (String)body; //new String(bytes, "UTF-8");
logger.info(str);
RequestContext rc = RequestContext.get(); //获取上下文
acker.ack(messageId);
} catch (Throwable e) {
boolean discard = e instanceof IOException; //是否废弃这条消息,根据具体场景判断
if (discard){
acker.discard(messageId);//废弃
// 记录废弃原因,并写业务日志
} else{
acker.deny(messageId);//告诉mq重发这条消息
// 记录异常原因,并写业务日志
}
}
}
//当事务敏感,发送方和消费方不在同一个库时,需要重写设置消费方的routekey
String getRouteKey() {
return "consumerDbRouteKey”;
}
}
```
在具体消息消费者类的onMessage方法中,如果没有调用MessageAcker的方法,默认行为:当onMessage正常执行,该消息正常消费(acker.ack(messageId));当onMessage方法抛出异常,则记录日志并丢弃该消息(acker.discard(messageId))。
在业务消费者类onMessage方法的实现中,如示例代码所示,需要捕获所有异常,判断具体异常原因,可恢复异常调用 acker.deny(messageId)告诉mq重发这条消息,不可恢复异常调用acker.discard(messageId)丢弃该消息。记录异常原因,并写入业务日志记录。
### publish
+ **功能描述**
发送消息。
+ **方法**
```java
void publish(byte[] message)
void publish(String message)
void publish(Object message)
```
+ **参数说明**
| 参数 | 类型 | 说明 |
| - | - | - |
| message | byte[] | 字节消息 |
| message | String | 字符消息 |
| message | Object | Object类型消息 |
+ **返回值**
无
### publishInDbTranscation
+ **功能描述**
发送消息,支持事务。
+ **方法**
```java
void publishInDbTranscation(String routKey,Object message)
```
+ **参数说明**
| 参数 | 类型 | 说明 |
| - | - | - |
| routKey | String | 参数已废弃 |
| message | Object | Object类型消息 |
+ **返回值**
无
+ **示例代码**
支持事务(当前代码在事务环境中):
```java
// Trascation begin
MessagePublisher pub =