缓冲LINQ查询

22

最终编辑:

我选择了 Timothy 的答案,但如果你想要一个更可爱的实现,可以利用 C# 的 yield 语句,请查看 Eamon 的答案:https://dev59.com/SWIj5IYBdhLWcg3w2ohr#19825659


默认情况下,LINQ 查询是 惰性流式传输的。

ToArray/ToList 可以提供完全的缓冲,但首先它们是急切的,其次在无限序列中完成可能需要相当长的时间。

有没有一种方法可以同时具有流式传输缓冲值的组合行为,从而在生成这些元素时即时缓冲,以便后续查询不会触发已经查询过的元素的生成。

这里有一个基本的用例:

static IEnumerable<int> Numbers
{
    get
    {
        int i = -1;

        while (true)
        {
            Console.WriteLine("Generating {0}.", i + 1);
            yield return ++i;
        }
    }
}

static void Main(string[] args)
{
    IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0);

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }

    Console.WriteLine("==========");

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }
}

这里是输出结果:

Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.
==========
Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.

生成代码被触发了22次。

我希望它在可枚举对象第一次迭代时只被触发11次。

然后第二次迭代将受益于已经生成的值。

大概是这样的:

IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer();

对于熟悉Rx的人来说,它类似于ReplaySubject的行为。


2
不是LINQ需要缓存,而是IEnumerable,已经有一些在互联网上的例子 - Scott Chamberlain
1
这个方案昨天在Reddit上(在此处)被提出。我不想抄袭那位作者的解决方案。 - Austin Salonen
@AustinSalonen:太巧了,谢谢提供链接。 :) - Pragmateek
1
这个的通用术语是“记忆化”。请注意,这里的许多实现处理一些简单情况,但不能处理在一个枚举器完全完成之前多个枚举器枚举结果的情况,不能处理不同枚举器的并行枚举,不能在整个序列未迭代时处理底层可枚举对象等。为了处理这些更复杂的问题,最好使用现有的库实现。 - Servy
@Servy:谢谢你的信息。我不知道记忆化这个术语在这里也适用。 - Pragmateek
显示剩余2条评论
8个回答

16

IEnumerable<T>.Buffer() 扩展方法

public static EnumerableExtensions
{
    public static BufferEnumerable<T> Buffer(this IEnumerable<T> source)
    {
        return new BufferEnumerable<T>(source);
    }
}

public class BufferEnumerable<T> : IEnumerable<T>, IDisposable
{
    IEnumerator<T> source;
    List<T> buffer;
    public BufferEnumerable(IEnumerable<T> source)
    {
        this.source = source.GetEnumerator();
        this.buffer = new List<T>();
    }
    public IEnumerator<T> GetEnumerator()
    {
        return new BufferEnumerator<T>(source, buffer);
    }
    public void Dispose()
    {
        source.Dispose()
    }
}

public class BufferEnumerator<T> : IEnumerator<T>
{
    IEnumerator<T> source;
    List<T> buffer;
    int i = -1;
    public BufferEnumerator(IEnumerator<T> source, List<T> buffer)
    {
        this.source = source;
        this.buffer = buffer;
    }
    public T Current
    {
        get { return buffer[i]; }
    }
    public bool MoveNext()
    {
        i++;
        if (i < buffer.Count)
            return true;
        if (!source.MoveNext())
            return false;
        buffer.Add(source.Current);
        return true;
    }
    public void Reset()
    {
        i = -1;
    }
    public void Dispose()
    {
    }
}

使用方法

using (var evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer())
{
    ...
}

评论

关键点在于,作为输入给Buffer方法的IEnumerable<T> source只调用一次GetEnumerator,无论Buffer的结果被枚举多少次。所有对Buffer结果的枚举器共享相同的源枚举器和内部列表。


2
它会立即完全评估数字,甚至在 evenNumbers 被使用之前。 - sinelaw
2
嗯,Timothy,就像我在无限序列“ToList”中所说的那样,它相当长。;) - Pragmateek
1
@Pragmateek,我错过了那个点。我明白你想要什么了,并且已经更新了答案。 - Timothy Shields
@TimothyShields:感谢你的实现。我真的希望有一种标准的方法来完成这个,但没什么是完美的。你做得很好。 :) - Pragmateek
是的,我在考虑一个不正确的用例来测试3,但这个处理方式可以解决。至于2,我之前回答过一个关于线程安全版本的问题。实际上,如果你想要一个真正健壮的实现,这是一个相当困难的问题。它们之间有很多奇怪的交互方式。 - Servy
显示剩余12条评论

