当客户端线程退出时,我该如何检测?

18

一个有趣的库编写者的困境。在我的库(在这种情况下是EasyNetQ)中,我正在分配线程本地资源。因此,当客户端创建一个新线程并调用我的库的某些方法时,会创建新资源。在EasyNetQ的情况下,当客户端在新线程上调用“Publish”时,将创建到RabbitMQ服务器的新通道。我想能够检测到客户端线程退出,以便我可以清理资源(通道)。

我能想到的唯一方法是创建一个新的“watcher”线程,它只是在客户端线程上阻塞Join调用。下面是一个简单的演示:

首先是我的“库”。它获取客户端线程,然后创建一个新线程,在该线程上阻塞“Join”:

public class Library
{
    public void StartSomething()
    {
        Console.WriteLine("Library says: StartSomething called");

        var clientThread = Thread.CurrentThread;
        var exitMonitorThread = new Thread(() =>
        {
            clientThread.Join();
            Console.WriteLine("Libaray says: Client thread existed");
        });

        exitMonitorThread.Start();
    }
}

这是一个使用我的库的客户端。它创建了一个新线程,然后调用了我的库的StartSomething方法:

public class Client
{
    private readonly Library library;

    public Client(Library library)
    {
        this.library = library;
    }

    public void DoWorkInAThread()
    {
        var thread = new Thread(() =>
        {
            library.StartSomething();
            Thread.Sleep(10);
            Console.WriteLine("Client thread says: I'm done");
        });
        thread.Start();
    }
}

当我以这种方式运行客户端时:

var client = new Client(new Library());

client.DoWorkInAThread();

// give the client thread time to complete
Thread.Sleep(100);

我得到了这个输出:

Library says: StartSomething called
Client thread says: I'm done
Libaray says: Client thread existed

它能够工作,但是很丑陋。我真的不喜欢所有这些被阻止的监视线程挂起来的想法。有没有更好的方法?

第一种替代方案。

提供一个返回实现IDisposable接口的worker方法,并在文档中明确指出不应在线程之间共享workers。以下是修改后的库:

public class Library
{
    public LibraryWorker GetLibraryWorker()
    {
        return new LibraryWorker();
    }
}

public class LibraryWorker : IDisposable
{
    public void StartSomething()
    {
        Console.WriteLine("Library says: StartSomething called");
    }

    public void Dispose()
    {
        Console.WriteLine("Library says: I can clean up");
    }
}

现在客户端变得更加复杂了:

public class Client
{
    private readonly Library library;

    public Client(Library library)
    {
        this.library = library;
    }

    public void DoWorkInAThread()
    {
        var thread = new Thread(() =>
        {
            using(var worker = library.GetLibraryWorker())
            {
                worker.StartSomething();
                Console.WriteLine("Client thread says: I'm done");
            }
        });
        thread.Start();
    }
}
这个变化的主要问题在于它会破坏API,现有的客户端将不得不重新编写。但这并不是什么坏事,这意味着需要重新检查它们并确保清理工作正确完成。
非破坏性的第二种替代方案是,API提供了一种让客户端声明“工作范围”的方式。一旦范围完成,库就可以进行清理。该库提供了一个实现IDisposable接口的WorkScope,但与上述第一种替代方案不同,StartSomething方法保留在Library类中。
public class Library
{
    public WorkScope GetWorkScope()
    {
        return new WorkScope();
    }

    public void StartSomething()
    {
        Console.WriteLine("Library says: StartSomething called");
    }
}

public class WorkScope : IDisposable
{
    public void Dispose()
    {
        Console.WriteLine("Library says: I can clean up");
    }
}

客户端只需在 WorkScope 中添加 StartSomething 调用...

public class Client
{
    private readonly Library library;

    public Client(Library library)
    {
        this.library = library;
    }

    public void DoWorkInAThread()
    {
        var thread = new Thread(() =>
        {
            using(library.GetWorkScope())
            {
                library.StartSomething();
                Console.WriteLine("Client thread says: I'm done");
            }
        });
        thread.Start();
    }
}

我喜欢这个方案比第一个更少,因为它不强制要求库用户考虑作用域。


