如何为Azure服务编写MassTransit Json反序列化器

5

这是我将对象发布到事件网格的方式。 我希望能够使用Azure服务总线来监听它。

        public void Publicar<T>(T model, string operation, string entity)
    {
        _nomeEvento = entity + operation;

        Boolean.TryParse(Configuration["EventGridConfig:Enabled"], out var eventGridIsActive);
        if (!eventGridIsActive)
            return;

        var primaryTopicKey = Configuration["EventGridConfig:AcessKey"];
        var primaryTopic = Configuration["EventGridConfig:Endpoint"];

        var primaryTopicHostname = new Uri(primaryTopic).Host;

        var topicCredentials = new TopicCredentials(primaryTopicKey);
        var client = new EventGridClient(topicCredentials);

        client.PublishEventsAsync(primaryTopicHostname, GetEventsList(model)).GetAwaiter().GetResult();
    }

    private List<EventGridEvent> GetEventsList<T>(T model)
    {
        return new List<EventGridEvent>
        {
            new EventGridEvent()
            {
                Id = Guid.NewGuid().ToString(),
                EventType = _nomeEvento,
                Data = model,
                EventTime = DateTime.Now,
                Subject = "MS_Clientes",
                DataVersion = "1.0",
            }
        };
    }

这就是我连接到服务总线的方式

    static class CustomExtensionsMethods
{
    public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
        IHostingEnvironment env)
    {
        services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
        services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var keyName = "RootManageSharedAccessKey";
            var busName = configuration["ServiceBus:Name"];
            var secret = configuration["ServiceBus:Secret"];
            var host = cfg.Host(
                "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                "SharedAccessKeyName=" + keyName + ";" +
                "SharedAccessKey=" + secret,
                z =>
                {
                    TokenProvider
                        .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                });
            cfg.ConfigureJsonSerializer(settings =>
            {
                settings.Converters.Add(new InterfaceConverter());

                return settings;
            });
            cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
            cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
        }));
        services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
        services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
        services.AddSingleton<IHostedService, BusService>();
        return services;
    }
}

但后来我依旧遇到了相同的错误

    fail: MassTransit.Messages[0]
      R-FAULT sb://dev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

我尝试添加一个在网上找到的JsonConverter,但没有成功。

    public class InterfaceConverter : JsonConverter
    {
        public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
        {
            serializer.Serialize(writer, value);
        }

        public override object ReadJson(JsonReader reader, Type objectType, object existingValue,
            JsonSerializer serializer)
        {
            // Set TypeNameHandling to Auto for deserializing objects with $type
            // Should be set directly in ConfigureJsonDeserializer when setting up MT Service bus
            serializer.TypeNameHandling = TypeNameHandling.Auto;
            return serializer.Deserialize(reader);
        }

        public override bool CanConvert(Type objectType)
        {
            return objectType.IsInterface;
        }
    }

你的情况并不是很清楚。在你的代码示例中我看到了 MassTransit 但你没有在任何地方提到它。你是否有意使用它? - Vlad DX
我看到你正在尝试使用EventGridClient写入事件网格,并为服务总线进行一些MassTransit注册。但是它似乎不匹配,或者图片中缺少某些内容。 - Vlad DX
弗拉德,我正在尝试将服务总线用作事件网格订阅处理程序。这是完全有效的场景。我可以成功接收到事件,但我需要一种反序列化的方法。 - CESCO
我真的很想理解使用情况。我不知道MassTransit是什么。我看到你正在写application/json但读取application/vnd.masstransit+json。这应该有一个原因,但从你的问题中并不清楚。你能否详细说明一下? - Vlad DX
3个回答

10

我尝试了几个测试,并成功得到了可行的解决方案。在我的测试用例中,我将来自EventGridTopic的消息重定向到ServiceBusQueue,就像您的情况一样——如果我理解得好。

由于MassTransit要求以特定格式呈现的信息才能解释它们,因此我们需要确保具有以下内容:

  1. 事件网格消息的自定义反序列化器,它们的类型是EventGridEvent
  2. 确保所有需要被MassTransit消费的消息都具有ContentType ——没有这个,它将无法工作。

