Node.js工作线程共享对象/存储

29

所以,我正在阅读有关Node.js的一些内容,当我遇到Worker Threads时,我感到非常惊讶。

在我看来,拥有线程是一个很大的优势,特别是如果你将它与共享内存访问相结合。正如你可能已经想到的 -> SharedArrayBuffer...

是的,这就是我想到的。所以我脑海中首先想到的是进行一些简单的测试,并尝试实现一个简单的store(目前只是一个简单的对象),该对象将在线程之间共享。

问题是,(除非我在这里漏掉了什么)如何使用SharedArrayBuffer使对象能够从n个线程中访问?

我知道对于一个简单的Uint32Array是可行的,但是对于对象,可以做些什么呢?

起初,我想将其转换为Uint32Array,就像下面所示,但甚至看着该死的源代码也让我想哭...

const {
    Worker,
    isMainThread,
    workerData
} = require('worker_threads');

const store = {
    ks109: {
        title: 'some title 1',
        description: 'some desciption 1',
        keywords: ['one', 'two']
    },
    ks110: {
        title: 'some title 2',
        description: 'some desciption 2',
        keywords: ['three', 'four']
    },
    ks111: {
        title: 'some title 3',
        description: 'some desciption 3',
        keywords: ['five', 'six']
    }
}

const shareObject = (obj) => {

    let OString = JSON.stringify(obj);
    let SABuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * OString.length);
    let sArray = new Int32Array(SABuffer);

    for (let i = 0; i < OString.length; i++) {
        sArray[i] = OString.charCodeAt(i);
    }

    return sArray;

}

if (isMainThread) {

    const sharedStore = shareObject(store);

    console.log('[Main][Data Before]:', sharedStore.slice(-10));

    let w = new Worker(__filename, {
        workerData: sharedStore
    });

    w.on('message', (msg) => {
        console.log(`[Main][Thread message]: ${msg}`);
    });
    w.on('error', (err) => {
        console.error(`[Main][Thread error] ${err.message}`);
    });
    w.on('exit', (code) => {
        if (code !== 0) {
            console.error(`[Main][Thread unexpected exit]: ${code}`);
        }
    });

    setInterval(() => {
        // At some point you ll see a value change,
        // it's 'six' becoming 'sax' (big time!) 
        console.log('[Main][Data Check]:', sharedStore.slice(-10));
    }, 1000);

} else {

    let str = String.fromCharCode.apply(this, workerData);
    let obj = JSON.parse(str);

    obj.ks111.keywords[1] = 'sax'; // big time!

    let OString = JSON.stringify(obj);

    for (let i = 0; i < OString.length; i++) {
        workerData[i] = OString.charCodeAt(i);
    }

}

总之,在Node.js 10.5.0中,线程之间是否可以共享对象?如何实现?


根据我所了解的,唯一可以直接共享的内存是SharedArrayBuffer,它只包含原始二进制数据(而不是对象)。因此,我不知道如何在线程之间直接共享对象。工作线程似乎类似于浏览器中的Web Worker,在那里没有共享常规变量,它们必须通过postMessage()与其他代码通信(出于同步原因)。 - jfriend00
2
将一段 JSON 放入 SharedArrayBuffer 中似乎并没有太大的帮助,除了共享一个字符串。每个工作线程都必须将 JSON 解析为自己独立的非共享对象。这种级别的共享可以通过向每个工作线程发布消息来完成,以便在任何人想要更改数据时,所有工作线程都有相同的数据副本。 - jfriend00
在Node中与sharedObjects最接近的答案可能在这里 - https://dev59.com/-oLba4cB1Zd3GeqPkNhO - Pruthvi Kumar
可能会感兴趣的是 comlink - Kevin
3个回答

25

ECMA Script没有共享对象,但它有SharedArrayBuffer。您可以使用DataView和包装器自己编写在缓冲区中直接写入数据的行为:

// Shared value
class SharedPoint {
  constructor(array) {
    this.dataview = new DataView(array);
  }

  set x(value) {
    this.dataview.setUint8(0, value);
  }

  set y(value) {
    this.dataview.setUint8(1, value);
  }

