我正在尝试通过建模银行来理解演员模型的工作原理。首先,这里有一些代码说明为什么我们需要并发系统的模型:
这个理解正确吗?尽管银行在提款时处于休眠状态,但由于数据是在不同的线程上管理的,因此没有同时提款会破坏数据。
import time
from threading import Thread
bank = {'joe': 100}
class Withdrawal(Thread):
"""
Models a concurrent withdrawal for 'joe'. In this example, 'bank'
is a shared resource not protected and accessible from any thread.
Args:
amount (double) how much to withdraw
sleep (bool) config to sleep the thread during the withdrawal
"""
def __init__(self, amount, sleep = False):
self.amount = amount
self.sleep = sleep
Thread.__init__(self)
def run(self):
"""
Overrides method in Thread.
Returns: void
"""
balance = bank['joe']
if balance >= self.amount:
if self.sleep:
time.sleep(5)
bank['joe'] -= self.amount
t1 = Withdrawal(80, True)
t2 = Withdrawal(80)
t1.start()
t2.start()
运行代码后,'joe'
的余额在五秒后应该为 -60
。 这是因为 bank
没有受到并发访问的保护,在并发执行期间暂停五秒钟意味着我们无法保证数据不会在不同状态下被访问。 在这种情况下,第一个线程在第二个线程完成提款后访问银行,但没有检查是否仍然可以提款。 结果账户变成了负数。
如果我们将银行和提款建模为Actor,我们就可以保护对账户的访问,因为它的状态是由与尝试从中提款不同的线程管理的。
from queue import Queue
from threading import Thread
import time
import random
class Actor(Thread):
"""
Models an actor in the actor model for concurrent computation
see https://en.wikipedia.org/wiki/Actor_model for theoretical overview
Args:
handles (dict) mapping of public methods that are callable
on message data after message has been read
"""
def __init__(self, handles):
self.handles = handles
self.mailbox = Queue()
Thread.__init__(self, daemon=True)
def run(self):
"""
Overrides method in Thread. Once the thread has started,
we listen for messages and process one by one when they are received
Returns: void
"""
self.read_messages()
def send(self, actor, message):
"""
Puts a Message in the recipient actor's mailbox
Args:
actor (Actor) to receive message
message (Message) object to send actor
Returns: void
"""
actor.mailbox.put(message)
def read_messages(self):
"""
Reads messages one at a time and calls the target class handler
Returns: void
"""
while 1:
message = self.mailbox.get()
action = message.target
if action in self.handles:
self.handles[action](message.data)
class Message:
"""
Models a message in the actor model
Args:
sender (Actor) instance that owns the message
data (dict) message data that can be consumed
target (string) function in the recipient Actor to we'd like run when read
"""
def __init__(self, sender, data, target):
self.sender = sender
self.data = data
self.target = target
class Bank(Actor):
"""
Models a bank. Can be used in concurrent computations.
Args:
bank (dict) name to amount mapping that models state of Bank
"""
def __init__(self, bank):
self.bank = bank
Actor.__init__(self, {'withdraw': lambda data: self.withdraw(data)})
def withdraw(self, data):
"""
Action handler for 'withdraw' messages. Withdraw
if we can cover the requested amount
Args:
data (dict) message data
Returns: void
"""
name, amount = data['name'], data['amount']
if self.bank[name] >= amount:
if data['sleep']:
time.sleep(2)
self.bank[name] -= amount
class Withdrawal(Actor):
"""
Models a withdrawal. Can be used in concurrent computations.
Args:
bank (Bank) shared resource to transact with
sleep (bool) config to request that the bank sleep during a withdrawal
"""
def __init__(self, bank, sleep=False):
self.bank = bank
self.sleep = sleep
Actor.__init__(self, {})
def withdraw(self, name, amount):
"""
Wrapper for sending a withdrawl message
Args:
name (string) owner of the account in our bank
amount (double) amount we'd like to withdraw
Returns: void
"""
data = {'sleep': self.sleep, 'name': name, 'amount': amount}
Actor.send(self, self.bank, Message(self, data, 'withdraw'))
现在让我们进行测试:
bank = Bank({'joe': 100})
bank.start()
actors = []
for _ in range(100):
a = Withdrawal(bank, random.randint(0, 1))
a.start()
actors.append(a)
for a in actors:
a.withdraw('joe', 15)
这个理解正确吗?尽管银行在提款时处于休眠状态,但由于数据是在不同的线程上管理的,因此没有同时提款会破坏数据。