因此,我构建了一个示例,可以处理从EventGrid重定向的消息,也可以直接将其传输到服务总线。以下代码是实现事件网格消息反序列化器的示例:

public class EventGridMessgeDeserializer : IMessageDeserializer
    {
        private string _contentType;

        public EventGridMessgeDeserializer(string contentType)
        {
            _contentType = contentType;
        }
        public ContentType ContentType => new ContentType(_contentType);

        public ConsumeContext Deserialize(ReceiveContext receiveContext)
        {
            var body = Encoding.UTF8.GetString(receiveContext.GetBody());
            var customMessage = JsonConvert.DeserializeObject<EventGridEvent>(body);
            var serviceBusSendContext = new AzureServiceBusSendContext<EventGridEvent>(customMessage, CancellationToken.None);

            // this is the default scheme, that has to match in order messages to be processed
            // EventGrid messages type of EventGridEvent within namespace Microsoft.Azure.EventGrid.Models
            string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" };
            var serviceBusContext = receiveContext as ServiceBusReceiveContext;
            serviceBusSendContext.ContentType = new ContentType(JsonMessageSerializer.JsonContentType.ToString());
            serviceBusSendContext.SourceAddress = serviceBusContext.InputAddress;
            serviceBusSendContext.SessionId = serviceBusContext.SessionId;

            // sending JToken because we are using default Newtonsoft deserializer/serializer
            var messageEnv = new JsonMessageEnvelope(serviceBusSendContext, JObject.Parse(body), messageTypes);
            return new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

这里的重要部分是在你的自定义反序列化器中指定消息类型。由于 MassTransit 需要一些格式并忽略不符合要求的消息,因此在这里我们需要指定该信息,以满足 MassTransit 的要求。

string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" }

这是默认方案,必须匹配才能处理消息。

最后,完整的代码可以在Github上找到:https://github.com/kgalic/MassTransitSample

附注:如果您直接将消息发送到SB队列并希望对其进行反序列化,如先前所述,您需要按以下方式指定ContentType:

var message = new Message(UTF8Encoding.UTF8.GetBytes(request));
message.ContentType = "application/json"; //must have
await _senderClient.SendAsync(message);

如果您有类似这样的情况,您需要编写与EventGridEvent类似的反序列化器,该类可以用作示例。


JsonConsumeContext 在 MassTransit 的后续版本中似乎不再可用 :( - Rob Vermeulen

3
MassTransit将消息封装在消息包裹中,而事件网格的消息不符合该格式,因此出现了错误。您设置的JsonSerializer设置已使用,但反序列化时预期的对象是MessageEnvelope,位于此行。我猜您有两种方法可以解决它:
  1. 创建并使用自定义反序列化器(类似于JsonMessageDeserializer),它将反序列化为一个简单的JObject或Message

    更新:在进一步尝试后,它看起来比我最初想象的要复杂得多,并且像Richard在其他答案中提到的那样,您最好使用Azure Service Bus客户端本身(甚至需要通用AMQP客户端)

  2. 触发Azure函数或逻辑应用程序,将事件网格有效负载包装到消息包裹中,使MassTransit能够对其进行反序列化,然后将其发送到服务总线队列中


0
我认为问题在于您试图将MassTransit作为客户端库来消费通用的JSON编码消息,而实际上它是一个有见解的应用程序框架,定义并要求使用特定的消息包结构来使用自己的半私有编码方案:

https://masstransittemp.readthedocs.io/en/latest/advanced/interop.html

要使用Mass Transit,您必须在发布和订阅组件中都使用该库,然后使用Azure Service Bus作为它们之间传递消息的“传输”。这些库将处理反序列化,并使用消息信封中的数据执行路由和异常处理。因此,“vnd.masstransit”编码的消息不适用于其他框架(如EventGrid)生产或消费。
从过去运行消息传递库选择流程的经验来看,我建议改用基于通用协议(例如AMQP 1.0)的客户端库(请参见https://github.com/Azure/amqpnetlite),并定义自己的路由和异常模式。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接