如何在线程之间传递数据?

3
.NET中在线程之间传递数据的方法有哪些?目前我能想到的有两种: 1. 成员变量,例如使用生产者-消费者队列模式。 2. 在启动线程时使用 ParameterizedThreadStart 委托。(仅适用于一次性使用,不适合长时间运行的后台工作者线程。) .NET框架解决此问题的设施是什么?也许.NET已经实现了通用的生产者-消费者模式?我是否可以以某种方式使用Thread.GetData和Thread.SetData?

1
你是在谈论在线程启动时传递数据,还是在线程之间进行持续对话,或者在线程关闭时传递数据? - Ed Power
4个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
7
作为 Ash 解决方案的替代方案,考虑以下示例。 假设您有两个线程 - 一个用于从套接字接收数据包,另一个用于处理这些数据包。显然,接收线程需要通知处理器线程何时有可用的数据包进行处理,因此数据包需要以某种方式在线程之间共享。我通常使用共享数据队列来实现这一点。 同时,我们不一定想将线程紧密耦合在一起。例如,接收线程甚至不应该知道处理器线程的存在。接收器只需要专注于从网络接收数据包,然后通知任何感兴趣的订阅者数据包可用于处理。在 .NET 中,事件是实现此目的的完美方式。 下面是一些代码。
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class Packet
{
    public byte[] Buffer { get; private set; }
    public Packet(byte[] buffer)
    {
        Buffer = buffer;
    }
}
public class PacketEventArgs : EventArgs
{
    public Packet Packet { get; set; }
}
public class UdpState
{
    public UdpClient Client{get;set;}
    public IPEndPoint EndPoint{get;set;}
}
public class Receiver
{
    public event EventHandler<PacketEventArgs> PacketReceived;
    private Thread _thread;
    private ManualResetEvent _shutdownThread = new ManualResetEvent(false);
    public void Start() { _thread.Start(); }
    public void Stop() { _shutdownThread.Set(); }
    public Receiver()
    {
        _thread = new Thread(
            delegate() {
                // Create the client UDP socket.
                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 5006);
                UdpClient client = new UdpClient( endPoint );
                // Receive the packets asynchronously.
                client.BeginReceive(
                    new AsyncCallback(OnPacketReceived),
                    new UdpState() { Client = client, EndPoint = endpoint });
                // Wait for the thread to end.
                _shutdownThread.WaitOne();
            }
        );
    }
    private void OnPacketReceived(IAsyncResult ar)
    {
        UdpState state = (UdpState)ar.AsyncState;
        IPEndPoint endPoint = state.EndPoint;
        byte[] bytes = state.Client.EndReceive(ar, ref endPoint);
        // Create the packet. 
        Packet packet = new Packet(bytes);
        // Notify any listeners.
        EventHandler<PacketEventArgs> handler = PacketReceived;
        if (handler != null) {
            handler(this, new PacketEventArgs() { Packet = packet });
        }
        // Read next packet.
        if (!_shutdownThread.WaitOne(0)) {
            state.Client.BeginReceive(
                new AsyncCallback(OnPacketReceived),
                state);
        }
    }
}
public class Processor
{
    private Thread _thread;
    private object _sync = new object();
    private ManualResetEvent _packetReceived = new ManualResetEvent(false);
    private ManualResetEvent _shutdownThread = new ManualResetEvent(false);
    private Queue<Packet> _packetQueue = new Queue<Packet>(); // shared data
    public void Start() { _thread.Start(); }
    public void Stop() { _shutdownThread.Set(); }
    public Processor()
    {
        _thread = new Thread(
            delegate() {
                WaitHandle[] handles = new WaitHandle[] {
                    _shutdownThread,
                    _packetReceived
                };

                while (!_shutdownThread.WaitOne(0)) {
                    switch (WaitHandle.WaitAny(handles)) {
                        case 0: // Shutdown Thread Event
                            break;
                        case 1: // Packet Received Event
                            _packetReceived.Reset();
                            ProcessPackets();
                            break;
                        default:
                            Stop();
                            break;
                    }
                }
            }
        );
    }
    private void ProcessPackets()
    {
        Queue<Packet> localPacketQueue = null;
        Queue<Packet> newPacketQueue = new Queue<Packet>();
        lock (_sync) {
            // Swap out the populated queue with the empty queue.
            localPacketQueue = _packetQueue;
            _packetQueue = newPacketQueue;
        }

        foreach (Packet packet in localPacketQueue) {
            Console.WriteLine(
                "Packet received with {0} bytes",
                packet.Buffer.Length );
        }
    }
    public void OnPacketReceived(object sender, PacketEventArgs e)
    {
        // NOTE:  This function executes on the Receiver thread.
        lock (_sync) {
            // Enqueue the packet.
            _packetQueue.Enqueue(e.Packet);
        }

        // Notify the Processor thread that a packet is available.
        _packetReceived.Set();
    }
}
static void Main()
{
    Receiver receiver = new Receiver();
    Processor processor = new Processor();

    receiver.PacketReceived += processor.OnPacketReceived;

    processor.Start();
    receiver.Start();

    Thread.Sleep(5000);

    receiver.Stop();
    processor.Stop();
}

