确保线程安全

6
我正在编写一个C# Windows窗体应用程序,通过算法(策略)处理市场行情并创建订单发送到经纪公司。一切看起来都很顺利,直到我尝试在每个线程上同时运行多个策略时,所有的东西开始运行不正确。我相信有一些类不是线程安全的,这导致了不稳定的行为。非常感谢您对如何以线程安全的方式进行线程化的任何见解!
行情被输入到算法中的方式如下: 1)经纪商软件触发市场数据事件到我的软件中称为ConnectionStatus的客户端类。当市场数据事件被触发时,从Bid、Ask等静态变量的当前值构建一个Quote对象。 一旦报价建立完成,我会尝试将其发送到正在运行的每个策略算法中。以下是我用来执行此操作的代码:
 foreach (StrategyAssembler assembler in StrategyAssembleList.GetStrategies())
 {                  
     BackgroundWorker thread = strategyThreadPool.GetFreeThread();
     if (thread != null)
     {
        thread.DoWork += new DoWorkEventHandler(assembler.NewIncomingQuote);
        thread.RunWorkerAsync(quote);
     }   
 }

StrategyAssembler是一个类,它创建了一个StrategyManager类的实例,而后者又创建了包含实际算法的策略的实例。可能会有4到6个不同的StrategyAssembler实例,每个实例都被添加到StrategyAssembleList的Singleton实例中,该实例是一个BindingList。

传入的报价对象被传递到StrategyAssembler类的NewIncomingQuote方法中。代码如下:

public void NewIncomingQuote(object sender, DoWorkEventArgs e)
    {
        Quote QUOTE = e.Argument as Quote;            

        lock (QuoteLocker)
        {
            manager.LiveQuote(QUOTE);

            priorQuote = QUOTE;
        }
    }

我认为,在将报价传递到manager.LiveQuote(Quote quote)方法之前使用锁定,可以使得“下游”使用该报价的所有对象以线程安全的方式进行消费,但测试结果表明并非如此。有没有一种方式可以将每个StrategyAssembler实例放在自己的线程中,以确保由Strategy Assembler创建的所有对象都是线程安全的,然后将报价馈送到StrategyAssembler?这种思路是否是处理这种情况的适当方式?
感谢您提前给予任何反馈或帮助。
Learning1

  1. manager.LiveQuote()是做什么的?它是您的策略方法的接口吗?
  2. QuoteLockerStrategyAssembler实例成员吗?
  3. 最后,您期望什么样的行为,现在又看到了什么?
- Jeff Sternal
manager.Livequote() 是一个方法,它可以 a) 将报价添加到由指标类、策略算法类和订单填充模拟类引用的列表中;b) 将报价传递给策略算法;c) 管理策略中的订单对象,以模拟订单填充/取消/通信延迟等操作。 - Learning1
  1. QuoteLocker是一个静态对象 静态对象QuoteLocker = new object();
- Learning1
  1. 就行为而言,我期望每个到达的报价都会被添加到每个StrategyAssembler实例中,然后再添加到manager.LiveQuote中。从那里,策略通过其算法来做出买入/卖出/不操作的决策。每个Assembler实例只有一个StrategyManager实例,每个StrategyManager实例只有一个Strategy类,它将报价传递进去并管理订单。我看到的行为是,一个特定的Assembler实例中可能会将单个报价添加1次、2次、3次或4次,而在另一个Assembler实例中则根本没有添加。
- Learning1
你尝试过这些建议中的任何一个吗? - Kiril
你找到了一种处理多线程的线程安全方式了吗? - ptn77
3个回答

1

在访问任何共享状态时,读写操作都应该进行锁定。如果读取操作没有加锁,那么代码仍然可以并发地读取和写入。

您可以将读写锁包装成管理器。


感谢您的反馈,spender。您是在说每个读取报价对象的“下游”代码点都需要锁定吗?(有很多地方和对象下游使用每个报价的值)。 - Learning1
将锁包装到管理器中是一种危险的方式。例如,如果您有一个(已锁定的)读取和一个(已锁定的)写入,这感觉很安全。但是,如果您执行了锁定的读取(),增量,锁定的写入(),仍然会违反您的临界区。锁应该围绕数据的总使用情况,从而实现对数据的原子修改。 - Adriaan
在这种情况下最好复制引用并避免锁定期...事实上适当的锁定会使他的策略顺序运行,这将使多线程努力变得完全无用,并且由于锁定+上下文切换的开销更加计算密集。 - Kiril

0

