MassTransit 知多少 | 基于MassTransit Courier实现Saga 编排式分布式事务

MassTransit 知多少 | 基于MassTransit Courier实现Saga 编排式分布式事务
Saga 模式
Saga 最初出现在1987年Hector Garcaa-Molrna & Kenneth Salem发表的一篇名为《Sagas》的论文里。其核心思想是将长事务拆分为多个短事务,借助Saga事务协调器的协调,来保证要么所有操作都成功完成,要么运行相应的补偿事务以撤消先前完成的工作,从而维护多个服务之间的数据一致性。举例而言,假设有个在线购物网站,其后端服务划分为订单服务、支付服务和库存服务。那么一次下订单的Saga流程如下图所示:

在Saga模式中本地事务是Saga 参与者执行的工作单元,每个本地事务都会更新数据库并发布消息或事件以触发 Saga 中的下一个本地事务。如果本地事务失败,Saga 会执行一系列补偿事务,以撤消先前本地事务所做的更改。
对于Saga模式的实现又分为两种形式:
协同式:把Saga 的决策和执行顺序逻辑分布在Saga的每个参与方中,通过交换事件的方式进行流转。示例图如下所示:

编排式:把Saga的决策和执行顺序逻辑集中定义在一个Saga 编排器中。Saga 编排器发出命令式消息给各个Saga 参与方,指示这些参与方执行怎样的操作。