2
我正在分配线程本地资源 - 这不是一个好的开始 :( - Martin James
2
很遗憾,这是低级 AMQP 库的限制,您不能在线程之间共享通道。我猜库的另一种可选 API 设计是提供一个非线程安全的“发布者”,客户端需要创建它。 - Mike Hadlow
我不确定我是否正确地将用例和示例代码相关联。使用EasyNetQ的客户端代码是否需要启动新线程以使用库?您能否提供更实际的客户端代码?如果不需要,那么可以使用以下代码: var ctx = library.StartExecutionContext(); ... ctx.Complete(); 然后在Library内部,ctx.Complete将进行清理,可能通过使用ManualResetEvent来通知线程完成。 - Ken Egozi
目前,EasyNetQ提供了一个IBus实例,该实例旨在持续存在于客户端应用程序的生命周期中。要发布一条消息,您只需调用bus.Publish(msg)方法即可。底层AMQP库的发布通道不是线程安全的,因此我必须确保无论客户端使用何种线程模型,都能高效地创建和释放通道。 - Mike Hadlow
客户端为什么需要了解这个?客户端为什么要创建线程?频道概念是否向客户端公开(例如,订阅它们或发布到特定频道)? - Ken Egozi
1
我不知道(或者不关心)客户端为什么要创建线程,但是我想设计一个库,即使他们这样做也不会崩溃。通道不暴露给客户端,但我认为也许应该这样做。请参见上面的“第一种替代方案”,这是一个明显的方法。 - Mike Hadlow
5个回答

9
您可以创建一个带有终结器的线程静态监视器。当线程存活时,它将持有监视器对象。当线程死亡时,它将停止持有它。稍后,当GC启动时,它将完成您的监视器。在终结器中,您可以引发一个事件,通知框架有关客户端线程(观察到的)死亡的情况。
示例代码可以在此处找到:https://gist.github.com/2587063
public class ThreadMonitor
{
    public static event Action<int> Finalized = delegate { };
    private readonly int m_threadId = Thread.CurrentThread.ManagedThreadId;

    ~ThreadMonitor()
    {
        Finalized(ThreadId);
    }

    public int ThreadId
    {
        get { return m_threadId; }
    }
}

public static class Test
{
    private readonly static ThreadLocal<ThreadMonitor> s_threadMonitor = 
        new ThreadLocal<ThreadMonitor>(() => new ThreadMonitor());

    public static void Main()
    {
        ThreadMonitor.Finalized += i => Console.WriteLine("thread {0} closed", i);
        var thread = new Thread(() =>
        {
            var threadMonitor = s_threadMonitor.Value;
            Console.WriteLine("start work on thread {0}", threadMonitor.ThreadId);
            Thread.Sleep(1000);
            Console.WriteLine("end work on thread {0}", threadMonitor.ThreadId);
        });
        thread.Start();
        thread.Join();

        // wait for GC to collect and finalize everything
        GC.GetTotalMemory(forceFullCollection: true);

        Console.ReadLine();
    }
}

我希望这能有所帮助。我认为这比你的额外等待线程更加优雅。


4

由于您无法直接控制线程的创建,因此很难知道线程何时完成其工作。另一种替代方法可能是强制客户端在完成工作时通知您:

public interface IThreadCompletedNotifier
{
   event Action ThreadCompleted;
}

public class Library
{
    public void StartSomething(IThreadCompletedNotifier notifier)
    {
        Console.WriteLine("Library says: StartSomething called");
        notifier.ThreadCompleted += () => Console.WriteLine("Libaray says: Client thread existed");
        var clientThread = Thread.CurrentThread;
        exitMonitorThread.Start();
    }
}

这样,任何调用你的客户端都必须传递某种通知机制,告诉你它何时完成其操作:
public class Client : IThreadCompletedNotifier
{
    private readonly Library library;

    public event Action ThreadCompleted;

    public Client(Library library)
    {
        this.library = library;
    }

    public void DoWorkInAThread()
    {
        var thread = new Thread(() =>
        {
            library.StartSomething();
            Thread.Sleep(10);
            Console.WriteLine("Client thread says: I'm done");
            if(ThreadCompleted != null)
            {
               ThreadCompleted();
            }
        });
        thread.Start();
    }
}

这样更好 - 客户端应提供通知。通知应具有标识要释放的资源的参数。该参数应为分配的资源的句柄/令牌,即与线程ID或任何类似的东西无关,否则库对于池化线程、'green'线程/纤程或永不终止、循环并希望同时或不同时使用不同资源的线程将变得无用。 - Martin James
这是我的第一个想法,提供一个实现IDisposable的OperationContext,然后所有线程本地调用都可以在using语句中执行。我想我首先想探索“它只是工作”的替代方案,但我正在接受一个特定的API的想法。 - Mike Hadlow
3
“it just works” 的解决方案就是不需要<g>。Windows、Linux以及我所使用过的任何操作系统都有“token”、“handle”或其他对象来解决在调用期间维护状态的问题。使用隐式的线程本地数据和轮询线程终止等方法会耗尽你的库的生命力。 - Martin James
谢谢Martin,我被您的论点说服了,看看我的后续博客文章:http://mikehadlow.blogspot.co.uk/2012/05/how-do-i-detect-when-client-thread.html - Mike Hadlow

1
如果客户端线程调用您的库来内部分配一些资源,则客户端应该“打开”您的库并获取一个令牌以进行所有后续操作。该令牌可以是指向库内部矢量的int索引,也可以是指向内部对象/结构的void指针。要求客户在终止之前关闭令牌。
这就是99%的此类库调用的工作方式,其中必须跨客户端调用保留状态,例如套接字句柄,文件句柄。

1
除了执行任何异步的花哨操作来避免线程,我会尝试将所有监视组合成一个单独的线程,轮询已经访问过你的库的所有线程的.ThreadState属性,比如每100毫秒(我不确定你需要多快清理资源...)

1
如果客户端线程永远不终止,而是循环打开更多的库通道/资源,会发生什么?状态轮询解决方案将限制库用户仅使用不断创建和终止的线程。池线程以及任何循环运行的线程都必须禁止。这只是行不通的。 - Martin James
@MartinJames 为什么池线程不能被监视呢?每个使用该库的线程都可以注册进行轮询。我认为这个解决方案很有意义。 - usr
@Martin 我明白你说的话,我认为毋庸置疑地检测线程关闭并不是清理的唯一方法(甚至不是首选方法)。这只是库对用户错误的额外保护。你是正确的:如果用户不关闭池化线程或GUI线程上的句柄,则它们将永远悬空。 - Nathan Wiebe

0

你的.Join解决方案看起来非常优雅。阻塞观察器线程并不是什么可怕的事情。


好的,凡事适度。 :p 我所知道的确实没有其他方法来确保线程完成。 - IngisKahn
如果线程已经完成了该通道的库操作,想要关闭它,然后可能稍后再次打开它或者打开另一个通道怎么办?调用线程可能永远不会终止应用程序的生命周期,但是每个线程可能会打开和关闭数千次库资源。对于库来说,这些轮询客户端线程状态的解决方案都没有用处。 - Martin James
我同意这是一个不好的想法,但你可以使用线程ID来避免重复的观察者。 - IngisKahn
如果将监视线程的堆栈大小减少到最小,则可能可行。不幸的是,CLR正在提交所有堆栈内存。 - usr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接