如何异步处理消息,在处理时丢弃任何新的消息?

3
我有一个C#应用程序,订阅我们的消息系统上的主题以获取值更新。当新值到达时,我会进行一些处理,然后继续执行。问题是,更新可能比应用程序处理它们的速度更快。我想做的就是只保留最新的值,所以我不想要一个队列。例如,源发布值“1”,我的应用程序接收它;在处理过程中,源发布序列(2、3、4、5)在我的应用程序完成处理之前;我的应用程序随后处理值“5”,之前的值被丢弃。
由于这基于专有的消息库,因此很难发布工作代码示例,但我认为这是一种常见模式,我只是想不出它叫什么...似乎处理函数必须在与消息回调不同的线程上运行,但我不确定如何组织这个线程,例如如何通知该线程值已更改。您需要做什么的一般提示?
5个回答

2

一个非常简单的方法可能是这样:

private IMessage _next;

public void ReceiveMessage(IMessage message)
{
    Interlocked.Exchange(ref _next, message);
}

public void Process()
{
    IMessage next = Interlocked.Exchange(ref _next, null);

    if (next != null)
    {
        //...
    }
}

+1,但如果我理解问题正确的话,第一种方法更像是一个接收者而不是发送者。 - Jeff Sternal
谢谢,我明白大意了,但是Process()如何被调用呢?它必须在其他线程中以某种循环方式运行,还是有一种方法在收到新消息时调用它,如果它还没有处理先前的值的话? - toasteroven
你至少需要两个线程:一个线程用于接收消息,另一个线程用于处理它们。<br />在某些情况下,接收线程也可以是另一个进程,将它们写入数据库等。<br /> 为了通知处理线程,您可以使用在ReceiveMessage中设置的AutoResetEvents。 - Matthias

1
一般来说,我们使用消息系统来防止消息丢失。我的初步解决方案是创建一个线程来接收传入的数据,并尝试将其传递给处理线程。如果处理线程已经在运行,则丢弃数据并等待下一个元素并重复此过程。

0
一个简单的方法是使用一个成员变量来保存最后接收到的值,并使用锁将其包装起来。另一种方法是将传入的值推入堆栈中。当你准备好获取一个新值时,调用Stack.Pop()然后Stack.Clear()。
public static class Incoming
{
    private static object locker = new object();
    private static object lastMessage = null;

    public static object GetMessage()
    {
        lock (locker)
        {
            object tempMessage = lastMessage;
            lastMessage = null;
            return tempMessage;
        }
    }
    public static void SetMessage(object messageArg)
    {
        lock (locker)
        {
            lastMessage = messageArg;
        }
    }

    private static Stack<object> messageStack = new Stack<object>();
    public static object GetMessageStack()
    {
        lock (locker)
        {
            object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null;
            messageStack.Clear();
            return tempMessage;
        }
    }
    public static void SetMessageStack(object messageArg)
    {
        lock (locker)
        {
            messageStack.Push(messageArg);
        }
    }
}

将处理函数放在单独的线程上是个好主意。可以使用回调方法从处理线程发出信号,表示它已准备好接收另一条消息;或者让它发出信号表示处理完成,然后当收到消息(通过上述 SetMessage...)时,主线程启动一个新的处理器线程。

0

这不是一个“模式”,但您可以使用共享数据结构来保存值。如果从消息库中只接收到一个值,则可以使用简单的对象。否则,您可能需要使用哈希表来存储多个消息值(如果需要)。

例如,在消息接收线程上:当消息到达时,使用其值添加/更新数据结构。在线程侧,您可以定期检查此数据结构,以确保仍具有相同的值。如果没有,则丢弃已经完成的任何处理,并使用新值重新处理。

当然,您需要确保数据结构在线程之间得到正确同步。


0

显然,消息库的设计可以影响处理此问题的最佳方式。我过去使用类似功能库时的做法是,有一个监听事件的线程将其放入队列中,然后我有线程池工作程序出队消息并处理它们。

您可以了解多线程异步作业队列:

多线程作业队列

工作队列线程化


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接