并行执行多个任务对

4

细节:

我有一个游戏,有两个独立的AI在对抗。 每个AI都有自己的任务。 这两个任务需要同时开始,并且需要取一些参数并返回值。 现在我想并行运行100-200场比赛(每场比赛中有两个任务)。

问题是两个任务并不是同时开始。它们在所有可用资源空闲时才会随机开始。

代码:

我的当前方法如下所示。

  • 我有一个包含一些参数的inputobjects列表。
  • 使用Parallel.ForEach为每个inputobject创建一个游戏和两个AI。
  • 无论哪个AI先完成游戏,都会使用CancellationToken停止另一个AI,使其玩相同的游戏。
  • 所有返回的值都保存在ConcurrentBag中。

因为只有这样,每个游戏的两个AI任务并不会同时启动,所以我添加了一个AutoResetEvent。我希望我可以等待一个任务,直到第二个任务已经开始,但是AutoResetEvent.WaitOne却阻塞了所有资源。因此,使用AutoResetEvent的结果是第一个AI任务开始并等待第二个任务开始,但由于它们没有释放线程,它们将永远等待。

        private ConcurrentBag<Individual> TrainKis(List<Individual> population) {
            ConcurrentBag<Individual> resultCollection = new ConcurrentBag<Individual>();
            ConcurrentBag<Individual> referenceCollection = new ConcurrentBag<Individual>();

            Parallel.ForEach(population, individual =>
            {
                GameManager gm = new GameManager();

                CancellationTokenSource c = new CancellationTokenSource();
                CancellationToken token = c.Token;
                AutoResetEvent waitHandle = new AutoResetEvent(false);

                KI_base eaKI = new KI_Stupid(gm, individual.number, "KI-" + individual.number, Color.FromArgb(255, 255, 255));
                KI_base referenceKI = new KI_Stupid(gm, 999, "REF-" + individual.number, Color.FromArgb(0, 0, 0));
                Individual referenceIndividual = CreateIndividual(individual.number, 400, 2000);

                var t1 = referenceKI.Start(token, waitHandle, referenceIndividual).ContinueWith(taskInfo => {
                    c.Cancel();
                    return taskInfo.Result;
                }).Result;
                var t2 = eaKI.Start(token, waitHandle, individual).ContinueWith(taskInfo => { 
                    c.Cancel(); 
                    return taskInfo.Result; 
                }).Result;

                referenceCollection.Add(t1);
                resultCollection.Add(t2);
            });

            return resultCollection;
        }

这是 AI 的开始方法,我在这里等待第二个 AI 的下棋:
            public Task<Individual> Start(CancellationToken _ct, AutoResetEvent _are, Individual _i) {
                i = _i;
                gm.game.kis.Add(this);
                if (gm.game.kis.Count > 1) {
                    _are.Set();
                    return Task.Run(() => Play(_ct));
                }
                else {
                    _are.WaitOne();
                    return Task.Run(() => Play(_ct));
                }
            }

还有简化的玩法方法

