如何将事件发射器用作异步生成器

14

我尝试使用babel来使用async generator的简洁语法(我陷入了node 8),我想知道如何将事件发射器清晰地转换为async generator。

到目前为止,我的代码看起来像这样:

    const { EventEmitter } = require('events')

    // defer fonction for resolving promises out of scope
    const Defer = () => {
      let resolve
      let reject
      let promise = new Promise((a, b) => {
        resolve = a
        reject = b
      })
      return {
        promise,
        reject,
        resolve
      }
    }


    // my iterator function
    function readEvents(emitter, channel) {
      const buffer = [Defer()]
      let subId = 0

      emitter.on(channel, x => {
        const promise = buffer[subId]
        subId++
        buffer.push(Defer())
        promise.resolve(x)
      })

      const gen = async function*() {
        while (true) {
          const val = await buffer[0].promise
          buffer.shift()
          subId--
          yield val
        }
      }

      return gen()
    }

    async function main () {
      const emitter = new EventEmitter()
      const iterator = readEvents(emitter, 'data')

      // this part generates events
      let i = 0
      setInterval(() => {
        emitter.emit('data', i++)
      }, 1000)

      // this part reads events
      for await (let val of iterator) {
        console.log(val)
      }
    }

    main()

这很复杂 - 能不能简化一下?

4个回答

12
我想到了这个:

我提出了以下内容:

async *stream<TRecord extends object=Record<string,any>>(query: SqlFrag): AsyncGenerator<TRecord> {
    const sql = query.toSqlString();

    let results: TRecord[] = [];
    let resolve: () => void;
    let promise = new Promise(r => resolve = r);
    let done = false;

    this.pool.query(sql)
        .on('error', err => {
            throw err;
        })
        .on('result', row => {
            results.push(row);
            resolve();
            promise = new Promise(r => resolve = r);
        })
        .on('end', () => {
            done = true;
        })

    while(!done) {
        await promise;
        yield* results;
        results = [];
    }
}

目前看起来似乎正常运作。

也就是说,你可以像 Khanh 的解决方案一样创建一个虚拟的 Promise 来等待第一个结果,但由于可能会同时收到许多结果,因此将它们推入数组中并重置 Promise 以等待结果(或批量结果)。不必担心这个 Promise 被覆盖数十次而从未等待过。

然后,我们可以使用 yield* 一次性生成所有结果,并清除数组以进行下一批操作。


可以通过将结果行作为参数传递给resolve()来简化它,而不是存储在中间数组中。例如:resolve: (r: TRecord) => voidnew Promise<TRecord>(r => resolve = r)yield await promise - Klesun
@Klesun 你确定那样会起作用吗?如果在等待 Promise 之前 .on('result' 被调用两次会怎么样?我不确定那是否可能发生。 - mpen
嗯,现在想起来了,我也不确定。是啊,谢谢,同步写入数组可能是一个更安全的选择。 - Klesun
.on('error', err => { throw err; }) 会抛出一个错误,无处捕获。 - jcayzac
@jcayzac 不会在这里被捕捉,但是在你调用它的地方可能会被捕捉,对吧? - mpen
@mpen 只有调用您的监听器的代码才能捕获它,即很可能不是您自己的代码。在事件监听器中抛出异常并不能像一个天真的程序员想象的那样做。 - jcayzac

2

这里提供了另一种方法,使用for await循环处理计时器事件,自定义Symbol.asyncIterator和一个简单的队列来进行任何潜在事件缓冲。可在Node和浏览器环境中使用(RunKit, Gist)。

async function main() {
  const emitter = createEmitter();
  const start = Date.now();

  setInterval(() => emitter.emit(Date.now() - start), 1000);

  for await (const item of emitter) {
    console.log(`tick: ${item}`);
  }
}

main().catch(e => console.warn(`caught on main: ${e.message}`));

function createEmitter() {
  const queue = [];
  let resolve;

  const push = p => {
    queue.push(p);
    if (resolve) {
      resolve();
      resolve = null;
    }
  };

  const emitError = e => push(Promise.reject(e));

  return {
    emit: v => push(Promise.resolve(v)),
    throw: emitError,

    [Symbol.asyncIterator]: () => ({
      next: async () => {
        while(!queue.length) {
          await new Promise((...a) => [resolve] = a);
        }
        return { value: await queue.pop(), done: false };
      },
      throw: emitError
    })
  };
}


2

假设我们以redux-saga(因为它在其核心使用生成器)和socket.io作为EventEmitter的示例

import { call, put } from 'redux-saga/effects';

function* listen() {
  yield (function* () {
    let resolve;
    let promise = new Promise(r => resolve = r); // The defer

    socket.on('messages created', message => {
      console.log('Someone created a message', message);
      resolve(message); // Resolving the defer

      promise = new Promise(r => resolve = r); // Recreate the defer for the next cycle
    });

    while (true) {
      const message = yield promise; // Once the defer is resolved, message has some value
      yield put({ type: 'SOCKET_MESSAGE', payload: [message] });
    }
  })();
}

export default function* root() {
    yield call(listen);
}

上述设置应该为您提供一个由事件发射器(socket.io实例)阻塞的生成器。

干杯!


2
用这种方法,如果我向套接字发送大量消息,有些消息在承诺被提交/等待之前被覆盖,那么这是否可能发生? - miThom

0
所以,我努力使它简单且通用。这是我的成果。它应该能够将任何基于回调的事件监听器转换为异步生成器。我确保所有事件都会被准确地发出一次,并按照正确的顺序。
  async function* eventListenerToAsyncGenerator(listenForEvents) {
    const eventResolvers = []
    const eventPromises = [
      new Promise((resolve) => {
        eventResolvers.push(resolve)
      })
    ]

    listenForEvents((event) => {
      eventPromises.push(
        new Promise((resolve) => {
          eventResolvers.push(resolve)
          eventResolvers.shift()!(event)
        })
      )
    })

    while (true) {
      yield await eventPromises.shift()
    }
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接