TPL FromAsync与TaskScheduler和TaskFactory

5
我正在尝试创建一个任务管道/有序调度器,同时使用TaskFactory.FromAsync。 我想能够触发 Web 服务请求(使用FromAsync 使用 I/O 完成端口),但保持其顺序,并且任何时候只有一个正在执行。
目前我没有使用FromAsync,所以可以执行TaskFactory.StartNew(()=>api.DoSyncWebServiceCall())并依赖于OrderedTaskScheduler,由TaskFactory使用来确保只有一个请求是未完成状态。
我认为当使用FromAsync方法时,这种行为将保持不变,但事实并非如此:
TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));

所有这些beginGetStuff方法都在FromAsync调用中被调用(虽然它们按顺序分派,但同时发生了n个API调用)。 FromAsync有一个重载,需要传入一个TaskScheduler:
public Task FromAsync(
    IAsyncResult asyncResult,
    Action<IAsyncResult> endMethod,
    TaskCreationOptions creationOptions,
    TaskScheduler scheduler
)

但是文档中提到:
任务计划程序用于安排执行结束方法的任务。
并且,您可以看到它使用已经构造好的 IAsyncResult,而不是 Func。这是否需要一个自定义的FromAsync方法或者我漏掉了什么?有人能建议在哪里开始进行此实现吗?谢谢,
编辑:
我希望将这种行为从调用方中抽象出来,因此,根据 TaskFactory 的行为(具有专门的 TaskScheduler),需要立即返回Task——此Task不仅封装了 FromAsync 任务,而且还封装了等待轮到它执行时的任务队列。其中一种可能的解决方案:
class TaskExecutionQueue
{
    private readonly OrderedTaskScheduler _orderedTaskScheduler;
    private readonly TaskFactory _taskFactory;
    public TaskExecutionQueue(OrderedTaskScheduler orderedTaskScheduler)
    {
        _orderedTaskScheduler = orderedTaskScheduler;
        _taskFactory = new TaskFactory(orderedTaskScheduler);

    }

    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return _taskFactory.StartNew(taskGenerator).Unwrap();
    }
}

然而,这会在 FromAsync 调用时使用一个线程。理想情况下,我不希望这样做。

3个回答

2

最简单的方法是使用TPL Dataflow

您可以定义一个“块”,该块接收异步委托流并逐个执行它们(在开始下一个之前等待每个完成):

var block = new ActionBlock<Func<Task>>(func => func());

接着,发起一个网络服务请求:

block.Post(() => Task.Factory.FromAsync(...));

或者(我更喜欢这个):
block.Post(() => client.GetStuffAsync(a, b, c));

如果你只想执行任务,那么ActionBlock方法是可以的。但如果你想要生成一系列输出流,那么可以看一下TransformBlock方法:

var block = new TransformBlock<Func<Task<Stuff>>, Stuff>(func => func());

您可以通过调用ReceiveReceiveAsync来获取结果,发送请求的方式也是相同的。

注:该段内容为关于IT技术的说明。

现在我想了想 - 如果 OP 只想要一个请求未完成... 为什么他不只是使用循环呢?如果他能做到,他可能应该更喜欢使用 TPL 数据流。 - usr
那是我的第一反应,但从他的描述中听起来他想要更多的“点火即走”的使用模式。 - Stephen Cleary
谢谢Stephen - 听起来非常有趣,我想去了解一下。抱歉我的问题没有表述清楚,但你说得对,我的使用情况是在任务排队执行之前就能立即处理它(与TaskFactory和专用任务调度程序具有相同的行为)。我还不确定TPL Dataflow是否能够实现这一点,但我会去看看!干杯! - jamespconnor

2
您无法安排IO任务,因为它们没有与之关联的线程。Windows内核提供了无线程IO操作。启动这些IO操作不涉及托管代码,也不涉及TaskScheduler类的使用。
因此,在您确信实际需要访问网络之前,必须延迟启动IO。您可以使用SemaphoreSlim.WaitAsync来限制当前正在运行的任务数量。在启动各个IO和等待其结果之前,请先等待该方法的结果。

0

我在这里决定采用自定义解决方案...锁定方式有些混乱和不受欢迎,但目前来看,它能够完成我想要的工作。

public interface ITaskExecutionQueue
{
    Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator);
    Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator);
    int OutstandingTaskCount { get; }
    event EventHandler OutstandingTaskCountChanged;
}

/// This class ensures that only a single Task is executed at any one time.  They are executed sequentially in order being queued.
/// The advantages of this class over OrderedTaskScheduler is that you can use any type of Task such as FromAsync (I/O Completion ports) 
/// which are not able to be scheduled using a traditional TaskScheduler.
/// Ensure that the `outer` tasks you queue are unstarted.  E.g. <![CDATA[
/// _taskExeQueue.QueueTask(new Task<Task<TResult>>(() => StartMyRealTask()));
/// ]]>
class OrderedTaskExecutionQueue : ITaskExecutionQueue
{
    private readonly Queue<Task> _queuedTasks = new Queue<Task>();
    private Task _currentTask;
    private readonly object _lockSync = new object();

    /// <summary>
    /// Queues a task for execution
    /// </summary>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="taskGenerator">An unstarted Task that creates your started real-work task</param>
    /// <returns></returns>
    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return QueueTask(new Task<Task<TResult>>(taskGenerator));
    }

    public Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator)
    {
        Task<TResult> unwrapped = taskGenerator.Unwrap();
        unwrapped.ContinueWith(_ =>
                               {
                                   EndTask();
                                   StartNextTaskIfQueued();
                               }, TaskContinuationOptions.ExecuteSynchronously);

        lock (_lockSync)
        {
            _queuedTasks.Enqueue(taskGenerator);

            if (_currentTask == null)
            {
                StartNextTaskIfQueued();
            }
        }

        TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
        tcs.TrySetFromTaskIncomplete(unwrapped);

        OutstandingTaskCountChanged.Raise(this);

        return tcs.Task;
    }

    private void EndTask()
    {
        lock (_lockSync)
        {
            _currentTask = null;
            _queuedTasks.Dequeue();
        }

        OutstandingTaskCountChanged.Raise(this);
    }

    private void StartNextTaskIfQueued()
    {
        lock (_lockSync)
        {
            if (_queuedTasks.Count > 0)
            {
                _currentTask = _queuedTasks.Peek();

                _currentTask.RunSynchronously();
            }
        }
    }

    /// <summary>
    /// Includes the currently executing task.
    /// </summary>
    public int OutstandingTaskCount
    {
        get
        {
            lock (_lockSync)
            {
                return _queuedTasks.Count;
            }
        }
    }

    public event EventHandler OutstandingTaskCountChanged;
}

接受一个未启动的 Task<Task<TResult>> - 这允许队列决定何时执行它并开始 FromAsync 调用(即内部任务)。用法:

Task<Task<TResult>> queueTask = new Task<Task<TResult>>(() => Task.Factory.FromAsync(beginAction, endAction));
Task<TResult> asyncCallTask = _taskExecutionQueue.QueueTask(queueTask);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接