How to exchange data between two separate processes in Python

3

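Problem

Two separate processes run in parallel, and I want them to communicate with each other.

Code explanation

The code is written in Python 2.7. In my minimal example I use a queue for interprocess communication. Process p1 puts data into the queue. Process p2 gets the data from the queue and does something with it. Process p2 then puts the modified data back into the queue, and finally process p1 gets the modified data back from the queue. The modified data must return to process p1 because that process is actually an eventlet server that sends and receives requests.

Code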

#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for back-and-forth data exchange between processes

# common modules
import os
import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
from datetime import datetime

someData = {}

class Load():
    def post(self):
        timestamp = str(datetime.now())
        someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        queue1.put(someData)        # put into queue
        print "#20 process 1: put in queue1 =>", someData
        time.sleep(3)

        while True:     # queue1 checking loop, comment out the loop if use time.sleep only
            if queue1.empty() == False:
                timestamp = str(datetime.now())
                res = queue1.get()
                res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
                print "#28 get from queue1 =>", res
                break
            else:
                print "#31 queue1 empty"
                time.sleep(1)

        # while True:       # queue2 checking loop
        #   if queue2.empty() == False:
        #       timestamp = str(datetime.now())
        #       res = queue2.get()
        #       res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        #       print "#39 get from queue2 =>", res
        #       break
        #   else:
        #       print "#42 queue2 empty"
        #       time.sleep(1)

class Unload():
    def get(self):
        try:
            if queue1.empty() == False:
                data = queue1.get()     # retrieve package from queue
                #queue1.close()
                #queue1.join_thread()
                timestamp = str(datetime.now())
                data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp} 
                print "#54 process 2: get from queue1 =>", data
                self.doSomething(data)  # call method
            else:
                print "#57 queue1 empty"
                pass
        except:
            print "#60 queue1 error"
            pass

    def doSomething(self, data):
        time.sleep(3)
        timestamp = str(datetime.now())
        someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
        self.someData = someData
        print "#68 process 2: do something =>", someData
        self.put()

    def put(self):
        time.sleep(3)
        timestamp = str(datetime.now())
        self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
        print "#75 process 2: put back in queue1 =>", self.someData
        res = self.someData
        queue1.put(res)
        #print "#78 process 2: put back in queue2 =>", self.someData
        #res = self.someData
        #queue2.put(res)
        #queue2.close()
        #queue2.join_thread()

# main 
if __name__ == '__main__':

    queue1 = Queue()
    #queue2 = Queue()

    global p1, p2
    p1 = Process(target=Load().post(), args=(queue1,))      # process p1
    #p1 = Process(target=Load().post(), args=(queue1,queue2,))
    p1.daemon = True
    p1.start() 

    p2 = Process(target=Unload().get(), args=(queue1,))     # process p2
    #p2 = Process(target=Unload().get(), args=(queue1,queue2,))
    p2.start()
    p2.join()

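Question

I checked other resources, but they all cover one-way communication only. Here is the list:

  1. How to use get_nowait in Python without raising an Empty exception
  2. How to get data back from a particular process using multiprocessing in Python
  3. How to use a multiprocessing queue with a lock in Python
  4. The multiprocessing module supports locks
  5. How to create a thread that can be paused and resumed
  6. How to exchange data between two Python processes

How should I make process1 wait for and retrieve the modified data from process2? Should I consider another approach to interprocess communication, such as pipes or ZeroMQ?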
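
On the pipe option mentioned above, here is a minimal sketch (not the asker's code) of a two-way exchange over multiprocessing.Pipe(), which returns two duplex connection ends by default. It is written for Python 3, and the names worker and round_trip, the None stop signal and the 5 second timeout are all assumptions made for illustration.

```python
from multiprocessing import Pipe, Process


def worker(conn):
    """Answer every request received on conn until None arrives."""
    while True:
        request = conn.recv()            # blocks until the other end sends
        if request is None:              # assumed stop signal
            break
        conn.send({"id": request["id"],
                   "result": request["workload"] + " processed"})


def round_trip(request, timeout=5.0):
    """Send one request to a child process and wait for its reply."""
    parent_end, child_end = Pipe()       # duplex: both ends can send and recv
    # Pass the function object itself as target; do not call it here.
    child = Process(target=worker, args=(child_end,))
    child.start()
    parent_end.send(request)
    if not parent_end.poll(timeout):     # wait up to `timeout` seconds
        child.terminate()
        child.join()
        raise RuntimeError("no reply from worker")
    reply = parent_end.recv()
    parent_end.send(None)                # tell the worker to finish
    child.join()
    return reply
```

Calling round_trip({"id": 1, "workload": "do_stuff"}) from under an if __name__ == '__main__': guard should hand back the processed dictionary. A pipe connects exactly two endpoints, so it fits the one-sender, one-worker case; with more processes, queues are usually the easier choice.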

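Attempt 1: using time.sleep() in process1 without the while loop

With only time.sleep(), the data goes into the queue but never reaches its final destination in process1. So far so good, but the final step is missing. The results are below.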

#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:40:30.234466', 'class': 'Load()', 'method': 'post()'}
#54 process 2: get from queue1 => {'process': 'p2', 'timestamp': '2020-02-23 11:40:33.239113', 'class': 'Unload()', 'method': 'get()'}
#68 process 2: do something => {'process': 'p2', 'timestamp': '2020-02-23 11:40:36.242500', 'class': 'Unload()', 'method': 'doSomething()'}
#75 process 2: put back in queue1 => {'process': 'p2', 'timestamp': '2020-02-23 11:40:39.245856', 'class': 'Unload()', 'method': 'put()'}

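Attempt 2: using the while loop in process1

With the while loop checking the queue, the data goes into the queue but gets stuck there and never reaches process2. The results are below.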

#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:46:14.606356', 'class': 'Load()', 'method': 'post()'}
#28 get from queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:46:17.610202', 'class': 'Load()', 'method': 'post()'}
#57 queue1 empty

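Attempt 3: using two queues

With two queues (queue1 from process1 to process2, queue2 from process2 to process1), the data goes into queue1 but does not come back in queue2; it mysteriously disappears. The results are below.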

#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:53:39.745177', 'class': 'Load()', 'method': 'post()'}
#42 queue2 empty

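----- UPDATE 20200224: attempts 4, 5 and 6 -----------------------------------------------------------------

Attempt 4: two queues using manager.Queue()

With two queues created by manager.Queue() (queue1 from process1 to process2, queue2 from process2 to process1), the data goes into queue1 but does not come back in queue2; it mysteriously disappears. The code and results are below.

Code for attempt 4:

#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange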

# common modules
import os
import sys
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Manager
from datetime import datetime

someData = {}
manager = multiprocessing.Manager()
queue1 = manager.Queue()
queue2 = manager.Queue()

class Load():
    def post(self):
        timestamp = str(datetime.now())
        someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        queue1.put(someData)        # put into queue
        print "#20 process 1: put in queue1 =>", someData
        time.sleep(3)

        # while True:       # queue1 checking loop
        #   if queue1.empty() == False:
        #       timestamp = str(datetime.now())
        #       res = queue1.get()
        #       res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        #       print "#28 get from queue1 =>", res
        #       break
        #   else:
        #       print "#31 queue1 empty"
        #       time.sleep(1)

        while True:     # queue2 checking loop
            if queue2.empty() == False:
                timestamp = str(datetime.now())
                res = queue2.get()
                res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
                print "#39 get from queue2 =>", res
                break
            else:
                print "#42 queue2 empty"
                time.sleep(1)

class Unload():
    def get(self):
        try:
            if queue1.empty() == False:
                data = queue1.get()     # retrieve package from queue
                #queue1.close()
                #queue1.join_thread()
                timestamp = str(datetime.now())
                data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp} 
                print "#54 process 2: get from queue1 =>", data
                self.doSomething(data)  # call method
            else:
                print "#57 queue1 empty"
                pass
        except:
            print "#60 queue1 error"
            pass

    def doSomething(self, data):
        time.sleep(3)
        timestamp = str(datetime.now())
        someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
        self.someData = someData
        print "#68 process 2: do something =>", someData
        self.put()

    def put(self):
        time.sleep(3)
        timestamp = str(datetime.now())
        self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
        res = self.someData
        #print "#75 process 2: put back in queue1 =>", self.someData
        #queue1.put(res)
        print "#78 process 2: put back in queue2 =>", self.someData
        queue2.put(res)
        #queue2.close()
        #queue2.join_thread()

# main 
if __name__ == '__main__':

    manager = multiprocessing.Manager()
    queue1 = manager.Queue()
    queue2 = manager.Queue()

    global p1, p2
    #p1 = Process(target=Load().post(), args=(queue1,))     # process p1
    p1 = Process(target=Load().post(), args=(queue1,queue2,))
    p1.daemon = True
    p1.start() 

    #p2 = Process(target=Unload().get(), args=(queue1,))        # process p2
    p2 = Process(target=Unload().get(), args=(queue1,queue2,))
    p2.start()
    p2.join()

Results of attempt 4:
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 13:06:17.687762', 'class': 'Load()', 'method': 'post()'}
#42 queue2 empty

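Attempt 5: one queue using manager.Queue()

With a single queue created by manager.Queue() (queue1 from process1 to process2, and queue1 again from process2 back to process1), the data goes into queue1 but gets stuck there and never reaches process2. The code and results are below.

Code for attempt 5: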

#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange

# common modules
import os
import sys
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Manager
from datetime import datetime

someData = {}
manager = multiprocessing.Manager()
queue1 = manager.Queue()
#queue2 = manager.Queue()

class Load():
    def post(self):
        timestamp = str(datetime.now())
        someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        queue1.put(someData)        # put into queue
        print "#25 process 1: put in queue1 =>", someData
        time.sleep(3)

        while True:     # queue1 checking loop
            if queue1.empty() == False:
                timestamp = str(datetime.now())
                res = queue1.get()
                res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
                print "#33 get from queue1 =>", res
                break
            else:
                print "#36 queue1 empty"
                time.sleep(1)

        # while True:       # queue2 checking loop
        #   if queue2.empty() == False:
        #       timestamp = str(datetime.now())
        #       res = queue2.get()
        #       res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        #       print "#44 get from queue2 =>", res
        #       break
        #   else:
        #       print "#47 queue2 empty"
        #       time.sleep(1)

class Unload():
    def get(self):
        try:
            if queue1.empty() == False:
                data = queue1.get()     # retrieve package from queue
                #queue1.close()
                #queue1.join_thread()
                timestamp = str(datetime.now())
                data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp} 
                print "#59 process 2: get from queue1 =>", data
                self.doSomething(data)  # call method
            else:
                print "#62 queue1 empty"
                pass
        except:
            print "#65 queue1 error"
            pass

    def doSomething(self, data):
        time.sleep(3)
        timestamp = str(datetime.now())
        someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
        self.someData = someData
        print "#73 process 2: do something =>", someData
        self.put()

    def put(self):
        time.sleep(3)
        timestamp = str(datetime.now())
        self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
        res = self.someData
        print "#81 process 2: put back in queue1 =>", self.someData
        queue1.put(res)
        #print "#83 process 2: put back in queue2 =>", self.someData
        #queue2.put(res)
        #queue2.close()
        #queue2.join_thread()

# main 
if __name__ == '__main__':

    manager = multiprocessing.Manager()
    queue1 = manager.Queue()
    #queue2 = manager.Queue()

    global p1, p2
    p1 = Process(target=Load().post(), args=(queue1,))      # process p1
    #p1 = Process(target=Load().post(), args=(queue1,queue2,))
    p1.daemon = True
    p1.start() 

    p2 = Process(target=Unload().get(), args=(queue1,))     # process p2
    #p2 = Process(target=Unload().get(), args=(queue1,queue2,))
    p2.start()
    p2.join()

Results of attempt 5:
#25 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 14:08:13.975886', 'class': 'Load()', 'method': 'post()'}
#33 get from queue1 => {'process': 'p1', 'timestamp': '2020-02-24 14:08:16.980382', 'class': 'Load()', 'method': 'post()'}
#62 queue1 empty

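Attempt 6: using queue timeouts

As suggested, I tried to fix the queue timeouts. The approach is again queue1 from process1 to process2 and queue2 from process2 to process1. The data goes into queue1 but does not come back in queue2; again it mysteriously disappears. The code and results are below.

Code for attempt 6: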

#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange

# common modules
import os
import sys
import time
import uuid
import Queue
#from Queue import Empty
import multiprocessing
from multiprocessing import Process
#from multiprocessing import Queue
from datetime import datetime

someData = {}

class Load():
    def post(self):
        timestamp = str(datetime.now())
        someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        queue1.put(someData)        # put into queue
        print "#24 process 1: put in queue1 =>", someData
        time.sleep(3)

        # while True:       # queue1 checking loop
        #   if queue1.empty() == False:
        #       timestamp = str(datetime.now())
        #       res = queue1.get()
        #       res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
        #       print "#33 get from queue1 =>", res
        #       break
        #   else:
        #       print "#36 queue1 empty"
        #       time.sleep(1)

        while True:     # queue2 checking loop
            try:
                someData = queue2.get(True,1)
                timestamp = str(datetime.now())
                someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
                print "#43 process 1: got from queue2 =>", someData
                break
            except Queue.Empty:
                print "#46 process1: queue2 empty"
                continue

class Unload():
    def get(self):
        while True:     # queue2 checking loop
            try:
                someData = queue1.get(True,1)
                timestamp = str(datetime.now())
                someData = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp} 
                print "#56 process2: got from queue1 =>", someData
                break
            except Queue.Empty:
                print "#59 process2: queue1 empty"
                continue
        self.doSomething(someData)  # call method

    def doSomething(self, data):
        time.sleep(3)
        timestamp = str(datetime.now())
        someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
        self.someData = someData
        print "#68 process2: do something =>", someData
        self.put(someData)

    def put(self,data):
        time.sleep(3)
        timestamp = str(datetime.now())
        self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
        someData = self.someData
        #print "#81 process 2: put back in queue1 =>", self.someData
        #queue1.put(res)
        print "#78 process2: put back in queue2 =>", someData
        queue2.put(someData)


# main 
if __name__ == '__main__':

    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()

    global p1, p2
    #p1 = Process(target=Load().post(), args=(queue1,))     # process p1
    p1 = Process(target=Load().post(), args=(queue1,queue2,))
    p1.daemon = True
    p1.start() 

    #p2 = Process(target=Unload().get(), args=(queue1,))        # process p2
    p2 = Process(target=Unload().get(), args=(queue1,queue2,))
    p2.start()
    p2.join()

Results of attempt 6:
#24 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 18:14:46.435661', 'class': 'Load()', 'method': 'post()'}
#46 process1: queue2 empty

NOTE: The suggested approach works when I use it without the classes. The code is below:

import uuid
import multiprocessing
from multiprocessing import Process
import Queue


def load(que_in, que_out):
    request = {"id": uuid.uuid4(), "workload": "do_stuff", }
    que_in.put(request)
    print("load: sent request {}: {}".format(request["id"], request["workload"]))
    while True:
        try:
            result = que_out.get(True, 1)
        except Queue.Empty:
            continue
        print("load: got result {}: {}".format(result["id"], result["result"]))


def unload(que_in, que_out):
    def processed(request):
        return {"id": request["id"], "result": request["workload"] + " processed", }
    while True:
        try:
            request = que_in.get(True, 1)
        except Queue.Empty:
            continue
        print("unload: got request {}: {}".format(request["id"], request["workload"]))
        result = processed(request)
        que_out.put(result)
        print("unload: sent result {}: {}".format(result["id"], result["result"]))


    # main
if __name__ == '__main__':

    que_in = multiprocessing.Queue()
    que_out = multiprocessing.Queue()

    p1 = Process(target=load, args=(que_in, que_out))      # process p1
    p1.daemon = True
    p1.start()

    p2 = Process(target=unload, args=(que_in, que_out))     # process p2
    p2.start()
    p2.join()

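----- UPDATE 20200225: attempt 7 ------------------------------------------------------------------------------

Attempt 7: using one shared queue with queue timeouts across different classes (working)

In this attempt I use one shared queue to pass data between the methods of different classes, with the timeouts corrected. The data goes from process1 to process2 and back from process2 to process1, and it is transferred correctly. The code and results are below.

Code for attempt 7: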

import uuid
import multiprocessing
from multiprocessing import Process
import Queue

class Input():
    def load(self, shared_queue):
        request = {"id": uuid.uuid4(), "workload": "do_stuff", }
        shared_queue.put(request)
        print("load: sent request {}: {}".format(request["id"], request["workload"]))
        while True:
            try:
                result = shared_queue.get(True, 1)
            except Queue.Empty:
                continue
            print("load: got result {}: {}".format(result["id"], result["result"]))
            break

class Output():
    def unload(self, shared_queue):
        def processed(request):
            return {"id": request["id"], "result": request["workload"] + " processed", }
        while True:
            try:
                request = shared_queue.get(True, 1)
            except Queue.Empty:
                continue
            print("unload: got request {}: {}".format(request["id"], request["workload"]))
            result = processed(request)
            shared_queue.put(result)
            print("unload: sent result {}: {}".format(result["id"], result["result"]))


    # main
if __name__ == '__main__':

    shared_queue = multiprocessing.Queue()
    up = Input()
    down = Output()

    p1 = Process(target=up.load, args=(shared_queue,))      # process p1
    p1.daemon = True
    p1.start()


    p2 = Process(target=down.unload, args=(shared_queue,))     # process p2
    p2.start()

    p1.join()
    p2.join()

Results of attempt 7:
load: sent request a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff
unload: got request a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff
unload: sent result a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff processed
load: got result a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff processed

What happened in attempt 3? - Anthony Kong
@AnthonyKong, I have updated the explanation of attempt 3. - parovelb
3 Answers

2

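Solution: using one shared queue

I solved the problem after following the suggestions and making some adjustments to target the methods of the different classes correctly. The data now flows back and forth between the two separate processes as intended. An important point for me was to pay close attention to the someData package exchanged between the two processes: it really has to be the same package that is passed around. Hence the identifier entry "id": uuid.uuid4(), which lets me check that the package is the same at every hop.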

#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for back and forth communication between two separate processes using a shared queue

# common modules
import os
import sys
import time
import uuid
import Queue
import multiprocessing
from multiprocessing import Process
from datetime import datetime


someData = {}


class Load():
    def post(self, sharedQueue):
        timestamp = str(datetime.now()) # for timing checking
        someData = {"timestamp":timestamp, "id": uuid.uuid4(), "workload": "do_stuff",}
        self.someData = someData
        sharedQueue.put(someData)       # put into the shared queue
        print("#25 p1 load: sent someData {}: {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1)   # for the time flow     

        while True:     # sharedQueue checking loop
            try:
                time.sleep(1)   # for the time flow
                timestamp = str(datetime.now())
                someData = sharedQueue.get(True,1)
                someData["timestamp"] = timestamp
                print("#37 p1 load: got back someData {}: {}".format(someData["id"], someData["timestamp"], someData["workload"]))
                break
            except Queue.Empty:
                print("#37 p1: sharedQueue empty")
                continue
            break


class Unload():
    def get(self, sharedQueue):
        while True:     # sharedQueue checking loop
            try:
                someData = sharedQueue.get(True,1)
                self.someData = someData
                timestamp = str(datetime.now())
                someData["timestamp"] = timestamp
                print("#50 p2 unload: got someData {}: {}".format(someData["id"], someData["timestamp"], someData["workload"]))
                break
            except Queue.Empty:
                print("#53 p2: sharedQueue empty")
                continue
        time.sleep(1)               # for the time flow
        self.doSomething(someData)  # pass the data to the method


    def doSomething(self, someData):    # execute some code here
        timestamp = str(datetime.now())
        someData["timestamp"] = timestamp
        print("#62 p2 unload: doSomething {}: {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        self.put(someData)
        time.sleep(1)   # for the time flow


    def put(self,someData):
        timestamp = str(datetime.now())
        someData["timestamp"] = timestamp
        sharedQueue.put(someData)
        print("#71 p2 unload: put someData {}: {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1)   # for the time flow


# main 
if __name__ == '__main__':

    sharedQueue = multiprocessing.Queue()

    trx = Load()
    rcx = Unload()

    p1 = Process(target=trx.post, args=(sharedQueue,))      # process p1
    p1.daemon = True
    p1.start() 

    p2 = Process(target=rcx.get, args=(sharedQueue,))       # process p2
    p2.start()

    p1.join()
    p2.join()
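
One thing to watch with a single shared queue: nothing in the queue itself stops the sender from reading back its own request before the other process has taken it, so the ordering above leans on the time.sleep() calls. The following is a minimal Python 3 sketch of one way to guard against that, under stated assumptions: the helper get_reply and the "processed" flag are hypothetical and not part of the solution above, and the worker is assumed to set that flag before putting the message back.

```python
import queue          # queue.Empty is what get() raises on timeout
import time


def get_reply(shared_queue, request_id, timeout=5.0):
    """Return the processed message for request_id, or None on timeout.

    Any other message (for example the sender's own unprocessed request)
    is put back so that the other process can still pick it up.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            message = shared_queue.get(True, 0.1)   # short blocking wait
        except queue.Empty:
            continue
        if message.get("id") == request_id and message.get("processed"):
            return message
        shared_queue.put(message)   # not the reply: hand it back
        time.sleep(0.1)             # give the other process a chance to take it
    return None
```

Putting messages back costs some busy polling, so this is only a stopgap; two separate queues, as in the function-based example from the other answer, avoid the ambiguity altogether.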

1

I think you just missed using the queue timeouts

try:
    result = que_out.get(True, 1)
except queue.Empty:
    continue

This simplified example may help you:

import uuid
from multiprocessing import Process
from multiprocessing import Queue
import queue


def load(que_in, que_out):
    request = {"id": uuid.uuid4(), "workload": "do_stuff", }
    que_in.put(request)
    print("load: sent request {}: {}".format(request["id"], request["workload"]))
    while True:
        try:
            result = que_out.get(True, 1)
        except queue.Empty:
            continue
        print("load: got result {}: {}".format(result["id"], result["result"]))


def unload(que_in, que_out):

    def processed(request):
        return {"id": request["id"], "result": request["workload"] + " processed", }

    while True:
        try:
            request = que_in.get(True, 1)
        except queue.Empty:
            continue
        print("unload: got request {}: {}".format(request["id"], request["workload"]))
        result = processed(request)
        que_out.put(result)
        print("unload: sent result {}: {}".format(result["id"], result["result"]))

    # main
if __name__ == '__main__':

    que_in = Queue()
    que_out = Queue()

    p1 = Process(target=load, args=(que_in, que_out))      # process p1
    p1.daemon = True
    p1.start()

    p2 = Process(target=unload, args=(que_in, que_out))     # process p2
    p2.start()
    p2.join()

Output

load: sent request d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff
unload: got request d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff
unload: sent result d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff processed
load: got result d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff processed
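
Note that both loops in the example above run forever, so p2.join() never returns. Below is a hedged Python 3 sketch of one common way to end them: a sentinel value that tells the worker to stop. The STOP sentinel, the request parameter of load and the main helper are assumptions added for illustration, and load runs in the parent process here rather than in its own Process.

```python
import queue                      # queue.Empty is raised by get() on timeout
import uuid
from multiprocessing import Process, Queue

STOP = None                       # assumed sentinel meaning "no more requests"


def unload(que_in, que_out):
    """Process requests from que_in until the STOP sentinel arrives."""
    while True:
        try:
            request = que_in.get(True, 1)    # block for at most 1 second
        except queue.Empty:
            continue                         # nothing yet, poll again
        if request is STOP:
            break
        que_out.put({"id": request["id"],
                     "result": request["workload"] + " processed"})


def load(que_in, que_out, request):
    """Send request, wait for the result with the same id, stop the worker."""
    que_in.put(request)
    while True:
        try:
            result = que_out.get(True, 1)
        except queue.Empty:
            continue
        if result["id"] == request["id"]:    # results with other ids are skipped
            break
    que_in.put(STOP)
    return result


def main():
    """Run unload in a child process and load in the parent."""
    que_in, que_out = Queue(), Queue()
    worker = Process(target=unload, args=(que_in, que_out))
    worker.start()
    result = load(que_in, que_out,
                  {"id": uuid.uuid4(), "workload": "do_stuff"})
    worker.join()                            # returns because unload saw STOP
    return result
```

Calling main() from under an if __name__ == '__main__': guard should return the processed result once the worker has exited.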

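Your simplified example, @Man-In wholeEarthMan, runs in Python 3 and, with a few changes, in Python 2 as well. But I still cannot get the code to run when the load and unload methods belong to two different classes. - parovelb
What is the problem? Is it on Python 2? - Man-In wholeEarthMan
Yes @Man-In, the code is written in Python 2.7. After some small adjustments I got it working. The problem was elsewhere: I was targeting the class methods incorrectly. - parovelb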
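
On the incorrectly targeted class methods: the failing attempts in the question write the target as Load().post(), with call parentheses, while the working versions pass up.load or trx.post without them. Here is a minimal Python 3 sketch of the difference. The Job class and the helper names are hypothetical, and reading this as the cause of the earlier failures is an inference from the code rather than something the thread spells out.

```python
import os
from multiprocessing import Process


class Job(object):
    """Hypothetical stand-in for the Load and Unload classes."""

    def __init__(self):
        self.ran_in = []

    def run(self):
        # Record the id of the process that actually executes the method.
        self.ran_in.append(os.getpid())


def wrong_target(job):
    """Mimic target=job.run(): the call happens right here, in the caller."""
    return job.run()          # returns None, so Process would get target=None


def right_target(job):
    """Mimic target=job.run: a callable the child invokes after start()."""
    return job.run


def start_child(job):
    """Start a child process that runs job.run and return its exit code."""
    child = Process(target=right_target(job))
    child.start()
    child.join()
    return child.exitcode
```

With target=None the child process starts and exits without running anything, which would be consistent with the output seen in the earlier attempts: every line printed there came from code executing in the parent.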

0

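You have to use Manager-wrapped queues to propagate changes across processes; otherwise each process has its own separate queue object and cannot see the others'. The Manager creates a shared instance of the queue for all child processes.

So queue1 = Queue() becomes queue1 = manager.Queue(), with from multiprocessing import Manager at the top. If you want to use the two-queue approach, you obviously have to wrap the second queue the same way.

Relevant resources:

Multiple queues from one multiprocessing Manager

Python documentation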


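In attempts 4 and 5 I tried manager.Queue() as suggested by @adq, but the results are the same as in attempts 2 and 3. I have updated my question with the code and results of attempts 4 and 5. - parovelb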
