如果我有一系列的消息类型,要在MassTransit中注册一个通用的消费者适配器,应该如何操作?

11
我成功地在一个愚蠢的示例应用程序中使用MassTransit,我在发布者控制台应用程序中发布一条消息(一个事件),然后在两个不同的消费者控制台应用程序中使用RabbitMq接收它。
这是整个示例项目的git仓库链接:https://gitlab.com/sunnyatticsoftware/DiegoDrivenDesign/DiDrDe.MessageBus
我希望有一个项目来封装MassTransit的功能,这样我的发布者和消费者项目就不需要知道MassTransit的存在。依赖关系应该是这样的:
  • DiDrDe.MessageBus ==> MassTransit
  • DiDrDe.MessageBus ==> DiDrDe.Contracts
  • DiDrDe.Model ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.Model
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrDe.Model
  • DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

注意,DiDrDe.MessageBus对DiDrDe.Model一无所知,因为它是一个通用项目,适用于任何消息类型。

为了实现这一点,我正在实施适配器模式,以便我的自定义接口IEventDtoBus(用于发布事件)和IEventDtoHandler(用于消费事件)是所有发布者和消费者所知道的。MassTransit包装项目(称为DiDrDe.MessageBus)使用适配器实现了这些,其中包括一个由IEventDtoBus和EventDtoHandlerAdapter组成的EventDtoBusAdapter,以及一个由IEventDtoHandler组成的唯一泛型IConsumer。
我遇到的问题是MassTransit要求消费者在注册时必须知道其类型,而我的消费者是一个泛型消费者,其类型在编译时不应该被MassTransit包装器所知道。
我需要找到一种方法,在运行时将EventDtoHandlerAdapter注册为每个传递的TEventDto类型的消费者(例如作为类型集合)。请查看我的存储库以获取所有详细信息。
MassTransit支持一个重载方法,它接受一个类型(很好!正是我想要的),但它还需要一个第二个参数Func<type, object> consumerFactory,我不知道如何实现它。 更新1: 问题是我无法像这样注册这个泛型消费者:
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();

因为我遇到了编译错误

严重程度 代码 描述 项目 文件 行 抑制状态 错误 CS0310 'EventDtoHandlerAdapter' 必须是一个非抽象类型,具有公共的无参数构造函数,才能在泛型类型或方法'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action<IConsumerConfigurator>)'中使用它作为参数'TConsumer' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

更新2:我尝试了几种方法,并在我的存储库上更新了项目。这些是我在MassTransit包装器项目中的尝试。请注意,如果我为每个我想处理的消息(事件)添加一个依赖项,我就能使所有东西都正常工作。但我不想要那样..我不希望这个项目知道任何关于它可以处理的消息。如果我只能注册消费者,只知道消息类型就好了..

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS WORKS
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));

    // DOES NOT WORK
    //var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
    //var eventDtoHandler = context.Resolve(typeEventDtoHandler);
    //consumer.Consumer(eventDtoHandler);

    // DOES NOT WORK
    //consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);

    // DOES NOT WORK
    //var consumerGenericType = typeof(IConsumer<>);
    //var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
    //consumer.Consumer(consumerThingHappenedType,  null);
});

更新3:根据Igor的建议,我尝试做以下事情:

var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
                            consumer.Consumer(adapterType, (Type x) => context.Resolve(x));

但是我遇到了一个运行时错误,错误信息如下:
请求的服务'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]'未注册。为了避免这个异常,您可以注册一个组件来提供该服务,使用IsRegistered()方法检查服务是否已注册,或者使用ResolveOptional()方法来解决可选依赖。
我甚至尝试将EventDtoHandlerAdapter<>单独注册为IConsumer,以防这是问题所在,但是没有成功。
builder
    .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
    .As(typeof(IConsumer<>))
    .SingleInstance();

还有:
builder
  .RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
  .AsSelf();

而且它告诉我

System.ObjectDisposedException:'此解析操作已经结束。在使用lambda注册组件时,lambda表达式中的IComponentContext 'c'参数不能被存储。相反,要么再次从'c'解析IComponentContext,要么解析基于Func<>的工厂来创建后续组件

