我有一个使用Python实现的类似于责任链的应用程序。有一个进程通过multiprocessing.Queue()将对象传递给其他进程,这些进程然后对这些对象执行操作。对于传递的对象来说,跟踪其最后修改时间也很重要,因此只有在对象被修改时才会采取措施。
我遇到的问题是,在从队列中提取对象后,对象中的_modified属性似乎会随机更改。但_mtime属性始终正确。下面的示例将运行,并(有意)随机修改DummyObject,然后将其放在队列中以供每个处理程序使用。然后,每个处理程序将打印它们收到的对象中的_modified和_mtime值。我希望在command_func和handler函数中都能看到相同的_modified值,但实际情况通常并非如此。如果我从DummyObject中删除Object_w_mtime继承关系,则不会看到发送和接收对象之间的任何区别。
我对Python比较陌生。据我所知,每次将对象放置在队列中时,它都会被pickle,然后通过管道发送到接收进程进行unpickle。这是正确的吗?在对象被pickled/unpickled时,是否有可能混淆对象继承关系?
我在Ubuntu 11.10上测试了Python 2.7.2和2.6.7,以及在Ubuntu 11.04上测试了Python 2.7.1。有时需要让它运行一分钟左右才能看到行为,因为它似乎是随机的。
在此感谢您的帮助!
我遇到的问题是,在从队列中提取对象后,对象中的_modified属性似乎会随机更改。但_mtime属性始终正确。下面的示例将运行,并(有意)随机修改DummyObject,然后将其放在队列中以供每个处理程序使用。然后,每个处理程序将打印它们收到的对象中的_modified和_mtime值。我希望在command_func和handler函数中都能看到相同的_modified值,但实际情况通常并非如此。如果我从DummyObject中删除Object_w_mtime继承关系,则不会看到发送和接收对象之间的任何区别。
我对Python比较陌生。据我所知,每次将对象放置在队列中时,它都会被pickle,然后通过管道发送到接收进程进行unpickle。这是正确的吗?在对象被pickled/unpickled时,是否有可能混淆对象继承关系?
我在Ubuntu 11.10上测试了Python 2.7.2和2.6.7,以及在Ubuntu 11.04上测试了Python 2.7.1。有时需要让它运行一分钟左右才能看到行为,因为它似乎是随机的。
在此感谢您的帮助!
import multiprocessing
import time
import traceback
import os
import random
class Object_w_mtime(object):
'''
Parent object that tracks the last time an attribute was modified
'''
def __setattr__(self,a_name,a_value):
if ((a_name not in ('_mtime','_modified')) and
(a_value != getattr(self,a_name,None))
):
object.__setattr__(self, '_modified', True)
object.__setattr__(self, '_mtime', time.time())
object.__setattr__(self, a_name, a_value)
return True
#END def
def reset(self):
self._modified = False
#END class
class DummyObject(Object_w_mtime):
def __init__(self):
self.value = 10
def handler(in_queue = None, handler_id = None):
print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id)
while True:
try:
obj = in_queue.get(True,61)
print 'handler{} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime
except multiprocessing.queues.Empty:
break
except KeyboardInterrupt:
break
except Exception as e:
print traceback.format_exc()
break
return True
#END def
def command_func(next_links = None):
print 'PID:' + str(os.getpid()) + ':command_func:<RUN>'
obj = DummyObject()
while True:
try:
# randomly assign a different value to test with a modified and unmodified object
obj.value = random.randint(0,1)
print '**************** obj.value = {0} ***************'.format(obj.value)
print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime
for each in next_links:
each.put(obj,False)
except multiprocessing.queues.Empty:
break
except KeyboardInterrupt:
break
except Exception as e:
print e
print traceback.format_exc()
break
obj.reset()
time.sleep(3)
return True
#END def
if __name__ == '__main__':
handler_queues = list()
handler_processes = list()
# Create a queue and process object for each command handler
for handler_id in range(1,4):
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=handler, args=(queue, handler_id))
handler_queues.append(queue)
handler_processes.append(process)
try:
# spawn handler processes
for process in handler_processes:
process.start()
# Start sending commands to handlers
command_func(handler_queues)
# exit on keyboard interrupt
except KeyboardInterrupt:
for process in handler_processes:
process.join()
except Exception:
traceback.print_exc()