如何标记TPL数据流循环以完成?

5

假设在TPL数据流中有以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    }));

tileFilder.LinkTo(block);


using (new TraceTimer("Time"))
{
    dirBroadcast.Post(directory);

    block.LinkTo(new ActionBlock<FileInfo>((s) =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);

    }));
    block.Complete();
    block.Completion.Wait();

}

我想知道如何标记它为完成状态,因为这是一个循环操作。一个目录被发布到dirBroadcast广播器上,该广播器将其发布到dirfinder,可能会从广播器返回新的目录,因此我不能简单地将其标记为已完成,否则将阻止dirfinder添加任何目录。我应该重新设计它以跟踪目录数量,还是TPL中有类似的功能。


对于你而言,“完成”是什么意思?你想停止哪个链接,让哪些链接继续运行? - i3arnon
当tileFinder完成后,我想标记block.complete()并等待其完成。但在dirbroadcast完成之前,我无法标记tileFinder.complete(),而dirboardcost与自身循环,因此无法将其标记为完成。 - Poul K. Sørensen
5个回答

4
如果您的代码目的是使用某种并行性遍历目录结构,那么我建议不要使用TPL Dataflow,而是使用微软的响应式框架。我认为这样会更简单。
以下是我的建议:
首先定义一个递归函数来建立目录列表:
Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);

这将递归目录并使用默认的Rx调度程序,使可观察对象并行运行。

因此,通过使用输入DirectoryInfo调用recurse,我可以获得一个包含输入目录及其所有后代的可观察列表。

现在,我可以构建一个相当简单的查询来获得我想要的结果:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));

现在我可以像这样执行查询:
query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });

现在,我可能错过了您的自定义代码的一部分,但如果这是您想采取的方法,我相信您可以轻松解决任何逻辑问题。

当它运行完子目录和文件时,此代码会自动处理完成。

要将Rx添加到项目中,请在NuGet中查找“Rx-Main”。


很好,谢谢。TPL 的一个好处是我可以控制各个部分的并行属性。在 Rx 中是否有类似的功能呢?(更一般化,不是针对这个具体问题,对于查找文件之类的其他示例看起来更简单) - Poul K. Sørensen
@pksorensen - 通常情况下,您不需要(也不希望)在 Rx 中控制并行性。它自己做得很好。但是,他们编写了使用各种调度程序的代码,提供各种调度选项,最重要的是,它们基于“IScheduler”接口,因此,如果您不喜欢内置选项,则可以自己编写以获得最终控制。 - Enigmativity

3

我确定这不是总是可能的,但在许多情况下(包括目录枚举),您可以使用运行计数器和Interlocked函数来实现循环一对多数据流的完成:

public static ISourceBlock<string> GetDirectoryEnumeratorBlock(string path, int maxParallel = 5)
{
    var outputBuffer = new BufferBlock<string>();

    var count = 1;

    var broadcastBlock = new BroadcastBlock<string>(s => s);

    var getDirectoriesBlock = new TransformManyBlock<string, string>(d =>
    {
        var files = Directory.EnumerateDirectories(d).ToList();

        Interlocked.Add(ref count, files.Count - 1); //Adds the subdir count, minus 1 for the current directory.

        if (count == 0) //if count reaches 0 then all directories have been enumerated.
            broadcastBlock.Complete();

        return files;

    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });

    broadcastBlock.LinkTo(outputBuffer, new DataflowLinkOptions() { PropagateCompletion = true });
    broadcastBlock.LinkTo(getDirectoriesBlock, new DataflowLinkOptions() { PropagateCompletion = true });

    getDirectoriesBlock.LinkTo(broadcastBlock);

    getDirectoriesBlock.Post(path);

    return outputBuffer;
}

我稍作修改就可以枚举文件,不过它效果很好。请注意并行度的最大值,这可能会快速饱和网络文件系统!


2
我不认为这可以完成,因为每个块(dirBroadcasttileFilder)都依赖于另一个块,无法独立完成。
我建议您重新设计目录遍历而不使用TPL Dataflow,因为它不适合这种问题。在我看来,更好的方法是递归扫描目录,并用文件流填充您的block
private static void FillBlock(DirectoryInfo directoryInfo, XYZTileCombinerBlock<FileInfo> block)
{
    foreach (var fileInfo in directoryInfo.GetFiles())
    {
        block.Post(fileInfo);
    }

    foreach (var subDirectory in directoryInfo.GetDirectories())
    {
        FillBlock(subDirectory, block);
    }
}

FillBlock(directory, block);
block.Complete();
await block.Completion;

