Python: 在由multiprocess.Process产生的两个线程之间共享变量

3

Python 3.1.2

我在使用 multiprocessing.Process 创建的两个线程之间共享变量时遇到了问题。这是一个简单的布尔变量,应该确定线程是否应该运行或停止执行。下面是三种情况下的简化代码(但使用与我的原始代码相同的机制):

  1. 主类为 threading.Thread 类型,self.is_runningbool 类型 [正常工作]。
  2. 主类为 multiprocess.Process 类型,self.is_runningbool 类型 [不起作用。子线程有自己的 self.is_running 的本地副本而不是共享它]。
  3. 主类为 multiprocess.Process 类型,self.is_runningmultiprocessing.Value("b", True) 类型 [正常工作]。

我想知道为什么会这样,而不是另一种方式。(即为什么第二种情况没有按照我的假设工作)。

测试是在 Python 解释器中进行的:

from testclass import *

d = TestClass()
d.start()
d.stop()

以下是第一点的示例:

import threading
import time
import queue
import multiprocessing

class TestClass(threading.Thread):
def __init__(self):
    threading.Thread.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第二点的示例:

import threading
import time
import queue
import multiprocessing


class Test(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第3点的示例:

import threading
import time
import queue
import multiprocessing

class TestClass(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = multiprocessing.Value("b", True)
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running.value
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running.value = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()
2个回答

1

线程都属于同一进程,因此它们共享内存。另一个后果是,不同的CPU不能完全同时执行线程,因为一个进程只能被一个CPU选择。

进程有独立的内存空间。一个CPU可以运行一个进程,同时另一个CPU可以运行另一个进程。需要特殊的构造来让进程协作。


1
我知道。但是在这里(第2点),线程是在进程内存空间中创建的。至少应该是这样的,即进程(TestClass)是在该进程中创建的线程的包装器。我没有创建新进程。但是看起来从multiprocessing.Process衍生的线程没有共享它们的内存空间。为什么? 如果你明白我的意思,它似乎是并行于TestClass而不是在它下面创建的线程。 - jaor

0
在第二点中,父进程和子进程都有自己的is_running副本。当您在父进程中调用stop()时,它只会修改父进程中的is_running而不是子进程中的。multiprocessing.Value之所以有效是因为它的内存在两个进程之间共享。
如果您想要一个进程感知队列,请使用multiprocessing.Queue

你能解释一下为什么父进程和子进程有它们自己的 is_running 副本而不是共享一个副本吗?如果父进程是线程类型,为什么情况不同呢? 如果 TestClass 不是 multiprocessing.Process 或者 threading.Thread 类型的话,它会与它的子进程共享 is_running,对吧?如果是这样的话——为什么? - jaor
正如Marco所说,进程不共享内存,但线程会共享。在父进程和子进程中,“is_running”占用不同的内存地址,但在使用线程时占用相同的地址。虽然“Process”和“Thread”看起来相似,但它们并不完全可互换。如果“TestClass”不是“Process”的实例,那么它的成员仍然不会在进程之间自动共享。您必须使用“multiprocessing”中的消息传递或共享原语之一,例如“multiprocessing.Value”来实现这一点。 - user634175

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接