使用带有进度报告的Stream.CopyToAsync - 即使复制完成后也会报告进度

20
我已经开发了一个简单的控制台应用程序,它可以从互联网上下载文件。
由于我在使用WebClient时遇到了问题,因此我决定改用HttpClient编写我的应用程序。

基本上,我正在请求以读取标头,然后使用ReadAsStreamAsync获取流,最后使用CopyToAsync将其复制到本地文件中。

我找到了支持IProgress的流扩展方法:

public static class StreamExtensions
{
    public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress, CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
    {
        var buffer = new byte[bufferSize];
        int bytesRead;
        long totalRead = 0;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            totalRead += bytesRead;
            //Thread.Sleep(10);
            progress.Report(totalRead);
        }
    }
}

我的应用程序能够工作,但我获得的进度信息不正确。
例如,在下载 2 个文件时,我在输出窗口中看到以下内容:

file1.tmp 60.95%
file2.tmp 98.09%
file1.tmp 60.98%
file2.tmp 98.21%
file2.tmp 98.17%
file2.tmp 98.25%
file1.tmp 61.02%
file2.tmp 98.41%
file2.tmp downloaded
file2.tmp 98.29%
file2.tmp 98.37%
file1.tmp 61.06%
file2.tmp 89.27%
file2.tmp 89.31%
file2.tmp 98.33%
file2.tmp 98.45%
file2.tmp 98.48%
file1.tmp 61.10%
file1.tmp 61.14%
file2.tmp 98.52%
file1.tmp 61.22%
file2.tmp 98.60%
file2.tmp 98.56%
file1.tmp 61.30%
file2.tmp 98.88%
file2.tmp 90.44%
file1.tmp 61.53%
file2.tmp 98.72%
file1.tmp 61.41%
file1.tmp 61.73%
file2.tmp 98.80%
file1.tmp 61.26%
file1.tmp 61.49%
file1.tmp 61.57%
file1.tmp 61.69%
...
file1.tmp 99.31%
file1.tmp 98.84%
file1.tmp 98.80%
file1.tmp 99.04%
file1.tmp 99.43%
file1.tmp 99.12%
file1.tmp 99.00%
file1.tmp downloaded
file1.tmp 100.00%
file1.tmp 98.73%
file1.tmp 98.88%
file1.tmp 99.47%
file1.tmp 99.98%
file1.tmp 99.90%
file1.tmp 98.96%
file1.tmp 99.78%
file1.tmp 99.99%
file1.tmp 99.74%
file1.tmp 99.59%
file1.tmp 99.94%
file1.tmp 98.49%
file1.tmp 98.53%
ALL FILES DOWNLOADED
file1.tmp 99.55%
file1.tmp 98.41%
file1.tmp 99.62%
file1.tmp 98.34%
file1.tmp 99.66%
file1.tmp 98.69%
file1.tmp 98.37%

您可以看到,我得到了有关已下载文件2的信息,但我仍然从CopyToAsync获得进度报告,文件1也是如此。

因此,有时会出现奇怪的控制台输出:

输入图像描述

理想情况下,当我调用以下内容时,我希望能确保:

await streamToReadFrom.CopyToAsync(streamToWriteTo, progress, source.Token,0x2000);
Debug.WriteLine(filename+" downloaded");

在我获取了调试信息之后,没有任何进展的报告(文件已下载)。 我认为await可以解决我的问题,但实际上并没有。

我该如何解决这个问题? 作为一个临时解决方案,我在汇报进度之前,在CopyToAsync中添加Thread.Sleep。