8
你可以使用 F# Power Pack 中的 Microsoft.FSharp.Collections.LazyList<> 类型来实现这个功能(是的,即使没有安装 F#,也可以从 C# 中使用它!)。它位于 Nuget 包 FSPowerPack.Core.Community 中。
具体而言,你需要调用 LazyListModule.ofSeq(...) 方法,该方法返回一个 LazyList,它实现了 IEnumerable 接口,是懒加载且具有缓存机制的。
在你的情况下,使用只需要简单地...
var evenNumbers = LazyListModule.ofSeq(Numbers.Where(i => i % 2 == 0));
var cachedEvenNumbers = LazyListModule.ofSeq(evenNumbers);

尽管我个人更喜欢在所有这种情况下使用 var ,但请注意这意味着编译时类型将比仅使用 IEnumerable <> 更具体 - 尽管这可能永远不会成为缺点。 F#非接口类型的另一个优点是它们公开了一些使用纯枚举无法高效执行的有效操作,例如 LazyListModule.skip
我不确定 LazyList 是否线程安全,但我认为它是安全的。
下面评论中指出的另一种选择(如果您已安装F#)是 SeqModule.Cache (命名空间 Microsoft.FSharp.Collections ,它将在GACed程序集FSharp.Core.dll中)。它具有相同的有效行为。 与其他.NET枚举类似, Seq.cache 没有您可以有效地链接的尾部(或跳过)运算符。
线程安全:与此问题的其他解决方案不同,Seq.cache 在多个枚举器并行运行的意义上是线程安全的(每个枚举器都不是线程安全的)。
性能:我进行了快速基准测试,发现 LazyList 可枚举对象的开销至少是 SeqModule.Cache 变体的4倍,后者的开销至少比自定义实现答案多三倍。因此,虽然F#变体可以工作,但它们不太快。请注意,与执行I / O或任何非平凡计算的可枚举对象相比,3-12倍较慢仍然不是很慢,因此在大多数情况下这可能并不重要,但记住这一点很好。
简而言之:如果需要高效的、线程安全的缓存的可枚举对象,请使用 SeqModule.Cache

1
谢谢 Eamon,F#充满了惊喜。 :) +1 - Pragmateek
1
@Pragmateek 是的 - 这只是 F# 中的 Seq.cache - Reed Copsey

7

我希望本回答既能像sinelaw的回答一样简洁明了,又能像Timothy的回答一样支持多个枚举:

public static IEnumerable<T> Cached<T>(this IEnumerable<T> enumerable) {
    return CachedImpl(enumerable.GetEnumerator(), new List<T>());
}

static IEnumerable<T> CachedImpl<T>(IEnumerator<T> source, List<T> buffer) {
    int pos=0;
    while(true) {
        if(pos == buffer.Count) 
            if (source.MoveNext()) 
                buffer.Add(source.Current); 
            else 
                yield break;
        yield return buffer[pos++];
    }
}

重点是使用yield return语法来实现简短的可枚举实现,但你仍然需要一个状态机来决定是否可以从缓冲区获取下一个元素,或者是否需要检查基础枚举器。 限制:这不尝试保证线程安全,也不处理底层枚举器的释放(一般而言,底层未缓存的枚举器必须保持未释放状态,只要任何已缓存的可枚举对象可能仍在使用)。

不错。它也通过了Zip测试。 - sinelaw
我添加了另一种解决方案,它更长,但使用了一般模式来模拟匿名迭代器,因此更美观。 - sinelaw
1
当你有一个悬挂的 else 时,使用大括号包围你的 if 通常是个好主意,就像你在这里一样。 - Servy
@servy 我本意是想在之前的评论中提到你,但是拼错了你的用户名 :-) - Eamon Nerbonne
@EamonNerbonne:因为Timothy先发了,他花了很多精力修复他的第一次尝试,他的实现是从头开始使用IEnumerable的好提醒。你的当然更可爱,在生产中这是我会使用的。这是在SO上经常发生的困境,如果可以的话,我会接受你们两个的答案。我会更新我的问题,让你的实现得到应有的关注。 :) - Pragmateek
显示剩余9条评论

7

Eamon上面的答案的基础上,这是另一个功能性解决方案(没有新类型),也适用于同时评估。这证明了一个通用模式(共享状态的迭代)潜在地存在于这个问题中。

首先,我们定义一个非常通用的辅助方法,旨在允许我们模拟C#中匿名迭代器的缺失特征:

public static IEnumerable<T> Generate<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

Generate类似于带状态的聚合器。它接受一个返回初始状态的函数和一个生成器函数,如果在C#中允许,则该函数将是匿名的,其中包含 yield return initialize返回的状态意味着每个枚举的状态,而调用方可以通过闭包变量来维护更全局的状态(在所有枚举之间共享),如下所示。

