如何在Node.js中锁定被多个异步方法共享的对象?

22

我在nodejs中有一个对象,其中包含不同的属性。有许多异步函数访问并修改该对象,并且执行方式非常复杂。单个异步函数可能具有内部回调(或异步函数),这些回调可能需要一些时间才能执行,然后该函数将修改该对象。 我想锁定该对象,直到我完成所有修改,只有在此之后,任何其他异步函数才能访问它。

示例:

var machineList = {};

function operation1() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation2() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation3() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation4() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}

假设 machineList 是一个复杂的对象,并且有不同的异步方法 (operation1(), operation2(), ...) 对它进行修改。这些操作按任意顺序和次数调用,以响应来自客户端的请求。每个请求将执行单个操作。

在每个操作函数中都有一些内部闭包函数和回调(或异步函数),它可能需要一些时间。但是我希望在任何单个操作完成之前锁定 machineList 对象。

在任何操作开始时,我想像 waitForMachineList() 一样锁定对象,并在 leaveFromMachineList() 之后释放锁。

因此,我最终想在nodejs中实现锁定机制。就像C++中的关键会话和C#中的锁定一样。

请问有人可以帮助我在nodejs中实现这个吗?或者建议我使用哪个node模块。

3个回答

37
我使用了Node.js模块async-lock来完成锁定操作。现在我可以实现问题中提到的目标。
示例:
var AsyncLock = require('async-lock');
var lock = new AsyncLock();

function operation1() {
    console.log("Execute operation1");
    lock.acquire("key1", function(done) {
        console.log("lock1 enter")
        setTimeout(function() {
            console.log("lock1 Done")
            done();
        }, 3000)
    }, function(err, ret) {
        console.log("lock1 release")
    }, {});
}

function operation2() {
    console.log("Execute operation2");
    lock.acquire("key1", function(done) {
        console.log("lock2 enter")
        setTimeout(function() {
            console.log("lock2 Done")
            done();
        }, 1000)
    }, function(err, ret) {
        console.log("lock2 release")
    }, {});
}

function operation3() {
    console.log("Execute operation3");
    lock.acquire("key1", function(done) {
        console.log("lock3 enter")
        setTimeout(function() {
            console.log("lock3 Done")
            done();
        }, 1)
    }, function(err, ret) {
        console.log("lock3 release")
    }, {});
}operation1(); operation2(); operation3();

输出:

执行操作1

进入锁1

执行操作2

执行操作3

锁1完成

释放锁1

进入锁2

锁2完成

释放锁2

进入锁3

锁3完成

释放锁3


2

我创建了一个简化版的async-lock,想要在这里分享给那些想要了解如何技术实现这样东西或者不想添加新包的人。

const createLock = () => {
  const queue = [];
  let active = false;
  return (fn) => {
    let deferredResolve;
    let deferredReject;
    const deferred = new Promise((resolve, reject) => {
      deferredResolve = resolve;
      deferredReject = reject;
    });
    const exec = async () => {
      await fn().then(deferredResolve, deferredReject);
      if (queue.length > 0) {
        queue.shift()();
      } else {
        active = false;
      }
    };
    if (active) {
      queue.push(exec);
    } else {
      active = true;
      exec();
    }
    return deferred;
  };
};

你可以像这样使用该函数:
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

const lock = createLock();

function test(id, ms) {
  lock(async () => {
    console.log(id, "start");
    await sleep(ms);
    console.log(id, "end");
  });
}

test(1, 400);
test(2, 300);
test(3, 200);
test(4, 100);

// Output:
// 1 start
// 1 end
// 2 start
// 2 end
// 3 start
// 3 end
// 4 start
// 4 end

但请注意,传递的函数必须是一个 Promise。


0

我写了这段代码来解决类似的问题。

