自定义 ActionBlock<T>

9
我想实现一个有优先级的 ActionBlock<T>,这样我就可以通过使用 Predicate<T> 条件性地给一些 TInput 项目设置优先级。
我阅读了 Parallel Extensions Extras SamplesGuide to Implementing Custom TPL Dataflow Blocks,但仍然不清楚如何实现这种情况。
---------------------------- 编辑 ---------------------------
有一些任务,其中5个可以同时运行。当用户按下按钮时,根据谓词函数,应该以最高优先级运行一些任务。
实际上,我写了这段代码。
TaskScheduler taskSchedulerHighPriority;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...     
if (predicate(customObject))
    actionBlockHigh.Post(customObject);
else
    actionBlockLow.Post(customObject);

但是似乎优先级根本没有生效。
---------------------------- 编辑 ------------------
我发现当我使用这行代码时:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow });

应用程序正确地观察任务的优先级,但一次只能执行一个任务。同时,使用以下代码块,应用程序可以同时运行5个任务,但优先级顺序不合适。

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow });

更新:
感谢svick的提示,我应该为taskSchedulerLow指定MaxMessagesPerTask


2
优先级是由什么决定的?它是否与 T 没有关系?还是优先级是 T 的固有/派生属性? - casperOne
你可以创建一个使用ConcurrentPriorityQueue的自定义缓冲块,或者创建一个自定义的异步转换块。这两个选项都不是简单的。同时也同意@casperOne的观点,你的情况中“priority”是什么意思? - Panagiotis Kanavos
1个回答

8
您的问题没有包含太多细节,因此以下只是猜测您可能需要什么。
我认为最简单的方法是使用两个ActionBlock,在 ParallelExtensionsExtrasQueuedTaskScheduler上以不同的优先级运行。您将使用谓词链接到高优先级块,然后连接到低优先级块。此外,为了确保高优先级的Task不会等待,设置低优先级块的MaxMessagesPerTask
在代码中,它应该是这样的:
static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate)
{
    var buffer = new BufferBlock<T>();

    var scheduler = new QueuedTaskScheduler(1);

    var highPriorityScheduler = scheduler.ActivateNewQueue(0);
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1);

    var highPriorityBlock = new ActionBlock<T>(
        action, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = highPriorityScheduler
        });
    var lowPriorityBlock = new ActionBlock<T>(
        action, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = lowPriorityScheduler,
            MaxMessagesPerTask = 1
        });

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate);
    buffer.LinkTo(lowPriorityBlock);

    return buffer;
}

这只是一个示例,展示了你可以做的事情,例如,返回块的Completion行为不正确。

1
在你的代码中,你没有为低优先级块指定 MaxMessagesPerTask。就像我说的那样,这样做非常重要。 - svick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接