×

推荐一个高性能的.NET分布式消息总线框架 NebulaBus

独孤求败 独孤求败 发表于2026-02-27 10:42:34 浏览35 评论0

抢沙发发表评论

NebulaBus 是一个高性能的 .NET 分布式事件总线框架,让开发者专注开发。

支持即时/延迟的广播、定向推送(如微服务间精准通信),内置Quartz.Net和失败重试机制,完美适配电商秒杀、物流追踪等高并发场景。

图片.png

功能特性

  • 双引擎驱动:基于 RabbitMQ 的毫秒级传输 + Redis 的高性能存储,动态负载均衡
  • 精确延迟发送:内置Quartz.net 进行精确延迟发送并支持多节点部署
  • 分布式部署:自动节点发现、故障转移,支持横向扩展
  • 轻量级内核:核心包仅 50KB
  • 配置简单,快速上手,让开发者专注开发

使用场景

  • 微服务解耦 | IoT 设备指令分发 | 金融交易异步结算 | 游戏服务器状态同步

  • 项目定位: 比 MassTransit 更易扩展和轻量化的 .NET 消息中间件解决方案。

如何使用

VS包管理器安装 直接在vs包管理器中搜索NabulaBus.Store.Memory 和NebulaBus.Transport.Memory,然后安装。

dotnet add package NebulaBus.Store.Memory
dotnet add package NebulaBus.Transport.Memory

创建订阅Handler

public classTestHandlerV1 : NebulaHandler<TestMessage>
{
    //订阅者唯一标识,用于定向发送
    publicoverridestring Name => "NebulaBus.TestHandler.V1";
    //订阅者组,用于广播,相同组的订阅都将收到消息
    publicoverridestring Group => "NebulaBus.TestHandler";
    //重试延迟,用于配置首次失败后多久重试,默认5秒
    publicoverride TimeSpan RetryDelay => TimeSpan.FromSeconds(10);
    //最大重试次数,默认10次
    publicoverrideint MaxRetryCount => 5;
    //重试间隔,默认10秒
    publicoverride TimeSpan RetryInterval => TimeSpan.FromSeconds(10);

    protected override async Task Handle(TestMessage message, NebulaHeader header)
    {
        Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message} Header:{header["customHeader"]} RetryCount:{header[NebulaHeader.RetryCount]}");
        //TODO: your logic code
    }
}

注册NebulaBus

builder.Services.AddNebulaBus(options =>
{
    //集群名称,它是可选的,默认为程序集名称
    options.ClusterName = "TestCluster";
    options.ExecuteThreadCount = 1;
    options.UseMemoryTransport();
    options.UserMemoryStore();
});

注册订阅者 Handler

//一个个注册
builder.Services.AddNebulaBusHandler<TestHandlerV1, TestMessage>();
builder.Services.AddNebulaBusHandler<TestHandlerV2, TestMessage>();
builder.Services.AddNebulaBusHandler<TestHandlerV3>();

//批量注册
builder.Services.AddNebulaBusHandler(typeof(TestHandlerV1).Assembly);

广播模式

//INebulaBus 接口
private readonly INebulaBus _bus;

//广播传入的是订阅者组名,所有相同组的订阅者都将收到消息
_bus.PublishAsync("NebulaBus.TestHandler"new TestMessage { Message = "Hello World" });

高性能

多消费线程模式

NebulaBus支持多线程消费,它默认是你的处理器个数,你也可以通过为不同的Handler设置不同的ExecuteThreadCount值来设置消费线程数。对于那些高流量,高并发的应用场景,你可以为它消费线程数。

多进程横向扩展

NebulaBus支持分布式,如果多线程不能满足你的需求,你可以将高并发的Handler分离到独立进程,并将它们分别部署到不同的机器或者K8s中,这样你的应用将拥有更好的扩展性,你可以随时添加新的机器或者K8s来扩展你的应用。

分布式最佳实践

要支持分布式,你必须配置Redis存储,请为不同的集群服务配置不同ClusterName,它将隔离集群服务之间的延迟消息,同时各个节点会通过竞争获取Redis锁来保证消息的唯一调度性。同时若有节点宕机,会有集群内的其他节点自动替换它,保证消息的可靠性

项目地址



https://nebulabus.jiewit.com



群贤毕至

访客