我认为这是一种经典的数据库事务模型。它只实现了锁定机制,而没有回滚。它的工作方式如下:

  1. 您按照应该调用的顺序添加一堆同步方法
  2. 这些方法将在N毫秒内被调用
  3. 只会有一个请求获取DB数据(或任何其他需要修改的数据)和一个请求保存DB数据
  4. 每个方法都将接收到“最新数据”的引用,即处理程序№1更改的数据对象将由处理程序№2接收。顺序取决于№1的顺序

它适用于浏览器。它应该也适用于Node.js,但我没有测试过。

请注意,因为每个方法都将接收到对象的引用,所以您应该修改实际引用,如果您想读取它并返回某些值,则复制它,不要返回该引用,因为该引用的值可能会在未来被未来的处理程序更改。

TRANSACTION将在执行之前等待N毫秒(默认为250)。这定义了哪些方法将分组为单个事务。您还可以进行即时调用。

以下是代码:

let TIMER_ID = 0;
let LOCK = Promise.resolve();
let RESOLVE_LOCK = undefined;
let FUNC_BUFFER = [];
let SUCCESS_BUFFER = [];
let FAIL_BUFFER = [];


/**
 * Gets DB data. 
 */
async function get() {
    return {
        key1: "value1",
        key2: {
            key3: "value2"
        }
    };
}

/**
 * Sets new data in DB.
 */
async function set(value) {
    return;
}


/**
 * Classic database transaction implementation.
 *
 * You adding bunch of methods, every method will be
 * executed with valid instance of data at call time,
 * after all functions end the transaction will end.
 * If any method failed, then entire transaction will fail
 * and no changes will be written. If current transaction is
 * active, new one will be not started until end of previous one.
 *
 * In short, this transaction have ACID properties.
 *
 * Operations will be auto grouped into separate transactions
 * based on start timeout, which is recreated every time on
 * operation call.
 *
 * @example
 * ```
 * // Without transaction:
 * create("1", 123)
 * update("1", 321)
 * read("1") => 123 // because `update` is async
 *
 * // With transaction:
 * create("1", 123)
 * update("1", 321)
 * read("1") => 321
 *
 * // Without transaction:
 * create("v", true)
 * update("v", false) // throws internal error,
 *                    // "v" will be created
 * read("v") => true // because `update` is async
 * update("v", false) // will update because `update` is async
 *
 * // With transaction:
 * create("v", true)
 * update("v", false) // throws internal error,
 *                    // entire transaction will throw error,
 *                   // "v" will be not created
 * read("v") => true // entire transaction will throw error
 * update("v", false) // entire transaction will throw error
 * ```
 *
 * @example
 * ```
 * // Transaction start
 * create()
 * update()
 * update()
 * remove()
 * // Transaction end
 *
 * // Transaction start
 * create()
 * update()
 * sleep(1000)
 * // Transaction end
 *
 * // Transaction start
 * update()
 * remove()
 * // Transaction end
 * ```
 */
const TRANSACTION = {
    /**
     * Adds function in transaction.
     *
     * NOTE:
     * you shouldn't await this function, because
     * it only represents transcation lock, not
     * entire transaction end.
     *
     * @param f
     * Every function should only modify passed state, don't
     * reassign passed state and not save changes manually!
     * @param onSuccess
     * Will be called on entire transaction success.
     * @param onFail
     * Will be called on entire transaction fail.
     * @param startTimeout
     * Passed `f` will be added in current transaction,
     * and current transaction will be called after
     * `startTimeout` ms if there will be no more `f` passed.
     * Default value is recommended.
     */
    add: async function(
        f,
        onSuccess,
        onFail,
        startTimeout
    ) {
        await LOCK;

        window.clearTimeout(TIMER_ID);
        FUNC_BUFFER.push(f);

        if (onSuccess) {
            SUCCESS_BUFFER.push(onSuccess);
        }

        if (onFail) {
            FAIL_BUFFER.push(onFail);
        }

        if (startTimeout == null) {
            startTimeout = 250;
        }

        TIMER_ID = window.setTimeout(() => {
            TRANSACTION.start();
        }, startTimeout);

        console.debug("Added in transaction");
    },
    start: async function() {
        LOCK = new Promise((resolve) => {
            RESOLVE_LOCK = resolve;
        });
        console.debug("Transaction locked");

        let success = true;

        try {
            await TRANSACTION.run();
        } catch (error) {
            success = false;
            console.error(error);
            console.warn("Transaction failed");
        }

        if (success) {
            for (const onSuccess of SUCCESS_BUFFER) {
                try {
                    onSuccess();
                } catch (error) {
                    console.error(error);
                }
            }
        } else {
            for (const onFail of FAIL_BUFFER) {
                try {
                    onFail();
                } catch (error) {
                    console.error(error);
                }
            }
        }

        FUNC_BUFFER = [];
        SUCCESS_BUFFER = [];
        FAIL_BUFFER = [];
        RESOLVE_LOCK();

        console.debug("Transaction unlocked");
    },
    run: async function() {
        const data = await get();
        const state = {
            value1: data.key1,
            value2: data.key2
        };

        for (const f of FUNC_BUFFER) {
            console.debug("Transaction function started");
            f(state);
            console.debug("Transaction function ended");
        }

        await set({
            key1: state.value1,
            key2: state.value2
        });
    }
}

