通过建模银行来理解演员模型

8
我正在尝试通过建模银行来理解演员模型的工作原理。首先,这里有一些代码说明为什么我们需要并发系统的模型:
import time

from threading import Thread

bank = {'joe': 100}

class Withdrawal(Thread):
    """
    Models a concurrent withdrawal for 'joe'. In this example, 'bank'
    is a shared resource not protected and accessible from any thread.

    Args:
        amount (double) how much to withdraw
        sleep  (bool)   config to sleep the thread during the withdrawal
    """

    def __init__(self, amount, sleep = False):
        self.amount = amount
        self.sleep  = sleep

        Thread.__init__(self)

    def run(self):
        """
        Overrides method in Thread.

        Returns: void
        """
        balance = bank['joe']
        if balance >= self.amount:
            if self.sleep:
                time.sleep(5)
            bank['joe'] -= self.amount

t1 = Withdrawal(80, True)
t2 = Withdrawal(80)

t1.start()
t2.start()

运行代码后,'joe' 的余额在五秒后应该为 -60。 这是因为 bank 没有受到并发访问的保护,在并发执行期间暂停五秒钟意味着我们无法保证数据不会在不同状态下被访问。 在这种情况下,第一个线程在第二个线程完成提款后访问银行,但没有检查是否仍然可以提款。 结果账户变成了负数。

如果我们将银行和提款建模为Actor,我们就可以保护对账户的访问,因为它的状态是由与尝试从中提款不同的线程管理的。

from queue     import Queue
from threading import Thread

import time
import random

class Actor(Thread):
    """
    Models an actor in the actor model for concurrent computation
    see https://en.wikipedia.org/wiki/Actor_model for theoretical overview

    Args:
        handles (dict) mapping of public methods that are callable
            on message data after message has been read
    """

    def __init__(self, handles):

        self.handles = handles
        self.mailbox = Queue()
        Thread.__init__(self, daemon=True)

    def run(self):
        """
        Overrides method in Thread. Once the thread has started,
        we listen for messages and process one by one when they are received

        Returns: void
        """

        self.read_messages()

    def send(self, actor, message):
        """
        Puts a Message in the recipient actor's mailbox

        Args:
            actor   (Actor)   to receive message
            message (Message) object to send actor

        Returns: void
        """

        actor.mailbox.put(message)

    def read_messages(self):
        """
        Reads messages one at a time and calls the target class handler

        Returns: void
        """

        while 1:
            message = self.mailbox.get()
            action  = message.target
            if action in self.handles:
                self.handles[action](message.data)

class Message:
    """
    Models a message in the actor model

    Args:
        sender (Actor)  instance that owns the message
        data   (dict)   message data that can be consumed
        target (string) function in the recipient Actor to we'd like run when read
    """

    def __init__(self, sender, data, target):
        self.sender = sender
        self.data   = data
        self.target = target

class Bank(Actor):
    """
    Models a bank. Can be used in concurrent computations.

    Args:
        bank (dict) name to amount mapping that models state of Bank
    """

    def __init__(self, bank):
        self.bank = bank
        Actor.__init__(self, {'withdraw': lambda data: self.withdraw(data)})

    def withdraw(self, data):
        """
        Action handler for 'withdraw' messages. Withdraw
        if we can cover the requested amount 

        Args:
            data (dict) message data

        Returns: void
        """

        name, amount = data['name'], data['amount']

        if self.bank[name] >= amount:
            if data['sleep']:
                time.sleep(2)
            self.bank[name] -= amount

class Withdrawal(Actor):
    """
    Models a withdrawal. Can be used in concurrent computations.

    Args:
        bank  (Bank) shared resource to transact with
        sleep (bool) config to request that the bank sleep during a withdrawal
    """

    def __init__(self, bank, sleep=False):
        self.bank = bank
        self.sleep = sleep
        Actor.__init__(self, {})

    def withdraw(self, name, amount):
        """
        Wrapper for sending a withdrawl message

        Args:
            name   (string) owner of the account in our bank
            amount (double) amount we'd like to withdraw

        Returns: void
        """

        data = {'sleep': self.sleep, 'name': name, 'amount': amount}
        Actor.send(self, self.bank, Message(self, data, 'withdraw'))

现在让我们进行测试:

bank = Bank({'joe': 100})
bank.start()

actors = []
for _ in range(100):
    a = Withdrawal(bank, random.randint(0, 1))
    a.start()
    actors.append(a)

for a in actors:
    a.withdraw('joe', 15)

这个理解正确吗?尽管银行在提款时处于休眠状态,但由于数据是在不同的线程上管理的,因此没有同时提款会破坏数据。
2个回答

2

同时提款不再可能发生,这是因为单个Bank线程在Bank.read_messages循环内串行而非并发处理withdraw消息。这意味着sleep命令也是依次执行的;每当银行在提款期间需要休眠时,整个消息队列都会停滞并让出控制权2秒钟。(鉴于Bank的建模行为,这基本上是不可避免的)。


2

如果对于一个对象的访问仅限于单个线程,则通常被认为是线程安全的。

其他参与者不能直接访问银行的存储,而只能发送请求取款的消息,因此更新仅在银行线程中进行,并且原始设计中的检查和设置竞态条件被消除。


Python线程共享全局命名空间,因此为了确保隔离实际上可以使对象线程安全,您必须将对象设置为函数的本地变量。在Actor模型中,这通常意味着您希望公开的唯一对象是某种消息分发器,然后每个Actor都在线程中内部创建和运行另一个函数,并从那里检查分发器的消息。 - Tore Eschliman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接