电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

MassTransit | .NET 分布式应用框架

来源:金蝶云社区作者:金蝶2024-09-167

MassTransit | .NET 分布式应用框架

MassTransit | .NET 分布式应用框架

引言

A free, open-source distributed application framework for .NET.
一个免费、开源的.NET 分布式应用框架。 -- MassTransit 官网

MassTransit,直译公共交通, 是由Chris Patterson开发的基于消息驱动的.NET 分布式应用框架,其核心思想是借助消息来实现服务之间的松耦合异步通信,进而确保应用更高的可用性、可靠性和可扩展性。通过对消息模型的高度抽象,以及对主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大简化了基于消息驱动的开发门槛,同时内置了连接管理、消息序列化和消费者生命周期管理,以及诸如重试、限流、断路器等异常处理机制,让开发者更好的专注于业务实现。
简而言之,MassTransit实现了消息代理透明化。无需面向消息代理编程进行诸如连接管理、队列的申明和绑定等操作,即可轻松实现应用间消息的传递和消费。

快速体验

空口无凭,创建一个项目快速体验一下。

  1. 基于worker模板创建一个基础项目:dotnet new worker -n MassTransit.Demo

  2. 打开项目,添加NuGet包:MassTransit

  3. 定义订单创建事件消息契约:

using System;namespace MassTransit.Demo{    public record OrderCreatedEvent
    {        public Guid OrderId { get; set; }
    }
}
  1. 修改Worker类,发送订单创建事件:

namespace MassTransit.Demo;public class Worker : BackgroundService{    readonly IBus _bus;//注册总线
    public Worker(IBus bus)
    {
        _bus = bus;
    }    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {        while (!stoppingToken.IsCancellationRequested)
        {            //模拟并发送订单创建事件
            await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);            await Task.Delay(1000, stoppingToken);
        }
    }
}
  1. 仅需实现IConsumer<OrderCreatedEvent>泛型接口,即可实现消息的订阅:

public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>
{    private readonly ILogger<OrderCreatedEventConsumer> _logger;    public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
    {
        _logger = logger;
    }    public Task Consume(ConsumeContext<OrderCreatedEvent> context)
    {
        _logger.LogInformation($"Received Order:{context.Message.OrderId}");        return Task.CompletedTask;
    }
}
  1. 注册服务:

using MassTransit;using MassTransit.Demo;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddHostedService<Worker>();
        services.AddMassTransit(configurator =>
        {            //注册消费者
            configurator.AddConsumer<OrderCreatedEventConsumer>();            //使用基于内存的消息路由传输
            configurator.UsingInMemory((context, cfg) =>
            {
                cfg.ConfigureEndpoints(context);
            });
        });
    })
    .Build();await host.RunAsync();
  1. 运行项目,一个简单的进程内事件发布订阅的应用就完成了。

如果需要使用RabbitMQ 消息代理进行消息传输,则仅需安装MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 传输消息即可。

using MassTransit;using MassTransit.Demo;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddHostedService<Worker>();
        services.AddMassTransit(configurator =>
        {
            configurator.AddConsumer<OrderCreatedEventConsumer>();            
            // configurator.UsingInMemory((context, cfg) =>
            // {
            //     cfg.ConfigureEndpoints(context);
            // });
            
            configurator.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(
                    host: "localhost",
                    port: 5672,
                    virtualHost: "/",
                    configure: hostConfig =>
                    {
                        hostConfig.Username("guest");
                        hostConfig.Password("guest");
                    });
                cfg.ConfigureEndpoints(context);
            });
        });
    })
    .Build();await host.RunAsync();

运行项目,MassTransit会自动在指定的RabbitMQ上创建一个类型为fanoutMassTransit.Demo.OrderCreatedEventExchange和一个与OrderCreatedEvent同名的队列进行消息传输,如下图所示。

核心概念

MassTranist 为了实现消息代理的透明化和应用间消息的高效传输,抽象了以下概念,其中消息流转流程如下图所示:

  1. Message:消息契约,定义了消息生产者和消息消费者之间的契约。

  2. Producer:生产者,发送消息的一方都可以称为生产者。

  3. SendEndpoint:发送端点,用于将消息内容序列化,并发送到传输模块。

  4. Transport:传输模块,消息代理透明化的核心,用于和消息代理通信,负责发送和接收消息。

  5. ReceiveEndpoint:接收端点,用于从传输模块接收消息,反序列化消息内容,并将消息路由到消费者。

  6. Consumer:消费者,用于消息消费。

从上图可知,本质上还是发布订阅模式的实现,接下来就核心概念进行详解。

Message

Message:消息,可以使用class、interface、struct和record来创建,消息作为一个契约,需确保创建后不能篡改,因此应只保留只读属性且不应包含方法和行为。MassTransit使用的是包含命名空间的完全限定名即typeof(T).FullName来表示特定的消息类型。因此若在另外的项目中消费同名的消息类型,需确保消息的命名空间相同。另外需注意消息不应继承,以避免发送基类消息类型造成的不可预期的结果。为避免此类情况,官方建议使用接口来定义消息。在MassTransit中,消息主要分为两种类型:

  1. Command:命令,用于告诉服务做什么,命令被发送到指定端点,仅被一个服务接收并执行。一般以动名词结构命名,如:UpdateAddress、CancelOrder。

  2. Event:事件,用于告诉服务什么发生了,事件被发布到多个端点,可以被多个服务消费。 一般以过去式结构命名,如:AddressUpdated,OrderCanceled。

经过MassTransit发送的消息,会使用信封包装,包含一些附加信息,数据结构举例如下:

{    "messageId": "6c600000-873b-00ff-9a8f-08da8da85542",    "requestId": null,    "correlationId": null,    "conversationId": "6c600000-873b-00ff-9526-08da8da85544",    "initiatorId": null,    "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",    "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",    "responseAddress": null,    "faultAddress": null,    "messageType": [        "urn:message:MassTransit.Demo:OrderCreatedEvent"
    ],    "message": {        "orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"
    },    "expirationTime": null,    "sentTime": "2022-09-03T12:32:15.0796943Z",    "headers": {},    "host": {        "machineName": "THINKPAD",        "processName": "MassTransit.Demo",        "processId": 24684,        "assembly": "MassTransit.Demo",        "assemblyVersion": "1.0.0.0",        "frameworkVersion": "6.0.5",        "massTransitVersion": "8.0.6.0",        "operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"
    }
}

从以上消息实例中可以看出一个包装后的消息包含以下核心属性:

  1. messageId:全局唯一的消息ID

  2. messageType:消息类型

  3. message:消息体,也就是具体的消息实例

  4. sourceAddress:消息来源地址

  5. destinationAddress:消息目标地址

  6. responseAddress:响应地址,在请求响应模式中使用

  7. faultAddress:消息异常发送地址,用于存储异常消费消息

  8. heade

MassTransit | .NET 分布式应用框架

MassTransit | .NET 分布式应用框架引言A free, open-source distributed application framework for .NET.一个免费、开源的.NET...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

已经是第一篇
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信