电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式

来源:金蝶云社区作者:金蝶2024-09-163

分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式

分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式

本地消息表模式

本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆分为多个本地事务,事务之间通过事件消息衔接,事件消息和上个事务共用一个本地事务存储到本地消息表,再通过定时任务轮询本地消息表进行消息投递,下游业务订阅消息进行消费,本质上是依靠消息的重试机制达到最终一致性。其示意图如下所示,主要分为以下三步:

  1. 本地业务数据和发布的事件消息共享同一个本地事务,进行数据落库,其中事件消息持久化到单独的事件发件箱表中。

  2. 单独的进程或线程不断查询发件箱表中未发布的事件消息。

  3. 将未发布的事件消息发布到消息代理,然后将消息的状态更新为已发布。

dotnetcore/CAP 简介

在《.NET 微服务:适用于容器化 .NET 应用程序的体系结构》电子书中,提及了如何设计兼具原子性和弹性的事件总线,其中提出了三种思路:使用完整的事件溯源模式,使用事务日志挖掘,使用发件箱模式(The outbox pattern)。其中事件溯源模式实现相对复杂,事务日志挖掘局限于特定类型数据库,而发件箱模式则是一种相对平衡的实现方式,其基于事务数据库表和简化的事件溯源模式。发件箱模式的示意图如下所示:

从上图可以看出,其实现原理与上面提及的本地消息表模式十分相似,我们可以理解其也是本地消息表模式的一种实现。作者Savorboard也正是受该电子书启发,实现了.NET版本的本地消息表模式,并命名为dotnetcore/CAP,其架构如下图所示。其同时也兼具EventBus的功能,其支持主流消息代理,如RabbitMQ、Redis、Kafka和Pulsar,同时支持多种持久化存储方式进行消息存储,包括MySQL、PostgreSQL、SQL Server和MongoDB。因此基于dotnetcore/CAP,.NET 开发者也可以快速实现微服务间的异步通信和解决分布式事务问题。

基于dotnetcore/CAP 实现分布式事务

那具体如何使用dotnetcore/CAP来解决分布式事务问题呢,基于本地消息表加补偿模式实现。dotnetcore/CAP的补偿模式比较巧妙,其基于发布事件的方法签名中提供了一个回调参数。发布方法的事件签名为:PublishAsync<T>(string name, T? contentObj, string? callbackName=null),第一个参数是事件名称,第二个参数为事件数据包,第三个参数用来指定于接收事件消费结果的回调地址(事件),但是否触发回调,取决于事件订阅方是否定义返回参数,若有则触发。如果基于CAP实现下单流程,则其流程如下所示:

接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,订单服务、库存服务和支付服务均依赖共享类库项目,其中共享类库添加DotNetCore.CapDotNetCore.Cap.MySqlDotNetCore.Cap.RabbitMQNuGet包。

项目项目名项目类型
订单服务CapDemo.OrderServiceASP.NET Core Web API
库存服务CapDemo.InventoryServiceWorker Service
支付服务CapDemo.PaymentServiceWorker Service
共享类库CapDemo.SharedClass Library

订单服务

订单服务首先需要暴露WebApi用于订单的创建,为了方便数据的持久化,首先添加Pomelo.EntityFrameworkCore.MySqlNuget包,然后创建OrderDbContext

using System;using System.Collections.Generic;using System.Linq;using System.Threading.Tasks;using Microsoft.EntityFrameworkCore;using CapDemo.OrderService.Domains;namespace CapDemo.OrderService.Data{    public class OrderDbContext : DbContext
    {        public OrderDbContext (DbContextOptions<OrderDbContext> options)
            : base(options) {}        public DbSet<CapDemo.OrderService.Domains.Order> Order { get; set; } = default!;
    }
}

然后创建OrdersController并添加PostOrder方法如下所示:

using System;using System.Collections.Generic;using System.Linq;using System.Threading.Tasks;using Microsoft.AspNetCore.Http;using Microsoft.AspNetCore.Mvc;using Microsoft.EntityFrameworkCore;using CapDemo.OrderService.Data;using CapDemo.OrderService.Domains;using DotNetCore.CAP;using CapDemo.Shared;using CapDemo.Shared.Models;namespace CapDemo.OrderService.Controllers{
    [Route("api/[controller]")]
    [ApiController]    public class OrdersController : ControllerBase
    {        private readonly OrderDbContext _context;        private readonly ICapPublisher _capPublisher;        private readonly ILogger<OrdersController> _logger;        public OrdersController(OrderDbContext context, ICapPublisher capPublisher,ILogger<OrdersController> logger)
        {
            _context = context;
            _capPublisher = capPublisher;
            _logger = logger;
        }
        [HttpPost]        public async Task<ActionResult<Order>> PostOrder(CreateOrderDto orderDto)
        {            var shoppingItems =
                orderDto.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));            var order = new Order(orderDto.CustomerId).NewOrder(shoppingItems.ToArray());            
            using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false))
            {
                _context.Order.Add(order);                var deduceDto = new DeduceInventoryDto()
                {
                    OrderId = order.OrderId,
                    DeduceStockItems = order.OrderItems.Select(
                        item => new DeduceStockItem(item.SkuId, item.Qty, item.Price)).ToList()
                };                await _capPublisher.PublishAsync(TopicConsts.DeduceInventoryCommand,deduceDto,
                    callbackName: TopicConsts.CancelOrderCommand);                await _context.SaveChangesAsync();                await trans.CommitAsync();
            }
                
            _logger.LogInformation($"Order [{order.OrderId}] created successfully!");            return CreatedAtAction("GetOrder", new { id = order.OrderId }, order);
        }
    }
}

从代码中可以看出,在订单持久化和事件发布之前先行使用事务包裹:using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false)) {},以确保订单和事件的持久化共享同一个事务,这一步是使用CAP的重中之重。订单服务通过注入了ICapPublisher服务,并通过PublishAsync方法发布扣减库存事件,并指定了callbackName: TopicConsts.CancelOrderCommand
订单服务还需要订阅取消订单和订单支付结果的事件,进行订单状态的更新,添加OrderConsumers如下所示,其中通过实现ICapSubscribe接口来显式标记为消费者,然后定义方法并在方法体上通过[CapSubscribe]特性指定订阅的事件名称来完成事件的消费。

using CapDemo.OrderService.Data;using CapDemo.Shared;using DotNetCore.CAP;namespace CapDemo.OrderService.Consumers;public class OrderConsumers:ICapSubscribe{    private readonly OrderDbContext _orderDbContext;    private readonly ILogger<OrderConsumers> _logger;    public OrderConsumers(OrderDbContext orderDbContext,ILogger<OrderConsumers> logger)
    {
        _orderDbContext = orderDbContext;
        _logger = logger;
    }
    [CapSubscribe(TopicConsts.CancelOrderCommand)]    public async Task CancelOrder(string orderId)
    {        if(string.IsNullOrEmpty(orderId)) return;        var order = await _orderDbContext.Order.FindAsync(orderId);
        order?.CancelOrder();
        _logger.LogWarning($"Order [{orderId}] has been canceled!");        await _orderDbContext.SaveChangesAsync();
    }

    [CapSubscribe(TopicConsts.PayOrderSucceedTopic)]    public async  Task MarkToPaid(string orderId)
    {        var order = await _orderDbContext.Order.FindAsync(orderId);
        
        order?.UpdateToPaid();        await _orderDbContext.SaveChangesAsync();
    }
}

最后修改Program.cs添加CAP服务和消费者的注册。

using CapDemo.OrderService.Consumers;using CapDemo.OrderService.Data;using Microsoft.EntityFrameworkCore;using DotNetCore.CAP;var builder = WebApplication.CreateBuilder(args);// 注册 DbContextvar connectionStr = builder.Configuration.GetConnectionString("Default");
builder.Services.AddDbContext<OrderDbContext>(options =>
    options.UseMySql(connectionStr ?? throw new InvalidOperationException("Connection string 'OrderDbContext' not found."), ServerVersion.AutoDetect(connectionStr)));// 注册CAPbuilder.Services.AddCap(x =>
{
    x.UseEntityFramework<OrderDbContext>();
    x.UseRabbitMQ("localhost");
});// 注册消费者builder.Services.AddTransient<OrderConsumers>();

库存服务

库存服务在整个下单流程的职责主要是库存的扣减和返还,添加InventoryConsumer来消费库存扣减和返还事件即可。

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Text.Json;using System.Threading.Tasks;using CapDemo.Shared;using CapDemo.Shared.Models;using DotNetCore.CAP;namespace

分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式

分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式本地消息表模式本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

已经是第一篇
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信