现在我们可以将其用于“缓冲的Enumerable”问题:

public static IEnumerable<T> Cached<T>(IEnumerable<T> enumerable)
{
    var cache = new List<T>();
    var enumerator = enumerable.GetEnumerator();

    return Generate<T>(() =>
    {
        int pos = -1;
        return () => {
            pos += 1;
            if (pos < cache.Count())
            {
                return new Tuple<T>(cache[pos]);
            }
            if (enumerator.MoveNext())
            {
                cache.Add(enumerator.Current);
                return new Tuple<T>(enumerator.Current);
            }
            return null;
        };
    });
}

谢谢你的这个 sinelaw。:) +1 - Pragmateek
使用Tuple<T>作为可选的T,这实际上是我以前从未想过的。绝妙的技巧。+1 - Timothy Shields
@sinelaw:我喜欢这个想法,但你在参数传递方面过于创意了:你可以使用闭包避免“通过数组引用int”的技巧(即生成参数可以是Func<Func<Tuple<T>>);而且你可能想要为生成器状态命名概念(即生成参数可以是Func<Func<ValueOrEnd>>)。 - Eamon Nerbonne
@EamonNerbonne 关于元组和可选值 - 理想情况下,.NET应该为此提供一种类型。 - sinelaw
1
好的回答,谢谢。我开始使用这段代码作为起点,并为其编写了一些测试。我的测试揭示了一个事实,即在缓冲结果(到达“end”时)的每次重用中,原始枚举器都会调用“MoveNext”一次。这几乎永远不会成为问题,因为您可以想象IEnumerator的大多数实现都具有某些状态并知道它们已完成,但我不确定是否有保证。如果意图是*精确地重播第一次发生的事情,则闭包中应该有另一个状态变量,例如bool completed - OlduwanSteve
显示剩余11条评论

4
据我所知,目前没有内置的方法可以实现此功能。既然您提到它,这让我有些惊讶(我的猜测是,由于人们经常需要使用此选项,因此可能不值得花费分析代码的工作量来确保生成器每次都产生完全相同的序列)。
但是,您可以自己实现它。最简单的方法是在调用站点上进行操作。
var evenNumbers = Numbers.Where(i => i % 2 == 0).
var startOfList = evenNumbers.Take(10).ToList();

// use startOfList instead of evenNumbers in the loop.

更一般和准确的做法是在生成器中实现:创建一个List<int>缓存,每次生成一个新的数字时,先将其添加到缓存,然后再进行yield return操作。然后,在下一次循环中,首先服务于所有已缓存的数字。例如:
List<int> cachedEvenNumbers = new List<int>();
IEnumerable<int> EvenNumbers
{
  get
  {
    int i = -1;

    foreach(int cached in cachedEvenNumbers)
    {
      i = cached;
      yield return cached;
    }

    // Note: this while loop now starts from the last cached value
    while (true) 
    {
        Console.WriteLine("Generating {0}.", i + 1);
        yield return ++i;
    }
  }
}

如果您足够深入地思考这个问题,您可以得出一个通用的实现方式,即 IEnumerable<T>.Buffered() 扩展方法——需要满足在调用之间枚举不发生更改的要求,但问题是值得吗。


我的答案提供了你所说的通用“Buffered”方法。 - Timothy Shields
感谢您的回答,CompuChip,是的,这是我正在寻找的通用解决方案。无论如何+1. :) - Pragmateek
@TimothyShields 我看到你在我发帖后编辑了你的答案。干得好,谢谢! - CompuChip

3

感谢Eamon Nerbonnesinelaw提供的答案,只需稍作调整!首先,在完成枚举器时释放它。其次,使用锁保护基础枚举器,以便可在多个线程上安全地使用可枚举对象。

// This is just the same as @sinelaw's Generator but I didn't like the name
public static IEnumerable<T> AnonymousIterator<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

