如何使这个异步方法调用起作用?

4
我试图使用异步方法调用开发一个方法管道。该管道的逻辑如下:
  1. 有一个包含n个数据的集合,需要将其提供给管道中的m个方法
  2. 枚举T的集合
  3. 将第一个元素提供给第一个方法
  4. 获取输出,异步地将其提供给第二个方法
  5. 同时,将集合的第二个元素提供给第一个方法
  6. 在第一个方法完成后,将结果提供给第二个方法(如果第二个方法仍在运行,则将结果放入队列中,并在第一个方法中开始执行第三个元素)
  7. 当第二个方法完成执行时,从队列中取出第一个元素并执行,以此类推(每个方法都应异步运行,没有人应等待下一个完成)
  8. 在第m个方法中,在执行数据之后,将结果存储到列表中
  9. 在第m个方法完成第n个元素后,将结果列表(n个结果)返回到最初的级别。
我编写了以下代码,但它没有按预期工作,结果从未返回,而且它的执行顺序也不正确。
static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }    
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;

        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source) {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }


        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock(Sync)
            {
                if(_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;

                _func.BeginInvoke(element, CallBack, resetEvent);
            }           
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;

            lock(Sync)
            {
                _isBusy = false;
                if(Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);

                if(DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if(Start._result.Count == Count)
                    set = true;
            }
            if(set)
              resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

请问您们能帮我让这件事情实现吗?如果这种设计对于实际的方法管道不太合适,请随时建议其他方法。

编辑: 我必须严格遵守 .Net 3.5。

3个回答

1

采用管道方式的特定原因是什么?在我看来,为每个输入启动一个单独的线程,并将所有函数链接在一起会更简单,执行速度也更快。例如:

function T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
{
  T value = input;
  foreach(var f in pipe)
  {
    value = f(value);
  }
  return value;
}

var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
var list = new List<int> { 1, 2, 3, 4 };
foreach(var value in list)
{
  ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
}

现在,谈到你的代码,我认为要实现具有M阶段的准确流水线实现,您必须恰好拥有M个线程,因为每个阶段都可以并行执行 - 现在,某些线程可能会空闲,因为输入尚未到达它们。我不确定您的代码是否正在启动任何线程以及特定时间的线程计数。


这还算可以,但是如何从中获取结果呢? - Anindya Chatterjee
ExecPipe将返回结果,因此您可以以线程安全的方式将其存储回结果列表(另一个列表/数组/队列)。 - VinayC
这段代码似乎不能正常工作。在 ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, value)); 中,value 的值始终为4。请再次检查一下? - Anindya Chatterjee
@Anindya,这是我的疏忽 - 这是由于lambda闭包引起的。我应该放置一个免责声明:我没有测试过这段代码 - 它只是为了说明目的。无论如何,我已经编辑了代码,应该可以解决问题(同样未经测试)。 - VinayC
@Wim,OP在谈论输入值而不是返回值! - VinayC
显示剩余3条评论

1

我并没有立刻在你的代码中找到问题,但是你可能在过于复杂化事情。这可能是一种更简单的方法来完成你想要的。

public static class Pipe 
{
   public static IEnumerable<T> Execute<T>(
      this IEnumerable<T> input, params Func<T, T>[] functions)
   {
      // each worker will put its result in this array
      var results = new T[input.Count()];

      // launch workers and return a WaitHandle for each one
      var waitHandles = input.Select(
         (element, index) =>
         {
            var waitHandle = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(
               delegate
               {
                  T result = element;
                  foreach (var function in functions)
                  {
                     result = function(result);
                  }
                  results[index] = result;
                  waitHandle.Set();
               });
            return waitHandle;
         });

      // wait for each worker to finish
      foreach (var waitHandle in waitHandles)
      {
          waitHandle.WaitOne();
      }
      return results;
   }
}

这种方法不同于你自己尝试的为管道的每个阶段创建锁定的方式。我忽略了这一点,因为它似乎没有什么用处。不过,你可以通过像这样包装函数来轻松添加它:

var wrappedFunctions = functions.Select(x => AddStageLock(x));

其中 AddStageLock 是这样的:

private static Func<T,T> AddStageLock<T>(Func<T,T> function)
{
   object stageLock = new object();
   Func<T, T> wrappedFunction =
      x =>
      {
         lock (stageLock)
         {
            return function(x);
         }
      };
   return wrappedFunction;
}

编辑:Execute 的实现可能比单线程执行慢,除非每个单独元素的工作量超过了创建等待句柄和在线程池上调度任务的开销。要真正从多线程中受益,您需要限制开销;.NET 4 中的 PLINQ 通过分区数据来实现这一点。


Wim,你的代码似乎存在一个小问题,由于waithandles数量的限制(64),如果我在可枚举对象中使用超过64个数据,它就会失败;将数据分组为64个元素也没有帮助。你能提供任何好的解决方法吗?顺便说一下,我不能使用PLINQ或TPL,必须坚持使用.NET 3.5解决方案。 - Anindya Chatterjee
1
我忘记了WaitAll的限制。我已经编辑了代码,只是分别等待每个waitHandle,这具有相同的效果。 - Wim Coenen
但是就像我说的那样,你应该通过对数据进行分区来限制开销和任务数量。系统无论如何都无法并行运行那么多任务!将可枚举的所有元素放入列表中,将列表分成x个块,并使用x个任务处理这些块。否则,整个过程可能比单线程版本运行得更慢。测量性能以找到最佳方案。 - Wim Coenen
请问您能否更详细地说明如何将数据进行分区并使用x个任务运行它们;似乎我必须使用TPL或Rx,但我不能。您能否展示一些在net 3.5中实现此功能的示例?另外,感谢您的编辑,+1。 - Anindya Chatterjee
@Anindya Chatterjee:如果你觉得自己无法完成,可以使用.NET 3.5的PLINQ和TPL预发布版本。这是在.NET 4发布之前可用的技术预览版。它仍然作为反应扩展预览版的一部分可用:http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx - Wim Coenen

0

为什么不为每个迭代分配一个线程,并在锁定资源中聚合结果。您只需要这样做。可以使用PLinq来实现。 我认为您可能将方法误认为资源。只有当方法处理其中包含共享资源的关键块时,才需要锁定方法。通过从资源中选择并从那里打破新线程,您可以消除管理第二个方法的需要。

例如:方法X调用方法1,然后将值传递给方法2 对于arr中的每个项目 异步(MethodX(item));


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接