public override Individual Play(CancellationToken ct) {
            Console.WriteLine($"{player.username} started.");
            while (Constants.TOWN_NUMBER*0.8 > player.towns.Count || player.towns.Count == 0) {
                try {
                    Thread.Sleep((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
                }
                catch (Exception _ex) {
                    Console.WriteLine($"{player.username} error: {_ex}");
                }
                
                //here are the actions of the AI (I removed them for better overview)

                if (ct.IsCancellationRequested) {
                    return i;
                }
            }
            if (Constants.TOWN_NUMBER * 0.8 <= player.towns.Count) {
                winner = true;
                return i;
            }
            return i;
        }

有没有更好的方法来做到这一点,保持所有事物不变,但确保每个游戏中的两个KI任务同时开始?


两个AI玩家是如何相互交互的?它们是否读写某个共享状态?这些读写操作是否使用lock或其他同步原语进行同步,还是无锁的? - Theodor Zoulias
是的,在这种情况下,玩家会轮流进行。但是协调员可以对一个玩家在交换到另一个玩家之前可以进行多少次移动做出任意决定。例如,它可以包括调用Random.Next()。顺便说一句,“完全自由”包括可能一个玩家根本没有行动的可能性,这正是您目前正在经历的。 :-) - Theodor Zoulias
1
哦,我明白了。你能否在你的问题中包含一个AI玩家异步“Start”方法的简化示例,这样我们就能提供替代建议了?(而不是协程) - Theodor Zoulias
我不确定是否可能确保两个任务完全同时开始——操作系统仍然需要安排事情——并且机器上可能有其他进程从一个任务中窃取处理,但不从另一个任务中窃取。因此,即使您可以这样做,也不一定对两个AI都是“公平”的。相反,您需要提供相同数量的时钟周期或迭代或类似的东西。您必须跟踪一个AI获得了多少个周期,然后将其限制,直到另一个AI能够赶上。 - John Wu
@Doc Brown 当我移除AutoResetEvent和ContinueWith CancellationToken时,它可以正常地在一个有两个AI的游戏中工作。只是在同时进行4个或更多游戏时,它会将AI分开。 - theoretisch
显示剩余8条评论
1个回答

3
我的建议是更改Play方法的签名,使其返回Task<Individual>而不是Individual,并将对Thread.Sleep的调用替换为await Task.Delay。这个小变化应该会显著提高AI玩家的响应能力,因为它们不会阻塞任何线程,并且ThreadPool线程池的小池子将得到最优化的利用。
public override async Task<Individual> Play(CancellationToken ct)
{
    Console.WriteLine($"{player.username} started.");
    while (Constants.TOWN_NUMBER * 0.8 > player.towns.Count || player.towns.Count == 0)
    {
        //...
        await Task.Delay((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
        //...
    }
}

您还可以考虑将该方法的名称从Play更改为PlayAsync,以遵守准则

然后,您应该放弃使用Parallel.ForEach方法,因为它不支持异步操作,而是将每个individual投影到一个Task中,将所有任务捆绑在一个数组中,并使用Task.WaitAll方法(或使用await Task.WhenAll,如果您想要全程异步{{link3:)等待它们全部完成。

Task[] tasks = population.Select(async individual =>
{
    GameManager gm = new GameManager();
    CancellationTokenSource c = new CancellationTokenSource();

    //...

    var task1 = referenceKI.Start(token, waitHandle, referenceIndividual);
    var task2 = eaKI.Start(token, waitHandle, individual);

    await Task.WhenAny(task1, task2);
    c.Cancel();
    await Task.WhenAll(task1, task2);
    var result1 = await task1;
    var result2 = await task2;
    referenceCollection.Add(result1);
    resultCollection.Add(result2);
}).ToArray();

Task.WaitAll(tasks);

这样做可以实现最大并发,但可能不是理想的,因为您的计算机的 CPU、RAM 或 CPU <=> RAM 带宽可能会饱和。您可以在这里查找限制并发异步操作数量的方法。

1
@theoretisch 简而言之,定位程序中所有的.Wait().Result调用,并尽可能消除它们(最好全部消除)。在大多数情况下,这些调用会对程序的响应能力或可扩展性造成不良影响。 - Theodor Zoulias
1
刚刚实现了它,而且完美运作。即使同时有200多个游戏也没有问题。非常感谢。所有链接也非常有帮助! - theoretisch
1
@theoretisch 快速而简单的解决方法是在程序开始时通过调用 ThreadPool.SetMinThreads(1000, 1000) 来立即按需增加线程池创建的线程数。虽然这可能会解决您的问题,但这将非常浪费,因为每个线程默认分配 1 MB 的 RAM 用于其堆栈。因此,例如,如果创建了 ~400 个线程,则会有 ~400MB 的浪费内存,专门用于大部分时间处于睡眠状态的线程。 :-) - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接