  get x() {
    return this.dataview.getUint8(0);
  }

  get y() {
    return this.dataview.getUint8(1);
  }
}

// Usage

const buffer = new SharedArrayBuffer(2);

// Create two instances of shared point.
const point0 = new SharedPoint(buffer);
const point1 = new SharedPoint(buffer);

// Get initial values for point #1
console.log('x', point1.x); // 0
console.log('y', point1.y); // 0

// Update point #0
point0.x = 64;
point0.y = 32;

// Get changes in point #1
console.log('x', point1.x); // 64
console.log('y', point1.y); // 32

你能创建能够操作字符串或类似C语言结构体的类。而 struct.js。而 SharedArrayBuffer 是可转移的对象,可以在 worker 和主进程之间共享。

⚠️ 注意由于 Spectre攻击,所有主流浏览器都禁用了SharedArrayBuffer功能,并已重新启用。尽管API成熟,但其支持可能低于人们的预期。请在 can i use 网站上查看浏览器支持情况。


有没有工具可以将JS对象转换为SharedArrayBuffer - João Pimentel Ferreira
有任何与NodeJS相关的内容吗? - João Pimentel Ferreira
1
@JoãoPimentelFerreira 如果你想转换任何js对象,那么最好的方法是将其转换为json字符串,然后将该字符串转换为缓冲区,然后在另一端进行反向操作。 - Taureon

0
没有原生的解决方案,但是你可以创建一个由SharedArrayBuffer支持的序列化器/反序列化器。
你可以查看这个库 - 一个我还没有发布的库 - 它接受一个模式(对象结构),并将其序列化(反序列化)为ArrayBuffer或SharedArrayBuffer。这可以用于在主线程上进行序列化,在工作线程上进行反序列化,通过传递缓冲区来实现。它支持对象的固定数组和嵌套对象。

你是否对这个解决方案进行了与标准的postMessage对象复制进行对比测试?是否有显著的改进? - dfilkovi

-1
代码可以像下面这样编写。可以看到,在 parentPort.postMessage(sArray) 后,worker 中的对象被更改了。这表明线程使用带有 SharedArrayBuffer 的共享内存。
const {
    Worker,
    isMainThread,
    workerData,
    parentPort
} = require('worker_threads');

const store = {
    ks109: {
        title: 'some title 1',
        description: 'some desciption 1',
        keywords: ['one', 'two']
    },
    ks110: {
        title: 'some title 2',
        description: 'some desciption 2',
        keywords: ['three', 'four']
    },
    ks111: {
        title: 'some title 3',
        description: 'some desciption 3',
        keywords: ['five', 'six']
    }
}

if (isMainThread) {
    let w = new Worker(__filename, {
        workerData: store
    });

    w.on('message', (data) => {
        console.log("Received message from worker");
        const strArr = []
        for(let i = 0; i < data.byteLength; i++){
            strArr.push(String.fromCharCode(data.getUint8(i)));
        }
        console.log(JSON.parse(strArr.join("")))
    });
    w.on('error', (err) => {
        console.error(`[Main][Thread error] ${err.message}`);
    });
    w.on('exit', (code) => {
        if (code !== 0) {
            console.error(`[Main][Thread unexpected exit]: ${code}`);
        }
    });

} else {
    let OString = JSON.stringify(workerData);
    let SABuffer = new SharedArrayBuffer(OString.length);
    let sArray = new DataView(SABuffer);
    for (let i = 0; i < OString.length; i++) {
        sArray.setUint8(i,OString.charCodeAt(i))
    }
    parentPort.postMessage(sArray);
    let index1 = OString.indexOf("ks111");
    const key1SubString = OString.substring(index1);
    let index2 = key1SubString.indexOf("keywords");
    const key2SubString = key1SubString.substring(index2);
    let index3 = key2SubString.indexOf("six");
    const newIndex = index1+index2+index3+1;
    sArray.setUint8(newIndex,'a'.charCodeAt());
}

这段代码似乎在线程之间没有共享任何内存或对象,它依赖于postMessage()来传输数据。 - Bogdan Ionitza

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接