I have an application that implements something like a Chain of Responsibility in Python. One process passes objects via multiprocessing.Queue() to other processes, which then perform actions on those objects. The last-modified time of each passed object also needs to be tracked, so that action is taken only when the object has actually been modified.
The problem I am experiencing is that the _modified attribute of the object appears to change randomly after it is extracted from the queue, whereas the _mtime attribute is always correct. The example below (intentionally) modifies the DummyObject at random, then places it on the Queue of each handler process. Each handler then prints the _modified and _mtime values it received in the object. I expect the _modified value to be the same in command_func and in the handler functions; however, that is usually not the case. If I remove the Object_w_mtime inheritance from DummyObject, I do not see any differences between the sent and received objects.
I'm relatively new to Python. To the best of my knowledge, each time an object is placed on a queue it is pickled, then sent over a pipe to the receiving process, which unpickles it. Is that correct? Is there any way the object's inheritance could be messed up when the object is pickled and unpickled?
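As a sanity check on the pickling assumption, here is a minimal single-process sketch that round-trips an object through pickle by hand, with no queue involved. TrackedBase and Sample are hypothetical stand-ins for Object_w_mtime and DummyObject from the example further down. Calling pickle.dumps and pickle.loads directly is only my assumption about what the queue does internally, and it says nothing about when the queue actually performs the pickling.

```python
import pickle
import time


class TrackedBase(object):
    """Hypothetical stand-in for Object_w_mtime: tracks attribute changes."""

    def __setattr__(self, name, value):
        # Flag a modification only when a regular attribute actually changes.
        if name not in ('_mtime', '_modified') and value != getattr(self, name, None):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, name, value)

    def reset(self):
        self._modified = False


class Sample(TrackedBase):
    """Hypothetical stand-in for DummyObject."""

    def __init__(self):
        self.value = 10


def round_trip(obj):
    # Serialize and immediately deserialize in the same process.
    return pickle.loads(pickle.dumps(obj))


original = Sample()                   # __init__ sets value, so _modified is True
modified_copy = round_trip(original)  # snapshot taken while _modified is True

original.reset()                      # clear the flag on the original only
reset_copy = round_trip(original)     # snapshot taken after the reset

print('modified_copy._modified = {0}'.format(modified_copy._modified))
print('reset_copy._modified    = {0}'.format(reset_copy._modified))
print('same _mtime             = {0}'.format(reset_copy._mtime == original._mtime))
```

If both copies report the flag that was set at the moment of pickling, the inheritance itself is probably not what gets lost. Each copy reflects the object's state at the instant pickle.dumps ran, not at the instant the object was created or handed off.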
I tested this with Python 2.7.2 and 2.6.7 on Ubuntu 11.10, as well as Python 2.7.1 on Ubuntu 11.04. Sometimes you have to let it run for a minute or so to see the behavior, as it appears to be random.
Grasping at straws here, thanks in advance.
import multiprocessing
import time
import traceback
import os
import random

class Object_w_mtime(object):
    '''
    Parent object that tracks the last time an attribute was modified
    '''
    def __setattr__(self,a_name,a_value):
        # Only a real change to a regular attribute counts as a modification
        if ((a_name not in ('_mtime','_modified')) and
            (a_value != getattr(self,a_name,None))
           ):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, a_name, a_value)
        return True
    #END def

    def reset(self):
        self._modified = False
#END class

class DummyObject(Object_w_mtime):
    def __init__(self):
        self.value = 10

def handler(in_queue = None, handler_id = None):
    print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id)
    while True:
        try:
            # Wait up to 61 seconds for the next object
            obj = in_queue.get(True,61)
            # '{0}' rather than '{}' so the format string also works on Python 2.6
            print 'handler{0} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print traceback.format_exc()
            break
    return True
#END def

def command_func(next_links = None):
    print 'PID:' + str(os.getpid()) + ':command_func:<RUN>'
    obj = DummyObject()
    while True:
        try:
            # randomly assign a different value to test with a modified and unmodified object
            obj.value = random.randint(0,1)
            print '**************** obj.value = {0} ***************'.format(obj.value)
            print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime
            for each in next_links:
                each.put(obj,False)
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print e
            print traceback.format_exc()
            break
        # Clear the flag right after queuing, then wait before the next round
        obj.reset()
        time.sleep(3)
    return True
#END def

if __name__ == '__main__':
    handler_queues = list()
    handler_processes = list()
    # Create a queue and process object for each command handler
    for handler_id in range(1,4):
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=handler, args=(queue, handler_id))
        handler_queues.append(queue)
        handler_processes.append(process)
    try:
        # spawn handler processes
        for process in handler_processes:
            process.start()
        # Start sending commands to handlers
        command_func(handler_queues)
    # exit on keyboard interrupt
    except KeyboardInterrupt:
        for process in handler_processes:
            process.join()
    except Exception:
        traceback.print_exc()