只是为了澄清,我只需要注册我的EventDtoHandlerAdapter<TEventDto>消费者。它是泛型的,所以对于我支持的每个TEventDto,都会存在一个注册。问题是我不需要提前知道类型,所以我需要操作类型。

更新4:根据Igor的建议,尝试了一种新的方法。这次使用了“代理”。我已经在我的存储库中更新了最新尝试的所有细节。 我有我的消费者和标记接口:

public interface IEventDtoHandler
{
}

public interface IEventDtoHandler<in TEventDto>
    : IEventDtoHandler
        where TEventDto : IEventDto
{
    Task HandleAsync(TEventDto eventDto);
}

我有自己的一个消费者实现,对MassTransit一无所知。
public class ThingHappenedHandler
    : IEventDtoHandler<ThingHappened>
{
    public Task HandleAsync(ThingHappened eventDto)
    {
        Console.WriteLine($"Received {eventDto.Name} " +
                            $"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
        return Task.CompletedTask;
    }
}

现在我的“包装消费者”是我称之为适配器的东西,因为它了解MassTransit(它实现了MassTransit的IConsumer接口)。
public class EventDtoHandlerAdapter<TConsumer, TEventDto>
    : IConsumer<TEventDto>
        where TConsumer : IEventDtoHandler<TEventDto>
        where TEventDto : class, IEventDto
{
    private readonly TConsumer _consumer;

    public EventDtoHandlerAdapter(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public async Task Consume(ConsumeContext<TEventDto> context)
    {
        await _consumer.HandleAsync(context.Message);
    }
}

现在最后一步是将我的“包装消费者”注册到MassTransit。但由于它是泛型的,我不知道该如何做。这是问题所在。
我可以按照建议,将所有我的消费者类型扫描并在Autofac中注册。
var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
    AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(x => x.GetTypes())
        .Where(x => interfaceType.IsAssignableFrom(x)
                    && !x.IsInterface
                    && !x.IsAbstract)
        .ToList();

现在我有所有的消费者类型(所有实现了的实现,包括我的)。现在怎么办?如何注册它?
类似以下的方法不起作用
foreach (var consumerType in consumerTypes)
{
    consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}

但我猜这是正常的,因为我想要注册的是我的EventDtoHandlerAdapter,它是真正的IConsumer。
所以,我想我没有理解你的建议。对不起!
我需要的是像这样的东西:
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));

但是不能使用ThingHappened模型,因为模型不应该被知道。这就是我卡住的地方。
更新5:根据Chris Patterson的建议进行了新的尝试(他的解决方案已合并到我的存储库中),但问题仍然存在。
澄清一下,DiDrDe.MessageBus必须对任何发布者、消费者和模型都是不可知的。它只应该依赖于MassTransit和DiDrDe.Contracts,而Chris的解决方案中有一行代码:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});

这个直接依赖于ThingHappened模型。这是不允许的,实际上与我已经有的解决方案没有太大区别,那个解决方案是:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS works, but it uses ThingHappened explicitly and I don't want that dependency
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});

抱歉如果这不够清楚,但是DiDrDe.MessageBus最终将成为一个nuGet包,被许多不同的消费者和发布者项目共享,并且不应该依赖于任何特定的消息/模型。
更新6: 问题已解决。非常感谢Igor和Chris的时间和帮助。 我已将解决方案推送到我的存储库的主分支。
附注:不幸的是,当我在同一个消费者中有两个处理程序处理相同的事件时,这个解决方案有其局限性,因为似乎只有一个处理程序被执行(两次)。我希望两个处理程序都被执行,或者只有一个处理程序被执行一次(而不是两次)。但这已经是另一个话题了 :)

你有研究过 builder.RegisterGeneric 吗? - cl0ud
是的,我有。不确定RegisterGeneric如何与此配合。请参见更新2。 - diegosasw
@iberodev,我也在寻找类似的方法。我可以使用你的GitLab项目吗?它是否完整或存在任何已知问题?如果有,你能否更新最新的代码呢? - bommina
1
@venkat.bommina 可以随意克隆我的存储库并根据您的需求进行调整。 - diegosasw
谢谢@diegosasw,我试过了,它有效了,现在只有几个小问题,我需要将masstransit总线配置代码和服务集合代码分开。但我还会研究这种方法。 - undefined
显示剩余2条评论
2个回答