你的代码中发生了两件事情:
1. 你从一个线程(生产者,也就是市场数据源)接收到了一份报价。
2. 你将这份报价发送给另一个线程(消费者,也就是策略组装器)。

此时,报价存在争用,换句话说,生产者线程和每个消费者线程(即每个策略实例)都可以修改你刚提供的报价。为了消除争用,你必须采取以下三种方法之一:

  1. 在所有访问报价的线程之间进行同步。
    或者
  2. 使报价不可变(并确保生产者不会替换它)。
    或者
  3. 给每个消费者一份报价的副本。

对于你的情况,我建议你选择第三个选项,因为锁定比复制报价更昂贵(希望你的报价不是非常大)……第二个选项也不错,但你的策略不应该修改报价。

通过给每个消费者一份报价的副本,你确保他们不共享任何数据,因此没有其他线程会修改报价,你将消除争用。如果你的策略没有创建任何其他线程,那么你就完成了。

一般来说,应避免锁定并尽量减少数据共享,但如果您必须在线程之间共享数据,则应正确地执行以下操作:
为使您的策略正确同步,它们必须同步于相同的QuoteLocker对象上,即每个线程都必须可以看到QuoteLocker。 即使您正确地执行了同步(在QuoteLocker上锁定),则您也可能没有使用线程......您将运行上下文切换+锁定的开销,您的策略将按顺序执行同一引用。

根据评论更新: 如果您将代码保留为原样(即为每个线程提供引用副本),那么我不明白为什么您的其他策略在第一个策略完成之前不会获取引用......您的第一个策略很可能在其他策略的线程被创建时开始工作。 把您的策略运行在一个单独的线程中的整个意义就是要避免这种问题......你启动一个新线程,这样你的其他策略就不必等待彼此完成。

代码的这部分很可能在所有线程开始工作之前就已经完成了......

foreach (StrategyAssembler assembler in StrategyAssembleList.GetStrategies())
 {                  
     BackgroundWorker thread = strategyThreadPool.GetFreeThread();
     if (thread != null)
     {
        thread.DoWork += new DoWorkEventHandler(assembler.NewIncomingQuote);
        Quote copy = CopyTheQuote(quote);// make an exact copy of the quote
        thread.RunWorkerAsync(copy);
     }   
 }

在您创建线程时,您的市场数据源是否会更改实际报价?市场数据源通常提供快照,因此除非在您创建线程时有什么更改您的报价,否则上述设计应该是完全正常的。如果设计存在问题,那么我可以为您提供基于阻塞队列的生产者和多个消费者设计,这也非常高效(您可以查看此讨论以了解其工作原理),并且我可以告诉您如何针对您的具体示例进行修改。


是的,预防胜于治疗。如果您的线程可以独立运行(选项3),那么这将是最安全和最佳性能的选择。由于您无法保证第一个引用完全被处理,因此您可能需要在副本中添加(高分辨率)时间戳。 - Adriaan
感谢反馈,Lirik。这一切都很有道理。对于你提到的原因和其他原因,选项对我来说是最明智的选择。关于制作副本并将其传递给每个汇编器实例,我认为最好使用事件,以便所有汇编器同时获得他们的报价副本,然后处理它们的逻辑。否则,如果它留在循环中,直到第一个完成其逻辑(约100ms,而报价可以在50ms内到达),以下策略将无法获取报价。 - Learning1
如果我将引用放在每个汇编程序订阅的事件中,那么最好的方法是什么,可以使每个汇编程序在一个线程上运行,以便我可以利用CPU上的更多核心?目前,这些汇编程序和它们各自创建的策略算法都在主线程上运行。 - Learning1
谢谢你的见解Lirik,我会尝试这个版本,并查看它如何与报价副本一起工作。就市场变化而言,上面的代码片段位于BuildQuote()方法内。每次BidPrice、BidVolume、AskPrice或AskVolume发生更改时,都会调用此方法。每次调用时,都会使用由经纪人数据事件更改的静态变量创建新的报价。我不知道生产者和多个消费者设计是否是更好的方法。我会看看你发布的链接 - 谢谢! - Learning1
你创建了一个新的报价,但是你是否替换了当前用于初始化线程的报价? - Kiril

0

如果:

1)策略通过LiveQuote方法被调用并可以修改Quote实例。

2)对Quote实例的更改不应在策略之间共享。

在调用LiveQuote()之前,您需要创建提供的Quote的副本,并将副本发送到策略方法中,而不是原始报价。根据其他要求,您可能根本不需要任何锁定。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接