以下是我的当前代码:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncDownloadTest
{
    class Program
    {
        private const string LocalPath = @"D:\TEMP";

        static void Main()
        {
            try
            {
                var filesToDownlad = new List<Tuple<string, string>>
                {
                    new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
                    new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
                };
                _consolePosition = -1;
                Console.CursorVisible = false;

                Parallel.ForEach(filesToDownlad, new ParallelOptions { MaxDegreeOfParallelism = 4 }, doc =>
                {
                    DownloadFile(doc.Item2,doc.Item1).Wait();
                });
                Debug.WriteLine("ALL FILES DOWNLOADED");
                Console.CursorVisible = true;    
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                Console.ReadLine();
            }
        }

        private static readonly object ConsoleLock = new object();
        private static int _consolePosition;

        static readonly CancellationTokenSource source = new CancellationTokenSource();

        private static async Task DownloadFile(string url, string filename)
        {
            int currenctLineNumber = 0;
            int currectProgress = 0;

            try
            {
                lock (ConsoleLock)
                {
                    _consolePosition++;
                    currenctLineNumber = _consolePosition;
                }

                long fileSize = -1;

                IProgress<long> progress = new Progress<long>(value =>
                {
                    decimal tmp = (decimal)(value * 100) / fileSize;

                    if (tmp != currectProgress && tmp > currectProgress)
                    {
                        lock (ConsoleLock)
                        {
                            currectProgress = (int)tmp;
                            Console.CursorTop = currenctLineNumber;
                            Console.CursorLeft = 0;
                            Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, tmp, "DOWNLOADING");
                        }
                        Debug.WriteLine("{1} {0:N2}%", tmp, filename);
                    }
                });

                using (HttpClient client = new HttpClient())
                {
                    using (HttpResponseMessage response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, source.Token))
                    {
                        response.EnsureSuccessStatusCode();
                        if (response.Content.Headers.ContentLength.HasValue) fileSize = response.Content.Headers.ContentLength.Value;

                        if (response.Content.Headers.ContentDisposition != null)
                        {
                            var tmp = response.Content.Headers.ContentDisposition.FileName.Replace("\"", "");
                            Debug.WriteLine("Real name: {0}",tmp);
                        }

                        using (Stream streamToReadFrom = await response.Content.ReadAsStreamAsync())
                        {
                            using (Stream streamToWriteTo = File.Open(Path.Combine(LocalPath, filename), FileMode.Create, FileAccess.Write))
                            {
                                await streamToReadFrom.CopyToAsync(streamToWriteTo, progress, source.Token,0x2000);

                                Debug.WriteLine(filename+" downloaded");

                                lock (ConsoleLock)
                                {
                                    Console.CursorTop = currenctLineNumber;
                                    Console.CursorLeft = 0;
                                    var oldColor = Console.ForegroundColor;
                                    Console.ForegroundColor = ConsoleColor.Green;
                                    Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, 100, "SUCCESS");
                                    Console.ForegroundColor = oldColor;
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine(e.Message);
                lock (ConsoleLock)
                {
                    Console.CursorTop = currenctLineNumber;
                    Console.CursorLeft = 0;
                    var oldColor = Console.ForegroundColor;
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, currectProgress, "ERROR");
                    Console.ForegroundColor = oldColor;
                }
            }
        }
    }

    public static class StreamExtensions
    {
        public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress, CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
        {
            var buffer = new byte[bufferSize];
            int bytesRead;
            long totalRead = 0;
            while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
            {
                await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
                cancellationToken.ThrowIfCancellationRequested();
                totalRead += bytesRead;
                Thread.Sleep(10);
                progress.Report(totalRead);
            }
        }
    }
}

我不认为这与使用“async”有任何关系,而是与您的代码在将输出呈现到控制台时设置了错误的行/列有关。您的“lock”出现了一些问题,但我没有进一步调查。 - Krumelur
@Krumelur 感谢您的评论,即使我删除了与控制台相关的代码,我仍然可以在 VS 输出窗口中看到 file2.tmp 下载完成 的消息,但之后我仍然会收到进度报告,就像我的问题所示(第二个代码段以 file1.tmp 60.95% 开头)。 - Misiu
2
Parallel.ForEach中执行DownloadFile(doc.Item2,doc.Item1).Wait();时要小心,Parallel类将使用调用线程作为其中一个工作线程,如果该调用线程具有synchronizationContext,则会死锁程序。您可能需要查看TPL Dataflow,而不是使用Parallel,它具有支持传递异步函数的方法,因此您可以直接传递DownloadFile而无需调用.Wait() - Scott Chamberlain
@ScottChamberlain 感谢您的评论,我尝试过查看TPL Dataflow,但是我不知道如何将我的代码转换为使用它的方式。您能否尝试将其转换并将其发布为答案?这会对我很有帮助。 - Misiu
2个回答

14
你的问题实际上在这里:
new Progress<long>
Progress<T>始终SynchronizationContext中调用其回调函数,在这种情况下,SynchronizationContext是线程池SynchronizationContext。这意味着当进度报告代码调用Report时,它只是将回调排队到线程池中。因此,可能会看到它们无序(或在下载实际完成后仍然稍微延迟)。
为了解决这个问题,您可以创建自己的IProgress<T>自定义实现:
//C#6.0
public sealed class SynchronousProgress<T> : IProgress<T>
{
  private readonly Action<T> _callback;
  public SynchronousProgress(Action<T> callback) { _callback = callback; }
  void IProgress<T>.Report(T data) => _callback(data);
}
//older version
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _callback;

    public SynchronousProgress(Action<T> callback)
    {
        _callback = callback;
    }

    void IProgress<T>.Report(T data)
    {
        _callback(data);
    }
}