11
请查看此处的自定义消费者约定:https://github.com/MassTransit/MassTransit/blob/develop/tests/MassTransit.Tests/Conventional/ConventionConsumer_Specs.cs。您需要创建自己的IMyConsumerInterface和IMyMessageInterface,并将其插入到该测试中的代码中。在创建总线之前,注册它ConsumerConvention.Register<CustomConsumerConvention>();。它应该可以工作。
此外,您还可以创建自己的包装器,将其与消息一起传递。 LoadFrom(MassTransit.AutofacIntegration)对于我来说与自定义约定不起作用,因此我必须手动注册消费者。
foreach (var consumer in consumerTypes)
  cfg.Consumer(consumer, (Type x) => _container.Resolve(x));

或者,如果你想采用“代理”方法,请按以下方式操作:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface 
    where TConsumer : IMyConsumerInterface<TMessage>
{
    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _consumer.Consume(context.Message);
    }
}

...

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));

// marker interface
public interface IMyConsumerInterface
{
}

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface 
{
    Task Consume(T message);
}

...

builder.RegisterAssemblyTypes(ThisAssembly)
           .AssignableTo<IMyConsumerInterface>()
           .AsSelf()
           .As<IMyConsumerInterface>();

...

var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();

builder.Register(context =>
{
    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        {
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
        });
    });
    return busControl;
})

我的回答是关于完全改变你的设计,并使用自定义消费者约定来抽象masstransit。如果你想要替代方案,我会更新我的回答。 - Igor Dražić
谢谢您的时间,但我仍然不明白您的答案。我尝试了“代理”方法。使用标记接口标记我的通用消费者接口,有助于注册所有消费者实现。但不幸的是,这不是问题所在。问题是,一旦您拥有所有的consumerTypes,如何在MassTransit中注册它们?仅有类型是不够的。请查看我的UPDATE 4,再次感谢! - diegosasw
2
(关于更新5,无法评论Chris的答案)你已经接近成功了,只需使用foreach (var adapterType in adapterTypes) consumer.Consumer(adapterType, (Type type) => ctx.Resolve(adapterType));代替consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);。你不需要模型,只需要契约。删除模型并添加契约即可。 - Igor Dražić
1
你真的不应该将每个处理程序都放在同一个队列中,它们是无关的命令或事件,而且从单个队列处理所有这些内容最终会成为一个问题,随着负载的增加。 - Chris Patterson
那么你的意思是每个队列基本上只应存储一种类型的消息? 我认为每个微服务都可以使用一个队列,但可以从该队列处理多种消息类型,尽管每种消息类型最多只有一个处理程序(消费者)也是有道理的。 谢啦。 - diegosasw
显示剩余3条评论

2
我向您的问题提交了一个拉取请求,它对我有效,消费者没有任何问题。
如果需要,您可以扩展它以包括所有消费者在其自己的端点上的自动注册。
诀窍是使用正确的通用接口类型正确地注册处理程序类型。 有点类型机制,但在我的端上可以工作。

谢谢Chris。已合并到主分支。我看到你的解决方案唯一的问题是DiDrDe.MessageBus依赖于DiDrDe.Model,而这不符合我的要求。最终,DiDrDe.MessageBus将成为一个独立的NuGet包,它不知道任何模型。只知道DiDrDe.ContractsMassTransit。因此,consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);这行代码仍然有问题。 - diegosasw
问题依旧。请参见更新5。实际上,根据更新4,我已经有了一个不充分的“解决方案”,在那里它是可行的,但以 DiDrDe.MessageBus 直接依赖于 DiDrDe.Model 为代价,而我不想这样做。 - diegosasw
1
在你的回答指引下,Chris 指出正确方向后,我已经解决了这个问题。谢谢! - diegosasw
@diegosasw,你有解决它的示例吗? - kuldeep
1
可能是在这里注册自己的约定:https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.Tests/Conventional/ConventionConsumer_Specs.cs - Chris Patterson
2
@kuldeep 我的代码库位于 https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus/-/tree/master/DiDrDe.MessageBus.Infra.MassTransit.Autofac。 - diegosasw

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接