Azure Service Fabric 服务间通信

3
我目前有一个由多个服务组成的Service Fabric应用程序。我想实现的是一种排队机制,以便一个服务可以将消息发布到队列中,另一个服务可以从同一队列接收消息。
以下方式不起作用(对于侦听器服务,没有任何内容可出队):
PublisherService:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var myQueue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("fooQueue");
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        using (var tx = this.StateManager.CreateTransaction())
        {
            // Put some message in the queue
            await myQueue.EnqueueAsync(tx, "Foobar");

            await tx.CommitAsync();
        }

        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    }
}

ListenerService:

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var myQueue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("fooQueue");
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        using (var tx = this.StateManager.CreateTransaction())
        {
            var result = await myQueue.TryDequeueAsync(tx);

            if (result.HasValue)
            {
                ServiceEventSource.Current.ServiceMessage(this.Context, "New message receieved: {0}", result.Value.ToString());
            }

            await tx.CommitAsync();
        }

        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    }
}

看起来队列的范围似乎仅限于单个服务。这似乎不是文档中指定的限制。
因此,我的问题是:
- 这实际上是一些未记录的限制吗? - 或者上述代码有问题吗? - 我该如何实现上述情况(一个服务向队列添加消息,另一个服务从同一队列检索消息)?
显然,我可以使用 Azure 服务总线,但由于以下几个原因,我无法使用:
- 在我的实际场景中,我将有多个队列(数量可变),因此需要按需创建服务总线队列(这不是一项非常快速的操作) - 增加了对另一个 Azure 服务的依赖(因此增加了整个系统的故障概率) - 成本更高 - 部署更加复杂 - 等等。
3个回答

3

可靠队列是本地服务的一部分,因为其意图是存储该特定服务的状态。该状态会被复制到其他实例。它就像.Net中的普通System.Collections.Generic.Queue<T>

对于成本较低的解决方案,也许您可以使用Azure Storage Queues。是的,它增加了一个依赖项,但它具有高可用性。这是一个只有您可以决定是否接受的权衡。

另一方面,要打破常规思维:

创建一个带有多个ReliableQueues的有状态服务,并使用类似于标准远程通信的方法公开其他服务可以调用的方法:

class QueuingService
{
    void AddToQueue<T>(string queuename, T input) { .. }
    void DeQueue(string queuename) { .. }
}

这样做会产生依赖关系,但它具备 Service Fabric 提供的所有安全机制,并且成本不高。但是,您正在构建一个穷人版的服务总线/ Azure 存储队列。
关于文档,没有明确说明可靠队列与 1 个服务相关联,但这取决于您如何解释此处提到的“this”。
Service Fabric 为 .NET 开发人员提供了一种有状态编程模型,通过 Reliable Collections 实现。具体而言,Service Fabric 提供了可靠字典和可靠队列类。当您使用这些类时,“您的状态(我的理解:服务的状态)”被分区(以实现可伸缩性)、复制(以实现可用性)并在分区内进行事务处理(以实现 ACID 语义)。

谢谢您的回答,这很有道理。我还不确定我会选择哪种方式(Azure 存储队列或远程通信)。两种方式各有优缺点。再次感谢,这非常有帮助。 - ken2k
但是,再说一遍,你正在构建一个穷人版的服务总线/ Azure 存储队列。这意味着这不是使用 ReliableQueue 的正确方式吗? - AsValeO
1
@AsValeO,“适当的方式”是什么意思?如果您将其用于设计用于存储数据的目的,即在单个有状态服务内部使用,那么就有了适当的方式。您不能将其转化为其他东西,比如跨多个服务的队列。 - Peter Bons
我们可以在本地使用类似于Azure存储队列的东西吗?我们需要一种事件驱动/本地解决方案,其中一个服务将触发消息,并将其存储在某个队列中,另一个服务将从队列中弹出并处理它。 - Alex Gordon

1

1
如果您在所有调用代码中添加了故障处理重试模式,则不需要在调用之间添加队列,请参阅https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-reliable-services-communication
链接中的相关部分如下:
异常处理程序负责确定异常发生时要采取的操作。异常分为可重试和不可重试两类。 不可重试异常只需重新抛出给调用方即可。 可重试异常进一步分为瞬态和非瞬态两类。 瞬态异常是指可以简单地进行重试而无需重新解析服务端点地址的异常。这些将包括瞬态网络问题或除指示服务端点地址不存在以外的服务错误响应。 非瞬态异常是指需要重新解析服务端点地址的异常。这些包括指示无法到达服务端点的异常,表明服务已经移动到不同的节点的异常。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接