如何合并两个 Linq 的 IEnumerable<T> 查询,而不运行它们?

5

如何合并一个TPL基础任务的 List<T> 以供稍后执行?

 public async IEnumerable<Task<string>> CreateTasks(){ /* stuff*/ }

My assumption is .Concat() ...

     void MainTestApp()  // Full sample available upon request.
     {
        List<string> nothingList = new List<string>();
        nothingList.Add("whatever");
        cts = new CancellationTokenSource();

         delayedExecution =
            from str in nothingList
            select AccessTheWebAsync("", cts.Token);
         delayedExecution2 =
          from str in nothingList
          select AccessTheWebAsync("1", cts.Token);

         delayedExecution = delayedExecution.Concat(delayedExecution2);
     }


    /// SNIP

    async Task AccessTheWebAsync(string nothing, CancellationToken ct)
    {
        // return a Task
    }

我希望确保这不会生成任何任务或评估任何内容。实际上,我想问的是,“什么逻辑执行IQueryable以返回数据?”
背景
由于我正在进行递归,并且不想在正确的时间之前执行此操作,因此在多次调用时合并结果的正确方法是什么?
如果有影响,我正在考虑运行此命令来启动所有任务 var AllRunningDataTasks = results.ToList();,然后是此代码:
while (AllRunningDataTasks.Count > 0)
{
    // Identify the first task that completes.
    Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningDataTasks);

    // ***Remove the selected task from the list so that you don't
    // process it more than once.
    AllRunningDataTasks.Remove(firstFinishedTask);

    // TODO: Await the completed task.
    var taskOfTableResult = await firstFinishedTask;

    // Todo: (doen't work)
    TrustState thisState = (TrustState)firstFinishedTask.AsyncState;

    // TODO: Update the concurrent dictionary with data
    // thisState.QueryStartPoint + thisState.ThingToSearchFor 

    Interlocked.Decrement(ref thisState.RunningDirectQueries);
    Interlocked.Increment(ref thisState.CompletedDirectQueries);

    if (thisState.RunningDirectQueries == 0)
    {
        thisState.TimeCompleted = DateTime.UtcNow;
    }
}

为什么连接函数不起作用,它应该可以工作的?另外,您不想运行任务,但运行查询是可以的,对吗? - Tilak
@Tilak 我的重点是在任务上,这是我第一次在任务或查询中进行此操作。 我以前从未用过查询来做这件事,但我记得阅读过 Concat 是如何完成的。 - makerofthings7
@Tilak 或许我在我的代码中发现了一个错误... 待会儿会更新。 - makerofthings7
2个回答

2
回答问题“什么会逻辑地执行IQueryable并返回数据?”的是任何强制产生至少一个值或者强制发现值是否可用的东西。例如,ToList、ToArray、First、Single、SingleOrDefault和Count都会强制评估。(尽管First不会评估整个集合-它将检索第一个项目然后停止。)这些方法都必须起码开始检索值,因为没有任何一种方法可以在不这样做的情况下返回所需内容。对于ToList和ToArray,这些方法返回完全填充的非延迟集合,这就是它们必须评估所有内容的原因。返回单个项的方法至少需要请求第一个项,而Single方法将继续检查如果评估继续是否还会有其他项出现(如果结果还有更多,则抛出异常)。
使用foreach遍历查询也会强制评估。(同样的原因:你要求它提供来自集合的实际值,因此它必须提供这些值。)
Concat不会立即评估,因为它不需要-只有当您要求连接序列的值时,它才需要请求其输入的值。
顺便说一句,尽管您在这里提到了IQueryable,但实际上并没有在示例中使用。这很重要,因为与您实际获取的纯IEnumerable的LINQ to Objects实现相比,它的工作方式有所不同。我不认为这在本例中有什么区别,但这让我想知道您原始代码和您用于说明的版本之间是否有所变化?这很重要,因为不同的LINQ提供程序可以以不同的方式执行操作。IEnumerable的Concat实现绝对使用了延迟评估,尽管我希望这对大多数其他LINQ实现也是正确的,但这并不是绝对的。
如果您需要多次使用结果,并且希望确保只评估一次,但是直到您确实需要它们才进行评估,则通常的方法是在您确定需要评估的点上调用ToList,然后持有生成的List,以便您可以再次使用它。一旦您将数据以List(或数组)形式获得,就可以随意多次使用该列表。
顺便说一句,您的第一个问题有一个问题:“如何合并基于TPL的任务列表以供稍后执行?”
一般而言,如果您已经有一个TPL任务,则无法阻止它执行。(有一个例外情况。如果您直接构造一个Task而不是使用创建它的更常规的方式之一,它实际上不会运行,直到您告诉它为止。但通常,返回任务的API会返回活动任务,也就是说,当您拿到它们时,它们可能已经在运行,甚至已经完成。)
您示例中的“后期执行”来自于您根本没有任务列表。(如果您确实有一个List<T>任务,“后期执行”将不是一个选项。)您只有一些可枚举对象,如果您对它们进行评估,将会创建任务。在任何返回任务的TAP风格的API中,创建任务的操作与启动任务的操作是不可分割的。
根据您写的其余内容,我认为您真正想问的是:
“如何将多个IEnumerable<Task<T>>对象合并为单个IEnumerable<Task<T>>,以一种推迟评估底层可枚举对象的方式,直到组合的可枚举对象本身被评估?” Concat应该适用于此。