示例 № 1:

/**
 * Gets DB data. 
 */
async function get() {
    return {
        key1: "value1",
        key2: {
            key3: "value2"
        }
    };
}

/**
 * Sets new data in DB.
 */
async function set(value) {
    console.debug("Will be set:", value);

    return;
}


new Promise(
    (resolve) => {
        TRANSACTION.add(
            (data) => {
                data.value2.key3 = "test1";
            },
            () => console.debug("success № 1")
        );
        TRANSACTION.add(
            (data) => {
                const copy = {
                    ...data.value2
                };
                
                resolve(copy);
            },
            () => console.debug("success № 2")
        );
        TRANSACTION.add(
            (data) => {
                data.value1 = "test10";
                data.value2.key3 = "test2";
            },
            () => console.debug("success № 3")
        );
    }
)
.then((value) => {
    console.debug("result:", value);
});

/* Output:

Added in transaction
Added in transaction
Added in transaction
Transaction locked
Transaction function started
Transaction function ended
Transaction function started
Transaction function ended
Transaction function started
Transaction function ended
Will be set: {key1: 'test10', key2: {key3: 'test2'}}
result: {key3: 'test1'}
success № 1
success № 2
success № 3
Transaction unlocked 

*/

示例2:

TRANSACTION.add(
    () => {
        console.log(1);
    }
);
TRANSACTION.add(
    () => {
        console.log(2);
    },
    undefined,
    undefined,
    0 // instant call
);


/* Output:

16:15:34.715 Added in transaction
16:15:34.715 Added in transaction
16:15:34.717 Transaction locked
16:15:34.717 Transaction function started
16:15:34.718 1
16:15:34.718 Transaction function ended
16:15:34.718 Transaction function started
16:15:34.718 2
16:15:34.718 Transaction function ended
16:15:34.719 Transaction unlocked

*/

示例3:

TRANSACTION.add(
    () => {
        console.log(1);
    }
);
TRANSACTION.add(
    () => {
        console.log(2);
    },
    undefined,
    undefined,
    0 // instant call
);

await new Promise((resolve) => {
    window.setTimeout(() => {
        resolve();
    }, 1000);
});

TRANSACTION.add(
    () => {
        console.log(3);
    }
);


/* Output:

16:19:56.840 Added in transaction
16:19:56.840 Added in transaction
16:19:56.841 Transaction locked
16:19:56.841 Transaction function started
16:19:56.842 1
16:19:56.842 Transaction function ended
16:19:56.842 Transaction function started
16:19:56.842 2
16:19:56.842 Transaction function ended
16:19:56.842 Transaction unlocked
16:19:57.840 Added in transaction
16:19:58.090 Transaction locked
16:19:58.090 Transaction function started
16:19:58.090 3
16:19:58.091 Transaction function ended
16:19:58.091 Transaction unlocked

*/

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接