1
这是Andrew Hanlon的solution的一般方法。 它返回一个TransformBlock,支持递归地将消息发布到自身,并在没有更多消息需要处理时自动完成。 transform lambda有三个参数,而不是通常的一个。 第一个参数是正在处理的项目。 第二个参数是已处理消息的“路径”,它是一个包含其父消息的序列IEnumerable<TInput>。 第三个参数是一个Action<TInput>,它将新消息发布到块中,作为当前消息的子项。
/// <summary>Creates a dataflow block that supports posting messages to itself,
/// and knows when it has completed processing all messages.</summary>
public static IPropagatorBlock<TInput, TOutput>
    CreateRecursiveTransformBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TInput>, Action<TInput>, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    int pendingCount = 1; // The initial 1 represents the completion of input1 block
    var input1 = new TransformBlock<TInput, (TInput, IEnumerable<TInput>)>(item =>
    {
        Interlocked.Increment(ref pendingCount);
        return (item, Enumerable.Empty<TInput>());
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = dataflowBlockOptions.CancellationToken,
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity
    });

    var input2 = new BufferBlock<(TInput, IEnumerable<TInput>)>(new DataflowBlockOptions()
    {
        CancellationToken = dataflowBlockOptions.CancellationToken
        // Unbounded capacity
    });

    var output = new TransformBlock<(TInput, IEnumerable<TInput>), TOutput>(async entry =>
    {
        try
        {
            var (item, path) = entry;
            var postChildAction = CreatePostAction(item, path);
            return await transform(item, path, postChildAction).ConfigureAwait(false);
        }
        finally
        {
            if (Interlocked.Decrement(ref pendingCount) == 0) input2.Complete();
        }
    }, dataflowBlockOptions);

    Action<TInput> CreatePostAction(TInput parentItem, IEnumerable<TInput> parentPath)
    {
        return item =>
        {
            // The Post will be unsuccessful only in case of block failure
            // or cancellation, so no specific action is needed here.
            if (input2.Post((item, parentPath.Append(parentItem))))
            {
                Interlocked.Increment(ref pendingCount);
            }
        };
    }

    input1.LinkTo(output);
    input2.LinkTo(output);

    PropagateCompletion(input1, input2,
        condition: () => Interlocked.Decrement(ref pendingCount) == 0);
    PropagateCompletion(input2, output);
    PropagateFailure(output, input1, input2); // Ensure that all blocks are faulted

    return DataflowBlock.Encapsulate(input1, output);

    async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
        Func<bool> condition = null)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch { }

        if (block1.Completion.Exception != null)
        {
            block2.Fault(block1.Completion.Exception.InnerException);
        }
        else
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            if (condition == null || condition()) block2.Complete();
        }
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2,
        IDataflowBlock block3)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex); block3.Fault(ex);
        }
    }
}

// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
    CreateRecursiveTransformBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TInput>, Action<TInput>, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateRecursiveTransformBlock<TInput, TOutput>((item, path, postAction) =>
        Task.FromResult(transform(item, path, postAction)), dataflowBlockOptions);
}

生成的块内部包含三个块:两个接收消息的输入块和一个处理消息的输出块。第一个输入块从外部接收消息,第二个输入块从内部接收消息。第二个输入块具有无限容量,因此无限递归最终会导致 OutOfMemoryException
用法示例:
var fileCounter = CreateRecursiveTransformBlock<string, int>(
    (folderPath, parentPaths, postChild) =>
{
    var subfolders = Directory.EnumerateDirectories(folderPath);
    foreach (var subfolder in subfolders) postChild(subfolder);
    var files = Directory.EnumerateFiles(folderPath);
    Console.WriteLine($"{folderPath} has {files.Count()} files"
        + $", and is {parentPaths.Count()} levels deep");
    return files.Count();
});
fileCounter.LinkTo(DataflowBlock.NullTarget<int>());
fileCounter.Post(Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments));
fileCounter.Complete();
fileCounter.Completion.Wait();

上述代码在控制台中打印出文件夹"MyDocuments"的所有子文件夹。

0

只是为了展示我的真实答案,一种TPL和Rx的组合。

            Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
            recurse = di =>
                Observable
                    .Return(di)
                    .Concat(di.GetDirectories()
                        .Where(d => int.Parse(d.Name) <= br_tile[0] && int.Parse(d.Name) >= tl_tile[0])
                        .ToObservable()
                        .SelectMany(di2 => recurse(di2)))
                    .ObserveOn(Scheduler.Default);
            var query =
                from di in recurse(new DirectoryInfo(Path.Combine(directory.FullName, baselvl.ToString())))
                from fi in di.GetFiles().Where(f => int.Parse(Path.GetFileNameWithoutExtension(f.Name)) >= br_tile[1]
                    && int.Parse(Path.GetFileNameWithoutExtension(f.Name)) <= tl_tile[1]).ToObservable()
                select fi;
            query.Subscribe(block.AsObserver());
            Console.WriteLine("Done subscribing");
            block.Complete();

            block.Completion.Wait();
            Console.WriteLine("Done TPL Block");

这里的 block 是我的 var block = new XYZTileCombinerBlock<FileInfo>


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接