多个线程使用yield访问IEnumerable

5
我正在使用一个第三方库,它会迭代一些非常大的平面文件,这可能需要很长时间。该库提供了一个枚举器,因此您可以产生每个结果并在处理其余枚举器时处理它,然后枚举器会提取平面文件中的下一个项目。
例如:
IEnumerable<object> GetItems()
{
    var cursor = new Cursor;

    try
    {
        cursor.Open();

        while (!cursor.EOF)
        {
            yield return new //object;

            cursor.MoveNext();
        }

    }
    finally
    {
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}

我想要实现的是有两个使用同一个Enumerable的消费者,这样我就不必重复提取信息,每个消费者仍然可以在每个项目到达时立即处理,而不必等待所有项目一次性到达。

IEnumerable<object> items = GetItems();

new Thread(SaveToDateBase(items)).Start();
new Thread(SaveSomewhereElse(items)).Start();

我想我试图实现的是类似于“如果消费者要求的项目已经被提取,则产出它,否则继续移动并等待”,但我意识到两个线程之间可能会发生 MoveNext() 冲突。
如果不存在类似于此的内容,则有何想法可以实现?
谢谢。
2个回答

5

使用.NET 4 BlockingCollection<T>和TPL任务实现管道模式是您要寻找的。请参阅我在此StackOverflow帖子中的完整示例答案。

示例:3个同时消费者

BlockingCollection<string> queue = new BlockingCollection<string>();    
public void Start()
{
    var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
    var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());

    Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
}

private void ProducerImpl()
{
   // 1. Read a raw data from a file
   // 2. Preprocess it
   // 3. Add item to a queue
   queue.Add(item);
}

// ConsumerImpl must be thrad safe 
// to allow launching multiple consumers simulteniously
private void ConsumerImpl()
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        // TODO
    }
}

如果还有不清楚的地方,请告诉我。 管道流程的高级图示:

enter image description here


我现在只是看一下,但是我在尝试将您的“TPL特定管道实现”解决方案应用到我上面提到的问题时遇到了困难。 - Mark Vickery
@MarkVickery:你是在寻找另一个样本吗?还是已经找到了解决方案? - sll
很快我会仔细看一下,并回复一份更新和/或被接受的答案。 - Mark Vickery
我看了一下BlockingCollection,虽然它确实解决了我另一个问题,但它并没有解决我发布的问题。不过还是非常感谢。 - Mark Vickery
BlockingCollection只是一个中间缓存,允许多个任务同时添加/检索项目,并且您可以启动尽可能多的消费者任务以便在同一时间处理多个项目。请参见更新的答案。 - sll
2
问题在于每个项目只由三个消费者中的一个处理。原帖的例子是有一个生产者,然后有两个消费者,每个消费者都处理所有项目,但是他们并行进行消费,而不是一个接一个地处理。这就是为什么你的解决方案不适用的原因。 - Servy

3
基本上您想要缓存一个 IEnumerable<T> 的数据,但在存储之前不用等待它完成。 您可以尝试如下操作:
public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
{
    return new CacheEnumerator<T>(source);
}

private class CacheEnumerator<T> : IEnumerable<T>
{
    private CacheEntry<T> cacheEntry;
    public CacheEnumerator(IEnumerable<T> sequence)
    {
        cacheEntry = new CacheEntry<T>();
        cacheEntry.Sequence = sequence.GetEnumerator();
        cacheEntry.CachedValues = new List<T>();
    }

    public IEnumerator<T> GetEnumerator()
    {
        if (cacheEntry.FullyPopulated)
        {
            return cacheEntry.CachedValues.GetEnumerator();
        }
        else
        {
            return iterateSequence<T>(cacheEntry).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
{
    for (int i = 0; entry.ensureItemAt(i); i++)
    {
        yield return entry.CachedValues[i];
    }
}

private class CacheEntry<T>
{
    public bool FullyPopulated { get; private set; }
    public IEnumerator<T> Sequence { get; set; }

    //storing it as object, but the underlying objects will be lists of various generic types.
    public List<T> CachedValues { get; set; }

    private static object key = new object();
    /// <summary>
    /// Ensure that the cache has an item a the provided index.  If not, take an item from the 
    /// input sequence and move to the cache.
    /// 
    /// The method is thread safe.
    /// </summary>
    /// <returns>True if the cache already had enough items or 
    /// an item was moved to the cache, 
    /// false if there were no more items in the sequence.</returns>
    public bool ensureItemAt(int index)
    {
        //if the cache already has the items we don't need to lock to know we 
        //can get it
        if (index < CachedValues.Count)
            return true;
        //if we're done there's no race conditions hwere either
        if (FullyPopulated)
            return false;

        lock (key)
        {
            //re-check the early-exit conditions in case they changed while we were
            //waiting on the lock.

            //we already have the cached item
            if (index < CachedValues.Count)
                return true;
            //we don't have the cached item and there are no uncached items
            if (FullyPopulated)
                return false;

            //we actually need to get the next item from the sequence.
            if (Sequence.MoveNext())
            {
                CachedValues.Add(Sequence.Current);
                return true;
            }
            else
            {
                Sequence.Dispose();
                FullyPopulated = true;
                return false;
            }
        }
    }
}

使用示例:

private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
{
    for (int i = 0; i < maxValue; i++)
    {
        Thread.Sleep(1000);
        Console.WriteLine("actually generating value: {0}", i);
        yield return i;
    }
}

public static void Main(string[] args)
{
    IEnumerable<int> sequence = interestingIntGenertionMethod(10)
        .Cache();

    int numThreads = 3;
    for (int i = 0; i < numThreads; i++)
    {
        int taskID = i;
        Task.Factory.StartNew(() =>
        {
            foreach (int value in sequence)
            {
                Console.WriteLine("Task: {0} Value:{1}",
                    taskID, value);
            }
        });
    }

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey(true);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接