电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

.NetCore中使用分布式事务DTM的二阶段消息

来源:金蝶云社区作者:金蝶2024-09-163

.NetCore中使用分布式事务DTM的二阶段消息

.NetCore中使用分布式事务DTM的二阶段消息

一、概述

二阶段消息是DTM新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。

相比现有的消息架构借助于各种消息中间件比如RocketMQ等,DTM自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景

二阶段消息保证提交的原子性和如何保证业务成功执行如下时序图:

 

 二阶段消息主要是指PrepareSubmit两个阶段,主程序向DTM服务发送Prepare消息,成功后执行本地事务,完成本地事务后发送Submit消息至DTM服务,之后DTM会调用分支事件执行其他服务,最后完成全局事务。

 当发送了Prepare但是Submit没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:

 1、在处理本地事务时,会将gid插入到barrier表中,同时带上插入原因为committed。该表有一个唯一索引,主要字段为gid

 2、当进行回查时,二阶段消息的操作不是直接查gid是否存在,而是再insert ignore一条带有相同gid的数据,同时带上插入原因为rollbacked。此时如果表中如果已有gid的记录,那么新的插入操作就会被ignore,否则数据会被插入。

 3、然后再用gid查询表中的记录,如果查到记录的reasoncommitted,那么说明本地事务已提交;如果查到记录的reasonrollbacked,那么说明本地事务已回滚。

二、安装DTM

 我使用二进制包下载安装地址,我是Window环境所以下载后解压,点击dtm.exe进行运行即可,如下启动成功

启动成功后可以访问http://localhost:36789,进入管理后台

三、创建DTM所需的表

我们需要创建一个表处理消息的回查,表里保存全局事务ID,具体作用在后续说明,我这里用的SqlServer数据库,所以执行如下:

复制代码
CREATE TABLE [dbo].[barrier](    [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,    [trans_type] varchar(45) NOT NULL DEFAULT(''),    [gid] varchar(128) NOT NULL DEFAULT(''),    [branch_id] varchar(128) NOT NULL DEFAULT(''),    [op] varchar(45) NOT NULL DEFAULT(''),    [barrier_id] varchar(45) NOT NULL DEFAULT(''),    [reason] varchar(45) NOT NULL DEFAULT(''),    [create_time] datetime NOT NULL DEFAULT(getdate()) ,    [update_time] datetime NOT NULL DEFAULT(getdate())
)GOCREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
        ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)WITH(IGNORE_DUP_KEY = ON)GO
复制代码

这里比较关键的是那个唯一索引,有一个IGNORE_DUP_KEY = ON,这个其实就是为了等价mysqlinsert ignore表示存在相关字段的信息则不插入,否则就插入数据

当然还支持很多其他的数据库,建表语句可以从这里查看地址

 四、创建项目

 我们简单的创建两个.net core webapi项目进行测试,两个项目都进行相同的如下操作:

 1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer

 安装Dtmcli是因为其中已经帮我们集成了DTM客户端SDK HTTP版本,想要GRPC版本可以安装Dtmgrpc

 安装Microsoft.EntityFrameworkCore.SqlServer很显然是为了处理数据库。

Install-Package Dtmcli
Install-Package Microsoft.EntityFrameworkCore.SqlServer

2、配置

接下来我们配置服务,先在配置文件appsetting.json中添加如下

复制代码
  "AppSettings": {    "DtmUrl": "http://localhost:36789",    "BusiUrl": "http://localhost:5056",    "QueryPreparedUrl": "http://localhost:5046",    "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True"
  }
复制代码

 DtmUrlDTM的监听地址,http的是36789grpc的是36790

 BusiUrl:访问其他服务的地址

 QueryPreparedUrl:回查的地址

 BarrierConn:数据库连接语句

 添加一个配置类:

复制代码
    public class AppSettings
    {        public string DtmUrl { get; set; }        public string BusiUrl { get; set; }        public string BarrierConn { get; set; }        public string QueryPreparedUrl { get; set; }
    }
复制代码

 之后注入服务如下:

复制代码
builder.Services.AddDtmcli(dtm => { 
    dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl");
    dtm.SqlDbType = DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER;
    dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]";
});
builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("AppSettings"));
复制代码

SqlDbType:表示使用的数据库类型

BarrierSqlTableNameBarrier表的名字

3、添加代码

我们在其中一个项目添加主程序代码如下:

复制代码
    [ApiController]public class DtmController : ControllerBase
    {        private readonly ILogger<DtmController> _logger;        private readonly IDtmClient _dtmClient;        private readonly IDtmTransFactory _transFactory;        private readonly AppSettings _settings;        private readonly IBranchBarrierFactory _factory;        public DtmController(ILogger<DtmController> logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions<AppSettings> settings, IBranchBarrierFactory factory)
        {
            _logger = logger;
            _dtmClient = dtmClient;
            _transFactory = transFactory;
            _settings = settings.Value;
            _factory = factory;
        }        private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
        [HttpPost("post-dtm-msg")]        public async Task<IActionResult> Get(CancellationToken cancellationToken)
        {            //1、创建gid
            var gid = await _dtmClient.GenGid(cancellationToken);            //2、设置分支事务
            var msg = _transFactory.NewMsg(gid)
                .Add(_settings.BusiUrl + "/TransOut

.NetCore中使用分布式事务DTM的二阶段消息

.NetCore中使用分布式事务DTM的二阶段消息一、概述二阶段消息是DTM新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

已经是第一篇
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信