From bfc93662fbdcbcd4a6a414c46c838edce793cd69 Mon Sep 17 00:00:00 2001 From: lxnay Date: Mon, 26 Jan 2009 09:59:10 +0000 Subject: [PATCH] Entropy/SystemManagerServerInterface: - reduce complexity of the queue processor as first step to debug multi-user threading issues on the daemon git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@2942 cd1c1023-2f26-0410-ae45-c471fc1f0318 --- libraries/entropy.py | 46 ++++++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/libraries/entropy.py b/libraries/entropy.py index b71bdf44d..2f3ad2b6b 100644 --- a/libraries/entropy.py +++ b/libraries/entropy.py @@ -22725,8 +22725,6 @@ class SystemManagerExecutorServerRepositoryInterface: import pickle self.pickle = pickle - import entropyTools - self.entropyTools = entropyTools self.SystemManagerExecutor = SystemManagerExecutorInstance self.args = args self.kwargs = kwargs @@ -25278,7 +25276,6 @@ class SystemManagerServerInterface(SocketHostInterface): self.handle_executor_command_classes_initialization() self.QueueProcessor = None - self.QueueProcessorParallel = None self.QueueLock = self.threading.Lock() self.PinboardLock = self.threading.Lock() self.do_ssl = do_ssl @@ -25318,7 +25315,7 @@ class SystemManagerServerInterface(SocketHostInterface): ) self.stdout_logging = stdout_logging self.fork_requests = fork_requests - self.load_queue_processors() + self.load_queue_processor() # here we can put anything that must be loaded before the queue processor execution self.play_queue() @@ -25397,11 +25394,9 @@ class SystemManagerServerInterface(SocketHostInterface): with self.PinboardLock: return self.PinboardData.copy() - def load_queue_processors(self): - self.QueueProcessor = self.entropyTools.parallelTask(self.queue_processor, False) + def load_queue_processor(self): + self.QueueProcessor = self.entropyTools.parallelTask(self.queue_processor) self.QueueProcessor.start() - self.QueueProcessorParallel = self.entropyTools.parallelTask(self.queue_processor, True) - self.QueueProcessorParallel.start() def get_stored_queue(self): return self.dumpTools.loadobj(self.queue_file) @@ -25570,14 +25565,14 @@ class SystemManagerServerInterface(SocketHostInterface): return self.ManagerQueue['queue'].pop(queue_id), self.ManagerQueue['queue_order'].pop(idx) return None, None - def queue_processor(self, parallel_mode, fork_data = None): + def _queue_copy_obj(self, obj): + if isinstance(obj,(dict,set,frozenset,)): + return obj.copy() + elif isinstance(obj,(list,tuple,)): + return obj[:] + return obj - def copy_obj(obj): - if isinstance(obj,(dict,set,frozenset,)): - return obj.copy() - elif isinstance(obj,(list,tuple,)): - return obj[:] - return obj + def queue_processor(self, fork_data = None): while 1: @@ -25601,22 +25596,21 @@ class SystemManagerServerInterface(SocketHostInterface): continue with self.QueueLock: - command_data = copy_obj(command_data) + command_data = self._queue_copy_obj(command_data) command_data['processing_ts'] = "%s" % (self.get_ts(),) self.ManagerQueue['processing'][queue_id] = command_data self.ManagerQueue['processing_order'].append(queue_id) self.store_queue() - self.remove_queue_ext_rc(queue_id) + # self.remove_queue_ext_rc(queue_id) try: - if parallel_mode: - t = self.entropyTools.parallelTask(self.queue_processor, False, fork_data = (command_data, queue_id,)) + if command_data.get('do_parallel') and not fork_data: + t = self.entropyTools.parallelTask(self.queue_processor, fork_data = (command_data, queue_id,)) t.start() continue - else: - done, result = self.SystemExecutor.execute_task(command_data) + done, result = self.SystemExecutor.execute_task(command_data) except Exception, e: - if self.QueueLock.locked(): + if self.QueueLock.locked() and not fork_data: self.QueueLock.release() self.entropyTools.printTraceback() done = False @@ -25626,14 +25620,14 @@ class SystemManagerServerInterface(SocketHostInterface): if command_data.has_key('extended_result') and done: try: - command_data['result'], extended_result = copy_obj(result) + command_data['result'], extended_result = self._queue_copy_obj(result) self.store_queue_ext_rc(queue_id, extended_result) except TypeError: done = False command_data['result'] = 'wrong tuple split from queue processor (1)' self.store_queue_ext_rc(queue_id, None) else: - command_data['result'] = copy_obj(result) + command_data['result'] = self._queue_copy_obj(result) if not done: try: @@ -25682,7 +25676,7 @@ class SystemManagerServerInterface(SocketHostInterface): if fork_data: break except: - if self.QueueLock.locked(): + if self.QueueLock.locked() and not fork_data: self.QueueLock.release() raise @@ -25692,8 +25686,6 @@ class SystemManagerServerInterface(SocketHostInterface): SocketHostInterface.killall(self) if self.QueueProcessor != None: self.QueueProcessor.kill() - if self.QueueProcessorParallel != None: - self.QueueProcessorParallel.kill() class SystemManagerClientCommands(EntropySocketClientCommands):