然后替换该行

IProgress<long> progress = new Progress<long>(value =>

使用

IProgress<long> progress = new SynchronousProgress<long>(value =>

谢谢您的回复。我使用Progres是因为我认为它会完成它的工作。也许Action或简单的Delegate会更好?我马上尝试您的解决方案,但也许有更好的方法来实现我想做的事情。 - Misiu

3

OP请求我在评论中展示如何使用TPL Dataflow编写他的程序。实际上,这是一个相当简单的转换过程。首先,添加对NuGet包System.Threading.Tasks.Dataflow的引用。然后只需将主函数更改为

static void Main()
{
    try
    {
        var filesToDownlad = new List<Tuple<string, string>>
        {
            new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
            new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
        };
        _consolePosition = -1;
        Console.CursorVisible = false;

        var downloadBlock = new ActionBlock<Tuple<string, string>>(doc => DownloadFile(doc.Item2, doc.Item1),
                                                                   new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 4});

        foreach (var file in filesToDownlad)
        {
            downloadBlock.Post(file);
        }
        downloadBlock.Complete();
        downloadBlock.Completion.Wait();


        Debug.WriteLine("ALL FILES DOWNLOADED");
        Console.CursorVisible = true;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        Console.ReadLine();
    }
}

如果您正在使用具有同步上下文的程序,并且希望等待完成和发布而不是进行同步操作,则可以执行以下操作:
static async Task Example()
{
    try
    {
        var filesToDownlad = new List<Tuple<string, string>>
        {
            new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
            new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
        };
        _consolePosition = -1;
        Console.CursorVisible = false;

        var downloadBlock = new ActionBlock<Tuple<string, string>>(doc => DownloadFile(doc.Item2, doc.Item1),
                                                                   new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 4});

        foreach (var file in filesToDownlad)
        {
            await downloadBlock.SendAsync(file);
        }
        downloadBlock.Complete();
        await downloadBlock.Completion;


        Debug.WriteLine("ALL FILES DOWNLOADED");
        Console.CursorVisible = true;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        Console.ReadLine();
    }
}

注意,这并不能解决“ALL FILES DOWNLOADED”过早运行的问题。您需要使用Stephen的解决方案来解决该问题。这只是在代码可能在调用线程上存在SynchronizationContext的情况下修复潜在死锁的方法。

谢谢您发布这段代码。我以为Dataflow会很难,但是代码看起来很清晰。DataFlow是我一定要研究的东西! - Misiu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接