我知道这里有很多需要消化的内容。只要在端口5006上有UDP流量,程序应该可以在.NET 3.5中运行。

至于线程之间的数据共享,感兴趣的点是Processor类的ProcessPackets()和OnPacketReceived()方法。请注意,即使该方法是Processor类的一部分,OnPacketReceived()方法仍发生在接收器线程上,并且队列使用同步对象进行同步。


2
虽然这不是一个内置的解决方案,但你可以创建一个包含私有“sync”对象的类。然后,在属性和方法调用中,使用锁定语句在同步对象上以确保序列化访问。 例如:
class DataClass{
    private object m_syncObject=new object();

    private string m_data;

    public string Data
    {
        get{
            lock(m_syncobject)
            {
                return m_data;
            }
        }
        set{
           lock(m_syncobject)
           {
                m_data=value;
           }

        }
    }
} 
在一个线程上创建DataClass()的实例,然后将此实例传递给第二个或更多线程。在需要时访问线程安全的Data属性,在线程之间传递/接收数据。

你应该非常谨慎地使用这样的线程安全类。 有关详细信息,请参见我下面的答案(注释不允许足够的字符...)或搜索“线程安全列表”。 - Janis

1

请看这里,其中一些回复可能会回答您的问题。


0
关于Ash的解决方案: 这种“线程安全”的数据类(我称之为“伪线程安全”),特别是如果它们具有不同的成员,问题在于这些成员可能会在线程安全调用之间发生更改。 这适用于所有多成员类,但在所有枚举(列表、数组)中尤其成为问题,因为它使得使用像“.Count”这样的函数实际上变得不可能(详见谷歌)。 例如:
class pseudoThreadsafeHuman{
   private object m_syncobject;
   public string firstName;
   public string lastName;

   public string fullName
   get{
        lock(m_syncobject)
        {
            return lastName & "," & firstName;
        }
    }
    set{
       lock(m_syncobject)
       {
            lastName = value.Split(",")[1];
            firstName = value.Split(",")[2];
       }
    }
}

有人可能会尝试使用类似这样的代码:

public void isJohn(pseudoThreadSafeHuman currentPerson) {
    if currentPerson.firstName == "John"  
       {
         MessageBox.Show(currentPerson.fullName)
       }
}
成员变量firstName、lastName和fullName都是线程安全的。但是,由于值在if语句和MessageBox.Show()之间可能会发生更改,因此可能会打印出与"John"不同的内容。 另一个例子:像getInitials(pseudoThreadSafeHuman currentPerson)这样的东西可能会抛出异常:
public getInitials(pseudoThreadSafeHuman currentPerson)
   string initials = ""
   if currentPerson.firstName != "" {
     initials += currentPerson.firstName[0];  // crash here if firstName got changed to ""
   }
   if currentPerson.lastName != "" {
      initials += currentPerson.lastName[0];  // crash here if lastName got changed to ""
   }
}

这些是错误代码使用的非常愚蠢的例子。此外,我对C#不太熟悉(我自己使用VB.Net),因此语法可能完全不正确。但我想你明白我的意思。所以在我看来,线程安全类会导致编程错误,最好只使用经典的同步锁(这对其他程序员来说也更易读)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,