事件处理程序中的"yield return"

4

我有一个类,构造函数需要传入一个流(stream)。你可以为各种事件设置回调函数,然后调用StartProcessing方法。问题是,我想从一个应该返回IEnumerable的函数中使用它。

例如:

public class Parser
{
    public Parser(System.IO.Stream s) { // saves stream and does some set up }
    public delegate void OnParsedHandler(List<string> token);
    public event OnParsedHandler OnParsedData;
    public void StartProcessing()
    {
        // reads stream and makes callback when it has a whole record
    }
 }

 public class Application
 {
      public IEnumerable<Thing> GetThings(System.IO.Stream s)
      {
            Parser p = new Parser(s);
            p.OnParsedData += (List<string> str) => 
            {
                Thing t = new Thing(str[0]);
                // here is where I would like to yield
                // but I can't
                yield return t;
            };


            p.StartProcessing();
      }
 }

目前我的解决方案不是很好,是将所有的Things放入一个列表中,然后由lambda表达式捕获,调用StartProcessing后再对它们进行迭代。

public class Application
 {
      public IEnumerable<Thing> GetThings(System.IO.Stream s)
      {
            Parser p = new Parser(s);
            List<Thing> thingList = new List<Thing>();

            p.OnParsedData += (List<string> str) => 
            {
                Thing t = new Thing(str[0]);
                thingList .Add(t);
            };


            p.StartProcessing();
            foreach(Thing t in thingList )
            {
                  yield return t;
            }
      }
 }

这里的问题在于现在我必须将所有的Thing对象保存到列表中。

那么 StartProcessing 同步解析整个流,定期通过事件处理程序丢弃已解析的项? - D Stanley
或者,StartProcessing 在内部是以异步方式工作的吗?这似乎是一种奇怪的 API 设计。 - Berin Loritsch
StartProcessing 是同步的。 - bpeikes
2个回答

4
您在这里遇到的问题是,您根本没有“拉”机制,您正在尝试从解析器中推送数据。如果解析器将向您推送数据而不是让调用者拉取数据,则 GetThings 应该返回一个 IObservable 而不是一个 IEnumerable,以便调用者可以在准备好时消耗数据。
如果真的很重要在这里使用拉取机制,那么 Parser 不应该触发事件来表示它有新数据,而是调用者应该能够请求新数据并获取它;它应该返回所有解析的数据或自身返回一个 IEnumerable

我明白你的意思。问题在于解析器可以引发多种类型的事件,使用拉取方法可能行不通。我将不得不再次查看可能需要重新排列代码。 - bpeikes

2
有趣的问题。我想要在@servy关于推送和拉取的论述上进行补充。在您上面的实现中,您有效地将推送机制适应为拉取接口。
首先,您没有指定对StartProcessing()方法的调用是否是阻塞调用。对此有几点说明:
  • If the method is blocking (synchronous), then there is really no point in adapting it to a pull model anyway. The caller will see all the data processed in a single blocking call.

  • In that regard, receiving the data indirectly via an event handler scatters into two seemingly unrelated constructs what should otherwise be a single, cohesive, explicit operation. For example:

    void ProcessAll(Action<Thing> callback);
    
另一方面,如果StartProcessing()方法实际上会生成一个新的线程(也许更好地命名为BeginProcessing()并遵循Event-based Asynchronous Pattern或其他异步处理模式),那么你可以通过使用同步构造器使用等待句柄(如ManualResetEvent、互斥量等)来将其适应为pull机制。伪代码:
public IEnumerable<Thing> GetThings(System.IO.Stream s)
{
    var parser = new Parser(s);
    var waitable = new AutoResetEvent(false);
    Thing item = null;

    parser.OnParsedData += (Thing thing) => 
    {
        item = thing;
        waitable.Set();
    };

    IAsyncResult result = parser.BeginProcessing();
    while (!result.IsCompleted)
    {
        waitable.WaitOne();
        yield return item;            
    }
}

免责声明:
上述代码只作为展示一个想法的手段,它不是线程安全的,同步机制不能正常工作。请参考 生产者消费者 模式获取更多信息。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接