在C#控制台应用程序中使用async和await异步处理文件列表

3
我在一个简单的控制台应用程序中使用C#的asyncawait进行实验。我的目标很简单:以异步方式处理文件列表,以便处理一个文件不会阻塞其他文件的处理。这些文件之间互不依赖,并且有数千个文件需要处理。
以下是我目前拥有的代码。
public class MyClass
{
    public void Go()
    {
        string[] fileSystemEntries = Directory.GetFileSystemEntries(@"Path\To\Files");

        Console.WriteLine("Starting to read from files!");
        foreach (var filePath in fileSystemEntries.OrderBy(s => s))
        {
            Task task = new Task(() => DoStuff(filePath));
            task.Start();
            task.Wait();
        }
    }

    private async void DoStuff(string filePath)
    {
        await Task.Run(() =>
        {
            Thread.Sleep(1000);
            string fileName = Path.GetFileName(filePath);
            string firstLineOfFile = File.ReadLines(filePath).First();
            Console.WriteLine("{0}: {1}", fileName, firstLineOfFile);
        });
    }
}

我的Main()方法仅仅调用了这个类:

public static class Program
{
    public static void Main()
    {
        var myClass = new MyClass();
        myClass.Go();
    }
}

我好像缺失了异步编程模式的某些要素,因为每次运行程序时,似乎处理的文件数量是随机的,可以是没有处理任何一个文件,也可以是六个文件全部都处理了(在我的示例文件集中)。

基本上,主线程并不会等待所有文件都被处理完毕,这或许是异步运行的一部分,但我并不想这样。我只想:在尽可能多的线程中处理这些文件,但仍然需要等待它们全部完成处理后再结束。


1
你可以在foreach循环中启动并等待每个任务... 创建一个任务数组并使用WaitAll。 - David Brabant
3
这段代码存在一些概念性问题,但其中一个主要的技术问题是:new Task(() => DoStuff(filePath)) 中,DoStuff 是一个 async void 方法。你在这里进行了一次“fire-and-forget”(点火并忘记)调用,任务完成之前 DoStuff 方法已经完成了,以及 myClass.Go() 也完成了。 - noseratio - open to work
@usr,我认为OP在这里忽略的主要问题是他应该使用异步IO,而不是使用Parallel.ForEachTask.Factory.StartNew,并让其余处理在IOCP线程上进行。虽然这方面已经有了重复的内容。 - noseratio - open to work
1
@Scott,是的,自从引入了async/await这些东西,对于初学者来说,这些概念变得模糊和误导。我经常看到人们在使用async/await时,本可以使用同步线程更简单地完成相同的事情。 - usr
2
这个问题现在已经足够独特,可以与现有的材料共存。如果你愿意,可以在你现有的答案中添加内容。重点是让这个问题对未来的访问者更有用。接受你认为对他人最有帮助的答案。 - usr
显示剩余6条评论
2个回答

6
async/await 的主要设计目标之一是方便使用自然异步 I/O API。因此,您的代码可能会被重写为以下形式(未经测试):
public class MyClass
{
    private int filesRead = 0;

    public void Go()
    {
        GoAsync().Wait();
    }

    private async Task GoAsync()
    {
        string[] fileSystemEntries = Directory.GetFileSystemEntries(@"Path\To\Files");

        Console.WriteLine("Starting to read from files! Count: {0}", fileSystemEntries.Length);

        var tasks = fileSystemEntries.OrderBy(s => s).Select(
            fileName => DoStuffAsync(fileName));
        await Task.WhenAll(tasks.ToArray());

        Console.WriteLine("Finish! Read {0} file(s).", filesRead);
    }

    private async Task DoStuffAsync(string filePath)
    {
        string fileName = Path.GetFileName(filePath);
        using (var reader = new StreamReader(filePath))
        {
            string firstLineOfFile = 
                await reader.ReadLineAsync().ConfigureAwait(false);
            Console.WriteLine("[{0}] {1}: {2}", Thread.CurrentThread.ManagedThreadId, fileName, firstLineOfFile);
            Interlocked.Increment(ref filesRead);
        }
    }
}

请注意,它没有显式地产生任何新线程,但是使用await reader.ReadLineAsync().ConfigureAwait(false)时可能在幕后发生。

注:这里的“幕后发生”指的是可能会有新线程被创建。

3

我结合了以上的评论以达到我的解决方案。实际上,我根本不需要使用asyncawait关键字。我只需要创建一个任务列表、启动它们,并调用WaitAll。任何东西都不需要用asyncawait修饰。下面是最终的代码:

public class MyClass
{
    private int filesRead = 0;

    public void Go()
    {
        string[] fileSystemEntries = Directory.GetFileSystemEntries(@"Path\To\Files");

        Console.WriteLine("Starting to read from files! Count: {0}", fileSystemEntries.Length);
        List<Task> tasks = new List<Task>();
        foreach (var filePath in fileSystemEntries.OrderBy(s => s))
        {
            Task task = Task.Run(() => DoStuff(filePath));
            tasks.Add(task);
        }
        Task.WaitAll(tasks.ToArray());
        Console.WriteLine("Finish! Read {0} file(s).", filesRead);
    }

    private void DoStuff(string filePath)
    {
        string fileName = Path.GetFileName(filePath);
        string firstLineOfFile = File.ReadLines(filePath).First();
        Console.WriteLine("[{0}] {1}: {2}", Thread.CurrentThread.ManagedThreadId, fileName, firstLineOfFile);
        filesRead++;
    }
}

在测试时,我添加了Thread.Sleep调用以及忙循环来占用我的机器的CPU。打开任务管理器,我观察到在忙循环期间所有内核都被占用,并且每次运行程序时,文件以不一致的顺序运行(这是一件好事,因为它表明唯一的瓶颈是可用线程的数量)。
每次运行程序时,fileSystemEntries.Length总是与filesRead相匹配。
编辑:根据上面的评论讨论,我发现更清晰的解决方案(并且基于评论中链接的问题更有效)是使用Parallel.ForEach
public class MyClass
{
    private int filesRead;

    public void Go()
    {
        string[] fileSystemEntries = Directory.GetFileSystemEntries(@"Path\To\Files");

        Console.WriteLine("Starting to read from files! Count: {0}", fileSystemEntries.Length);
        Parallel.ForEach(fileSystemEntries, DoStuff);
        Console.WriteLine("Finish! Read {0} file(s).", filesRead);
    }

    private void DoStuff(string filePath)
    {
        string fileName = Path.GetFileName(filePath);
        string firstLineOfFile = File.ReadLines(filePath).First();
        Console.WriteLine("[{0}] {1}: {2}", Thread.CurrentThread.ManagedThreadId, fileName, firstLineOfFile);
        filesRead++;
    }
}

现在,C#中有很多异步编程的方式。在ParallelTaskasync/await之间,选择很多。根据这个线程,看起来最适合我的解决方案是Parallel,因为它提供了最简洁的解决方案,在手动创建Task对象时更高效,并且不会通过使用asyncawait关键字来使代码混乱,同时达到类似的结果。


1
提示:使用 Task.Run 代替 new Task + Task.Start - Stephen Cleary
谢谢!已更新答案以反映这一点。 - Doctor Blue
3
这里使用 Task.Run 是错误的方法。对于长时间运行的 CPU 密集型操作,请使用 Task.Run。对于长时间运行的 I/O 密集型操作,请使用 async/await。Noseratio 的回答是这里正确的方法。 - Daniel Mann
我认为Parallel.ForEach现在最适合我的需求。不过你说得对,我想我不应该使用Task。 - Doctor Blue

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接