// Cached/Buffered/Replay behaviour
public static IEnumerable<T> Buffer<T>(this IEnumerable<T> self)
{
    // Rows are stored here when they've been fetched once
    var cache = new List<T>();

    // This counter is thread-safe in that it is incremented after the item has been added to the list,
    // hence it will never give a false positive. It may give a false negative, but that falls through
    // to the code which takes the lock so it's ok.
    var count = 0;

    // The enumerator is retained until it completes, then it is discarded.
    var enumerator = self.GetEnumerator();

    // This lock protects the enumerator only. The enumerable could be used on multiple threads
    // and the enumerator would then be shared among them, but enumerators are inherently not
    // thread-safe so a) we must protect that with a lock and b) we don't need to try and be
    // thread-safe in our own enumerator
    var lockObject = new object();

    return AnonymousIterator<T>(() =>
    {
        int pos = -1;
        return () =>
        {
            pos += 1;
            if (pos < count)
            {
                return new Tuple<T>(cache[pos]);
            }
            // Only take the lock when we need to
            lock (lockObject)
            {
                // The counter could have been updated between the check above and this one,
                // so now we have the lock we must check again
                if (pos < count)
                {
                    return new Tuple<T>(cache[pos]);
                }

                // Enumerator is set to null when it has completed
                if (enumerator != null)
                {
                    if (enumerator.MoveNext())
                    {
                        cache.Add(enumerator.Current);
                        count += 1;
                        return new Tuple<T>(enumerator.Current);
                    }
                    else
                    {
                        enumerator = null;
                    }
                }
            }
        }
        return null;
    };
});

}


1
这段代码存在竞态条件,因此不是线程安全的。两个线程尝试获取列表中的最后一个项目。线程A检查pos < count以查看是否有缓存结果;没有。线程B检查pos < count以查看是否有缓存结果;没有。线程B移动到最后一个项目并返回它。线程B尝试获取下一个项目,遇到列表末尾,并设置enumerator=null。线程A检查enumerator != null,看到它是null,而不是返回最后一个项目,它会return null - Cirdec
你是对的,谢谢!我已经编辑了代码,删除了枚举器的外部检查,我认为这解决了问题。你同意吗? - OlduwanSteve

3
这是一个不完整但紧凑的“功能性”实现(没有定义新类型)。
问题在于它不允许同时枚举。

原始描述: 第一个函数本应该是第二个函数内部的匿名lambda,但是C#不允许在匿名lambda中使用yield

// put these in some extensions class

private static IEnumerable<T> EnumerateAndCache<T>(IEnumerator<T> enumerator, List<T> cache)
{
    while (enumerator.MoveNext())
    {
        var current = enumerator.Current;
        cache.Add(current);
        yield return current;
    }
}
public static IEnumerable<T> ToCachedEnumerable<T>(this IEnumerable<T> enumerable)
{
    var enumerator = enumerable.GetEnumerator();
    var cache = new List<T>();
    return cache.Concat(EnumerateAndCache(enumerator, cache));
}

使用方法:

var enumerable = Numbers.ToCachedEnumerable();

这里有个bug:它不支持多个同时进行的迭代。例如,cached.ZipWith(cached.Skip(1), Tuple.Create)会崩溃——请注意,这是一个特别有趣的情况,因为同时缓存确保列表只被评估一次,但它也是惰性的。 - Eamon Nerbonne
此外,双重嵌套的函数是没有必要的 - 因为你无论如何都会立即评估它们。 - Eamon Nerbonne
糟糕,那个双匿名lambda滑过了。已修复。 - sinelaw
你对于这个 bug 也是正确的。我会把这个答案留下来作为一个“不该这样做”的例子。 - sinelaw

0
我使用以下的扩展方法。
这样,输入可以以最大速度读取,消费者也可以以最大速度处理。
public static IEnumerable<T> Buffer<T>(this IEnumerable<T> input)
{
    var blockingCollection = new BlockingCollection<T>();

    //read from the input
    Task.Factory.StartNew(() =>
    {
        foreach (var item in input)
        {
            blockingCollection.Add(item);
        }

        blockingCollection.CompleteAdding();
    });

    foreach (var item in blockingCollection.GetConsumingEnumerable())
    {
        yield return item;
    }
}

示例用法

此示例具有快速的生产者(查找文件)和慢速的消费者(上传文件)。

long uploaded = 0;
long total = 0;

Directory
    .EnumerateFiles(inputFolder, "*.jpg", SearchOption.AllDirectories)
    .Select(filename =>
    {
        total++;
        return filename;
    })
    .Buffer()
    .ForEach(filename =>
    {
        //pretend to do something slow, like upload the file.
        Thread.Sleep(1000);
        uploaded++;

        Console.WriteLine($"Uploaded {uploaded:N0}/{total:N0}");
    });

enter image description here


你有进行过测量以确定你的断言是否正确吗?我的经验是,使用 ConcurrentQueue 会使锁定变得更慢。 - Enigmativity
这也会增加CPU的负担。如果输入速度慢,yield return循环将在CPU上旋转。 - Enigmativity
谢谢@Enigmativity,我将它从ConcurrentQueue更改为BlockingCollection - Fidel
抱歉,任何形式的并发或阻塞集合都是相同的。 - Enigmativity

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接