从上图可以看出,对于协同式Saga 存在一个致命的弊端,那就是存在循环依赖的问题,每个Saga参与方都需要订阅所有影响它们的事件,耦合性较高,且由于Saga 逻辑分散在各参与方,不便维护。相对而言,编排式Saga 则实现了关注点分离,协调逻辑集中在编排器中定义,Saga 参与者仅需实现供编排器调用的API 即可。
在.NET 中也有开箱即用的开源框架实现了编排式的Saga事务模型,也就是MassTransit Courier,接下来就来实际探索一番。
MassTransit Courier 简介
MassTransit Courier 是对Routing Slip(路由单) 模式的实现。该模式用于运行时动态指定消息处理步骤,解决不同消息可能有不同消息处理步骤的问题。实现机制是消息处理流程的开始,创建一个路由单,这个路由单定义消息的处理步骤,并附加到消息中,消息按路由单进行传输,每个处理步骤都会查看_路由单_并将消息传递到路由单中指定的下一个处理步骤。
在MassTransit Courier中是通过抽象IActivity和RoutingSlip来实现了Routing Slip模式。通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip。而每个Activity的具体抽象就是IActivity和IExecuteActivity。二者的差别在于IActivity定义了Execute和Compensate两个方法,而IExecuteActivitiy仅定义了Execute方法。其中Execute代表正向操作,Compensate代表反向补偿操作。用一个简单的下单流程:创建订单->扣减库存->支付订单举例而言,使用Courier的实现示意图如下所示:
基于Courier 实现编排式Saga事务
那具体如何使用MassTransit Courier来应用编排式Saga 模式呢,接下来就来创建解决方案来实现以上下单流程示例。
创建解决方案
依次创建以下项目,除共享类库项目外,均安装MassTransit和MassTransit.RabbitMQNuGet包。
| 项目 | 项目名 | 项目类型 |
|---|---|---|
| 订单服务 | MassTransit.CourierDemo.OrderService | ASP.NET Core Web API |
| 库存服务 | MassTransit.CourierDemo.InventoryService | Worker Service |
| 支付服务 | MassTransit.CourierDemo.PaymentService | Worker Service |
| 共享类库 | MassTransit.CourierDemo.Shared | Class Library |
三个服务都添加扩展类MassTransitServiceExtensions,并在Program.cs类中调用services.AddMassTransitWithRabbitMq();注册服务。
using System.Reflection;using MassTransit.CourierDemo.Shared.Models;namespace MassTransit.CourierDemo.InventoryService;public static class MassTransitServiceExtensions{ public static IServiceCollection AddMassTransitWithRabbitMq(this IServiceCollection services)
{ return services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter(); // By default, sagas are in-memory, but should be changed to a durable
// saga repository.
x.SetInMemorySagaRepositoryProvider(); var entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.AddSagaStateMachines(entryAssembly);
x.AddSagas(entryAssembly);
x.AddActivities(entryAssembly);
x.UsingRabbitMq((context, busConfig) =>
{
busConfig.Host(
host: "localhost",
port: 5672,
virtualHost: "masstransit",
configure: hostConfig =>
{
hostConfig.Username("guest");
hostConfig.Password("guest");
});
busConfig.ConfigureEndpoints(context);
});
});
}
}订单服务
订单服务作为下单流程的起点,需要承担构建RoutingSlip的职责,因此可以创建一个OrderRoutingSlipBuilder来构建RoutingSlip,代码如下:
using MassTransit.Courier.Contracts;using MassTransit.CourierDemo.Shared.Models;namespace MassTransit.CourierDemo.OrderService;public static class OrderRoutingSlipBuilder{ public static RoutingSlip BuildOrderRoutingSlip(CreateOrderDto createOrderDto)
{ var createOrderAddress = new Uri("queue:create-order_execute"); var deduceStockAddress = new Uri("queue:deduce-stock_execute"); var payAddress = new Uri("queue:pay-order_execute");
var routingSlipBuilder = new RoutingSlipBuilder(Guid.NewGuid());
routingSlipBuilder.AddActivity(
name: "order-activity",
executeAddress: createOrderAddress,
arguments: createOrderDto);
routingSlipBuilder.AddActivity(name: "deduce-stock-activity", executeAddress: deduceStockAddress);
routingSlipBuilder.AddActivity(name: "pay-activity", executeAddress: payAddress); var routingSlip = routingSlipBuilder.Build(); return routingSlip;
}
}从以上代码可知,构建一个路由单需要以下几步:
明确业务用例涉及的具体用例,本例中为:
创建订单:CreateOrder
扣减库存:DeduceStock
支付订单:PayOrder
根据用例名,按短横线隔开命名法(kebab-case)定义用例执行地址,格式为
queue:<usecase>_execute,本例中为:创建订单执行地址:queue:create-order_execute
扣减库存执行地址:queue:deduce-stock_execute
支付订单执行地址:queue:pay-order_execute
创建路由单:
通过
RoutingSlipBuilder(Guid.NewGuid())创建路由单构建器实例根据业务用例流转顺序,调用
AddActivity()方法依次添加Activity用来执行用例,因为第一个创建订单用例需要入口参数,因此传入了一个CreateOrderDtoDTO(Data Transfer Object)对象调用
Build()方法创建路由单
对于本例而言,由于下单流程是固定流程,因此以上路由单的构建也是按业务用例进行定义的。而路由单的强大之处在于,可以按需动态组装。在实际电商场景中,有些订单是无需执行库存扣减的,比如充值订单,对于这种情况,仅需在创建路由单时判断若为充值订单则不添加扣减库存的Activity即可。
对于订单服务必然要承担创建订单的职责,定义CreateOrderActivity(Activity的命名要与上面定义的用例对应)如下,其中OrderRepository为一个静态订单仓储类:
public class CreateOrderActivity : IActivity<CreateOrderDto, CreateOrderLog>
{ private readonly ILogger<CreateOrderActivity> _logger; public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
{
_logger = logger;
} // 订单创建
public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderDto> context)
{ var order = await CreateOrder(context.Arguments); var log = new CreateOrderLog(order.OrderId, order.CreatedTime);
_logger.LogInformation($"Order [{order.OrderId}] created successfully!"); return context.CompletedWithVariables(log, new {order.OrderId});
} private async Task<Order> CreateOrder(CreateOrderDto orderDto)
{ var shoppingItems =
orderDto.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty)); var order = new Order(orderDto.CustomerId).NewOrder(shoppingItems.ToArray()); await OrderRepository.Insert(order); return order;
} // 订单补偿(取消订单)
public async Task<CompensationResult> Compensate(CompensateContext<CreateOrderLog> context)
{ var order = await OrderRepository.Get(context.Log.OrderId);
order.CancelOrder(); var exception = context.Message.ActivityExceptions.FirstOrDefault();
_logger.LogWarning( $"Order [{order.OrderId} has been canceled duo to {exception.ExceptionInfo.Message}!"); return context.Compensated();
}
}从以上代码可知,实现一个Activity,需要以下步骤:
定义实现
IActivity<in TArguments, in TLog>需要的参数类:TArguments对应正向执行入口参数,会在Execute方法中使用,本例中为CreateOrderDto,用于订单创建。TLog对应反向补偿参数,会在Compensate方法中使用,本例中为CreateOrderLog,用于订单取消。实现
IActivity<in TArguments, in TLog>接口中的Execute方法:具体用例的实现,本例中对应订单创建逻辑
创建
TLog反向补偿参数实例,以便业务异常时能够按需补偿返回Activity执行结果,并按需传递参数至下一个Activity,本例仅传递订单Id至下一流程。
实现
IActivity<in TArguments, in TLog>接口中的Compensate方法:具体反向补偿逻辑的实现,本例中对应取消订单
返回反向补偿执行结果
订单服务的最后一步就是定义WebApi来接收创建订单请求,为简要起便创建OrderController如下:
using MassTransit.CourierDemo.Shared.Models;using Microsoft.AspNetCore.Mvc;namespace MassTransit.CourierDemo.OrderService.Controllers;
[ApiController]
[Route("[controller]")]public class OrderController : ControllerBase{ private readonly IBus _bus; public OrderController(IBus bus)
{
_bus = bus;
}
[HttpPost] public async Task<IActionResult> CreateOrder(CreateOrderDto createOrderDto)
{ // 创建订单路由单
var orderRoutingSlip = OrderRoutingSlipBuilder.BuildOrderRoutingSlip(createOrderDto); // 执行订单流程
await _bus.Execute(orderRoutingSlip); return Ok();
}
}库存服务
库存服务在整个下单流程的职责主要是库存的扣减和返还,但由于从上游用例仅传递了OrderId参数到库存扣减Activity,因此在库存服务需要根据OrderId 去请求订单服务获取要扣减的库存项才能执行扣减逻辑。而这可以通过使用MassTransit的Reqeust/Response 模式来实现,具体步骤如下:
在共享类库
MassTransit.CourierDemo.Shared中定义IOrderItemsRequest和IOrderItemsResponse:
namespace MassTransit.CourierDemo.Shared.Models;public interface IOrderItemsRequest{ public string OrderId { get; }
}public interface IOrderItemsResponse{ public List<DeduceStockItem> DeduceStockItems { get; set; } public string OrderId { get; set; }
}在订单服务中实现
IConsumer<IOrderItemsRequest:
using MassTransit.CourierDemo.OrderService.Repositories;using MassTransit.CourierDemo.Shared.Models;namespace MassTransit.CourierDemo.OrderService.Consumers;public class OrderItemsRequestConsumer : IConsumer<IOrderItemsRequest>
{ public async Task Consume(ConsumeContext<IOrderItemsRequest> context)
{ var order = await OrderRepository.Get(context.Message.OrderId); await context.RespondAsync<IOrderItemsResponse>(new
{
order.OrderId,
DeduceStockItems = order.OrderItems.Select(
item => new DeduceStockItem(item.SkuId, item.Qty)).ToList()
});
}
}在库存服务注册
service.AddMassTransit()中注册x.AddRequestClient<IOrderItemsRequest>();:
using System.Reflection;using MassTransit.CourierDemo.Shared.Models;namespace MassTransit.CourierDemo.InventoryService;public static class MassTransitServiceExtensions{ public static IServiceCollection AddMassTransitWithRabbitMq(this IServiceCollection services)
{ return services.AddMassTransit(x =>
{ //...
x.AddRequestClient<IOrderItemsRequest>(); //...
});
}
}在需要的类中注册
IRequestClient<OrderItemsRequest>服务即可。
最终扣减库存的Activity实现如下:
public class DeduceStockActivity : IActivity<DeduceOrderStockDto, DeduceStockLog>
{ private readonly IRequestClient<IOrderItemsRequest> _orderItemsRequestClient; private readonly ILogger<DeduceStockActivity> _logger; public DeduceStockActivity(IRequestClient<IOrderItemsRequest> orderItemsRequestClient,
ILogger<DeduceStMassTransit 知多少 | 基于MassTransit Courier实现Saga 编排式分布式事务
声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。



