应用程序退出时如何处理RX线程的释放问题

3

我遇到了正确处理使用RX创建的线程在应用程序退出时释放的问题。我在Process Explorer中发现,应用程序关闭后,线程仍然在运行,导致IO异常。

class Program
{
    static void Main(string[] args)
    {
        CompositeDisposable subsriptions = new CompositeDisposable();

       subscriptions.Add(Observable.Interval(TimeSpan.FromSeconds(15))
                .Subscribe(_ =>
                {
                    getData();

                }));
        Thread.Sleep(TimeSpan.FromSeconds(20));     
            subscriptions.Dispose();
         }   
    }
}

如果我取消订阅.Dispose(),线程将在未获取任何数据的情况下终止。感谢您的帮助。谢谢。

getData 函数是做什么的? - JerKimball
它只是从 .txt 文件中读取一些值。 - Jim
3个回答

1
您要查找的模式类似于此。
class Program {

   public string GetData(){
       return "Hello";
   }

   public string async GetDataAsync(){

       return await Observable
            .Interval(TimeSpan.FromSeconds(15))
            .Take(1)
            .Select(()=>GetData());


   }

   static void Main(string[]args){

       var s = GetDataAsync().Wait();
   }

}
Wait 的原因是入口点 Main 不能被标记为 async。在这种情况下,Wait 阻塞当前线程,直到由 GetDataAsync 返回的任务产生一个值。
还要注意,IObservable 与异步/等待兼容,并且将返回序列产生的最后一个值。这就是我添加 Take(1) 的原因,它只会产生 1 个 tick。
另一种选择是直接在 IObservable 上调用 Wait
class Program {

   public string GetData(){
       return "Hello";
   }

   public IObservable<string> GetDataObservable(){

       return Observable
            .Interval(TimeSpan.FromSeconds(15))
            .Take(1)
            .Select(()=>GetData());


   }

   static void Main(string[]args){

       var s = GetDataObservable().Wait();
   }

}

谢谢您的详细回复,我可以问一下如何在应用程序关闭前或关闭后处理线程的释放吗?目前我只是在应用程序关闭时杀死进程,但我担心这样做可能不是正确的方式,因为当它正在工作时会出现问题。 - Jim
2
您不需要终止任何线程。线程会自动返回到线程池,进程将退出。 - bradgonesurfing
如果你使用响应式扩展和 / 或异步 / 等待框架,那么你很少需要直接操作线程。我建议进行一些研究。一旦掌握了它,就是非常酷的东西。 - bradgonesurfing
我同意,在正常情况下,这应该是正常的行为,但在我的情况下,我需要在应用程序退出时终止所有线程。因为应用程序关闭可能是由用户与应用程序交互引起的。因此,当用户通过关闭应用程序打破应用程序的正常工作流程时,后台仍然运行线程,导致IO异常。希望我清楚地表达了我的意思。 - Jim
在应用程序退出之前,只需处理订阅即可。同样,您无需考虑线程。 - bradgonesurfing

1
你需要在 subsriptions.Add(...)subscriptions.Dispose() 之间加入一些延迟。如果没有这个延迟,你的应用程序将立即订阅和释放它们,没有时间让线程完成它们的工作。(而且 Thread.Sleep(1000) 不起作用,因为它在订阅函数内部,不是主函数的一部分。)

我明白了,我已经将Thread.Sleep(20000)移动并增加了mlsec以在处理订阅之前进行处理,但是没有成功。 - Jim
1
使用 Thread.Sleep() 是不好的。 请看我的答案。 如果您正确地使用 ReactiveExtensions 和 async / await,可以在没有它的情况下完成任务。 - bradgonesurfing

0

您可以使用CancellationToken取消底层任务执行并订阅您的可观察对象:

static void Main(string[] args)
{
    var cts = new CancellationTokenSource();
    Observable.
        Interval(TimeSpan.FromSeconds(15)).
        Subscribe(_ => getData(), cts.Token));
    cts.CancelAfter(TimeSpan.FromSeconds(20));
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接