有趣...我正在使用Azure存储获取按需加载树,我不想执行子节点,直到所有相同深度的分支都被搜索。Azure存储使用Begin..End语义。我将它们包装在TPL中,如此处所述http://stackoverflow.com/q/13216475/328397 - makerofthings7

0
以下是一种取得数据合并的hacky方法...我不喜欢在Main中使用"nothingList"或其他方面的一些内容,但它似乎完成了工作并允许我合并待处理任务。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

// Add a using directive and a reference for System.Net.Http.
using System.Net.Http;

// Add the following using directive.
using System.Threading;


namespace ProcessTasksAsTheyFinish
{
    public partial class MainWindow : Window
    {
        // Declare a System.Threading.CancellationTokenSource.
        CancellationTokenSource cts;
        List<IEnumerable<Task>> launchList = new List<IEnumerable<Task>>();

        public MainWindow()
        {
            InitializeComponent();

            List<string> nothingList = new List<string>();
            nothingList.Add("whatever");

            cts = new CancellationTokenSource();

             delayedExecution =
                from str in nothingList
                select AccessTheWebAsync("", cts.Token);


             List<string> nothingList2 = new List<string>();
             nothingList2.Add("whatever");

             delayedExecution2 =
              from str in nothingList2
              select AccessTheWebAsync("1", cts.Token);


             launchList.Add(delayedExecution);
             launchList.Add(delayedExecution2);

             delayedExecution = delayedExecution.Concat(delayedExecution2);
        }
        IEnumerable<Task> delayedExecution = null;
        IEnumerable<Task> delayedExecution2 = null;

        private async void startButton_Click(object sender, RoutedEventArgs e)
        {
            resultsTextBox.Clear();

            // Instantiate the CancellationTokenSource.

            try
            {
                // ***Set up the CancellationTokenSource to cancel after 25 seconds.
                //cts.CancelAfter(250000);

                var test  =  delayedExecution;// AccessTheWebAsync("", cts.Token);

                var testList = test.ToList();

                while (testList.Count() > 0)
                {
                    var firstFinishedTask = await Task.WhenAny(testList);
                    testList.Remove(firstFinishedTask);

                      await firstFinishedTask;
                }

                resultsTextBox.Text += "\r\nDownloads complete.";
            }
            catch (OperationCanceledException tee)
            {
                resultsTextBox.Text += "\r\nDownloads canceled.\r\n";
            }
            catch (Exception)
            {
                resultsTextBox.Text += "\r\nDownloads failed.\r\n";
            }

            cts = null;
        }


        private void cancelButton_Click(object sender, RoutedEventArgs e)
        {
            if (cts != null)
            {
                cts.Cancel();
            }
        }


