Node.js - 在工作线程和主线程之间发送流数据

8

我一直在尝试将程序中的某些工作分离到不同的线程中。 其中一个函数需要将流返回到主线程,但我遇到了以下异常:

Error
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
From previous event:
    at PoolWorker.work (node_modules/node-worker-threads-pool/src/pool-worker.js:22:12)
    at DynamicPool.runTask (node_modules/node-worker-threads-pool/src/pool.js:110:47)
    at DynamicPool.exec (node_modules/node-worker-threads-pool/src/dynamic-pool.js:51:17)
    at renderToPdf (src/modules/templates/render2.js:27:14)
    at Context.<anonymous> (test/modules/templates/render.test.js:185:68)

我尝试构建一个最小化的示例来重现我想要实现的内容。基本上,我需要将一个可读流发送回主线程。在这个示例中,我还遇到了一个异常:
为了拥有一个工作线程池,我正在使用库node-worker-threads-pool,具体来说是DynamicPool。而在其中,我正在尝试将html转换为PDF。但我需要以某种方式将流返回给主线程。
const os = require('os');
const { DynamicPool } = require('node-worker-threads-pool');

const Pool = new DynamicPool(os.cpus().length);

async function convertToPDF(html) {
  return await Pool.exec({
    task: function() {
      const Promise = require('bluebird');
      const pdf = require('html-pdf');

      const { html } = this.workerData;

      const htmlToPdf = (html, renderOptions) => {
        const options = {
          format: 'Letter',
        };
        return pdf.create(html, Object.assign(options, renderOptions || {}));
      };

      return Promise.fromNode((cb) => htmlToPdf(html, {}).toStream(cb));
    },
    workerData: {
      html,
    },
  });
}

convertToPDF('<div>Hello World!</div>')
  .then((resp) => console.log('resp', resp))
  .catch((err) => console.error('err', err));

err DataCloneError: function() {
    if (this.autoClose) {
      this.destroy();
    }
  } could not be cloned.
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)

你有想法如何实现这个吗?

PS: 我知道在工作线程中IO操作不如在nodejs主线程中高效,但我需要这样做来避免用这些操作锁定主线程。

3个回答

4

简短回答:你不能。

在node中,IPC通过一些黑盒子来处理。我们知道的是,在发送之前消息对象被序列化,在接收时进行反序列化。你不能序列化一个Stream,因为它基于底层级别(套接字,文件描述符,自定义读写函数等),这些无法序列化/反序列化。

所以你只能交换可序列化的数据。

看看html-pdf,我认为转换您的程序的一种简单方法是使用pdf.toBuffer:不要尝试将Stream发送到主线程并在主线程中读取以获取Buffer,而应该将Buffer发送到主线程,然后直接使用它。

希望能有所帮助。


3

不要试图将Stream传递给主线程,为什么不直接使用管道?

创建一个在它们之间共享的MessageChannel
父进程实现Readable,包装其监听.on('message')MessagePort
线程有一个接口,实现Writable并基本上将write()中的任何数据直接传递给.postMessage()

不要忘记在write()周围添加更多的实现以获取其返回值。我总是会在子线程的Writable中返回false,并使用postMessage()将主线程的Readable中的所有'drain'事件转发回子线程(并在主线程的可写流从我们的可读流那里返回true时强制/伪造一个)

现在你有一种方法可以流式传输到主线程。将要从子线程发送的原始“Stream”简单地.pipe()到您的“Writable”中。在主线程中,只需从您的“Readable”中读取或像真正拥有它一样进行管道传输即可。

1

我有类似的需求。我已经构建了一个线程池,使用共享内存在线程之间发送数据。下一步是添加流。这样做有两个原因:1)处理任何大小的数据(与静态分配缓冲区相比),2)减少内存利用率(即最大块大小的静态缓冲区按顺序使用)。以下存储库接近我所需的内容,但我必须在两个方向上进行控制,并控制线程何时创建/销毁等https://github.com/pinojs/thread-stream。我认为您的要求也是反向的。尽管如此,这可能有助于找到解决方案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接