原子操作是一组“全有或全无”的小操作。下面我们来看看。
let i=0;
i++
i++
实际上包含三个步骤:
- 读取当前
i
的值
- 将
i
加 1
- 返回旧值
如果有两个线程执行相同的操作会发生什么?它们都可以同时读取同一个值 1
并在完全相同的时间内将其递增。
但是 JavaScript 不是单线程的吗?
是的!JavaScript 确实是单线程的,但浏览器/Node 允许在并行中使用多个 JavaScript 运行时(Worker Threads、Web Workers)。
Chrome 和 Node(基于 v8)为每个线程创建了一个Isolate,它们都在自己的 context
中运行。
它们唯一可以共享内存
的方式是通过 ArrayBuffer
/ SharedArrayBuffer
下面程序的输出结果是什么?
在 node > =10 上运行(您可能需要使用 --experimental_worker
标志)
node example.js
const { isMainThread, Worker, workerData } = require('worker_threads');
if (isMainThread) {
const shm = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
process.on('exit', () => {
const res = new Int32Array(shm);
console.log(res[0]);
});
Array(5).fill(null).map(() => new Worker(__filename, { workerData: shm }));
} else {
const arr = new Int32Array(workerData);
for (let i = 0; i < 500000; i++) {
arr[i]++;
}
}
输出结果可能是2,500,000,但我们并不确定,在大多数情况下,它不会是2.5M,实际上,你获得相同输出的机会非常低,作为程序员,我们肯定不喜欢我们不知道最终结果的代码。
这是竞争条件的一个例子,其中n个线程互相竞争,并且没有同步。
现在出现了原子操作,它允许我们从开始到结束进行算术运算。
让我们稍微改变一下程序,然后运行:
const { isMainThread, Worker, workerData } = require('worker_threads');
if (isMainThread) {
const shm = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
process.on('exit', () => {
const res = new Int32Array(shm);
console.log(res[0]);
});
Array(5).fill(null).map(() => new Worker(__filename, { workerData: shm }));
} else {
const arr = new Int32Array(workerData);
for (let i = 0; i < 500000; i++) {
Atomics.add(arr, 0, 1);
}
}
现在输出将始终为2,500,000
奖励,使用原子操作的互斥锁
有时候,我们希望只有一个线程可以同时访问某个操作,让我们看一下下面的类。
class Mutex {
static once(mutex, resource, onceFlagCell, cb) {
if (Atomics.load(resource, onceFlagCell) === 1) {
return;
}
mutex.lock();
if (Atomics.load(resource, onceFlagCell) === 1) {
mutex.unlock();
return;
}
cb(() => {
Atomics.store(resource, onceFlagCell, 1);
mutex.unlock();
});
}
constructor(resource, cell) {
this.resource = resource;
this.cell = cell;
this.lockAcquired = false;
}
lock() {
if (this.lockAcquired) {
console.warn('you already acquired the lock you stupid');
return;
}
const { resource, cell } = this;
while (true) {
if (Atomics.load(resource, cell) > 0) {
while ('ok' !== Atomics.wait(resource, cell, 0));
}
const countOfAcquiresBeforeMe = Atomics.add(resource, cell, 1);
if (countOfAcquiresBeforeMe >= 1) {
Atomics.sub(resource, cell, 1);
continue;
}
this.lockAcquired = true;
return;
}
}
unlock() {
if (!this.lockAcquired) {
console.warn('you didn\'t acquire the lock you stupid');
return;
}
Atomics.sub(this.resource, this.cell, 1);
Atomics.notify(this.resource, this.cell, 1);
this.lockAcquired = false;
}
}
现在,您需要分配
SharedArrayBuffer
并在所有线程之间共享它们,并确保每次只有1个线程进入
critical section
。
使用node > 10运行
node --experimental_worker example.js
const { isMainThread, Worker, workerData, threadId } = require('worker_threads');
const { promisify } = require('util');
const doSomethingFakeThatTakesTimeAndShouldBeAtomic = promisify(setTimeout);
if (isMainThread) {
const shm = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
Array(5).fill(null).map(() => new Worker(__filename, { workerData: shm }));
} else {
(async () => {
const arr = new Int32Array(workerData);
const mutex = new Mutex(arr, 0);
mutex.lock();
console.log(`[${threadId}] ${new Date().toISOString()}`);
await doSomethingFakeThatTakesTimeAndShouldBeAtomic(1000);
mutex.unlock();
})();
}