        async Task<string> AccessTheWebAsync(string nothing, CancellationToken ct)
        {
            // CHANGE THIS VALUE TO CONTROL THE TESTING
            bool delayConversionOfQueryToList = false;

            HttpClient client = new HttpClient();

            // Make a list of web addresses.
            List<string> urlList = null;

            if (nothing == "1")
            {
                urlList = SetUpURLList2();
            }
            else urlList = SetUpURLList();

            // ***Create a query that, when executed, returns a collection of tasks.
            IEnumerable<Task<int>> downloadTasksQuery =
                from url in urlList select ProcessURL(url, client, ct);

            // DEBUG!!!
            if (delayConversionOfQueryToList == true)
            {
                await Task.Delay(10000);
                resultsTextBox.Text += String.Format("\r\nDelay of IQueryable complete.  Tip: Did you see any IsRunning messages?");
            }

            // ***Use ToList to execute the query and start the tasks. 
            List<Task<int>> downloadTasks = downloadTasksQuery.ToList();

            // DEBUG!!!
            if (delayConversionOfQueryToList == false)
            {
                await Task.Delay(10000);
                resultsTextBox.Text += String.Format("\r\nDelay of .ToList() complete.   Tip: Did you see any IsRunning messages?");
            }

            // ***Add a loop to process the tasks one at a time until none remain.
            while (downloadTasks.Count() > 0)
            {
                // Identify the first task that completes.
                Task<int> firstFinishedTask = await Task.WhenAny(downloadTasks);

                resultsTextBox.Text += String.Format("\r\nID  {0}", firstFinishedTask.Id);

                // ***Remove the selected task from the list so that you don't
                // process it more than once.
                downloadTasks.Remove(firstFinishedTask);

                // Await the completed task.
                int length = await firstFinishedTask;
                resultsTextBox.Text += String.Format("\r\nLength of the download:  {0}", length);
            }

            return nothing;
        }


        private List<string> SetUpURLList()
        {
            List<string> urls = new List<string> 
            { 
                "http://msdn.microsoft.com",
                "http://msdn.microsoft.com/library/windows/apps/br211380.aspx",
                "http://msdn.microsoft.com/en-us/library/hh290136.aspx",
                "http://msdn.microsoft.com/en-us/library/dd470362.aspx",
                "http://msdn.microsoft.com/en-us/library/aa578028.aspx",
                "http://msdn.microsoft.com/en-us/library/ms404677.aspx",
                "http://msdn.microsoft.com/en-us/library/ff730837.aspx"
            };
            return urls;
        }
        private List<string> SetUpURLList2()
        {
            List<string> urls = new List<string> 
            { 
                "http://www.google.com",

            };
            return urls;
        }

        async Task<int> ProcessURL(string url, HttpClient client, CancellationToken ct)
        {
            resultsTextBox.Text += String.Format("\r\nIS RUNNING {0}", url);

            // GetAsync returns a Task<HttpResponseMessage>. 
            HttpResponseMessage response = await client.GetAsync(url, ct);
            // Retrieve the website contents from the HttpResponseMessage.
            byte[] urlContents = await response.Content.ReadAsByteArrayAsync();

           // Thread.Sleep(3000);
           // await Task.Delay(1000, ct);
           return urlContents.Length;
        }
    }
}

// Sample Output:

IS RUNNING http://msdn.microsoft.com
IS RUNNING http://msdn.microsoft.com/library/windows/apps/br211380.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/hh290136.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/dd470362.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/aa578028.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/ms404677.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/ff730837.aspx
IS RUNNING http://www.google.com
Delay of .ToList() complete.   Tip: Did you see any IsRunning messages?
ID  1
Length of the download:  48933
ID  2
Length of the download:  375328
ID  3
Length of the download:  220428
ID  4
Length of the download:  222256
ID  5
Length of the download:  229330
ID  6
Length of the download:  136544
ID  7
Length of the download:  207171
Delay of .ToList() complete.   Tip: Did you see any IsRunning messages?
ID  8
Length of the download:  43945
Downloads complete.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接