等待异步Lambda在ActionBlock中的应用

6

我有一个带有ActionBlock的Receiver类:

public class Receiver<T> : IReceiver<T>
{

  private ActionBlock<T> _receiver;

  public Task<bool> Send(T item) 
  {
     if(_receiver!=null)
        return _receiver.SendAsync(item);

     //Do some other stuff her
  }

  public void Register (Func<T, Task> receiver)
  {
    _receiver = new ActionBlock<T> (receiver);
  }

  //...
}

注册ActionBlock的Register-Action是一个带有await语句的异步方法:
private static async Task Writer(int num)
{
   Console.WriteLine("start " + num);
   await Task.Delay(500);
   Console.WriteLine("end " + num);
}

现在我想要做的是同步等待(如果设置了一个条件)直到操作方法完成以获得独占行为:
var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!

问题是当执行 "await Task.Delay(500);" 语句时,"receiver.Post(5).Wait();" 不再等待。我尝试了几种变体(TaskCompletionSource、ContinueWith 等),但都不起作用。有人有解决问题的想法吗?

你能不能把 _receiver 改成 TransformBlock,然后把以下操作放到一个新的 ActionBlock 中,并将其链接到 _receiver - svick
你能给我一个小的代码示例吗?我不明白那个重构是如何解决“独占行为”问题的。 - obi111
1个回答

4
默认情况下,ActionBlock 会强制执行独占行为(一次只处理一个项目)。如果您的“独占行为”有其他含义,则可以使用 TaskCompletionSource 在操作完成时通知发送方:
... use ActionBlock<Tuple<int, TaskCompletionSource<object>>> and Receiver<Tuple<int, TaskCompletionSource<object>>>
var receiver = new Receiver<Tuple<int, TaskCompletionSource<object>>>();
receiver.Register((Func<Tuple<int, TaskCompletionSource<object>>, Task) Writer);
var tcs = new TaskCompletionSource<object>();
receiver.Send(Tuple.Create(5, tcs));
tcs.Task.Wait(); // if you must

private static async Task Writer(int num, TaskCompletionSource<object> tcs)
{
  Console.WriteLine("start " + num);
  await Task.Delay(500);
  Console.WriteLine("end " + num);
  tcs.SetResult(null);
}

或者,您可以使用 AsyncLock包含在我的 AsyncEx 库中):

private static AsyncLock mutex = new AsyncLock();

private static async Task Writer(int num)
{
  using (await mutex.LockAsync())
  {
    Console.WriteLine("start " + num);
    await Task.Delay(500);
    Console.WriteLine("end " + num);
  }
}

是的,您说得对,ActionBlock确实强制执行独占行为,但如果注册的操作是异步的,那么它就不再是“真正的独占”了。是的,您的解决方案应该可以解决问题,但我不想添加TaskCompletionSource参数,因为该操作是独占逻辑的入口点 - 因此,如果用户不调用tcs.SetResult,则它将不再起作用... - obi111
在这种情况下,您可以使用 AsyncLock。请参阅更新的答案以获取代码示例。您不再知道何时完成处理项目,但每个项目将依次处理(包括 async 处理)。 - Stephen Cleary

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接