Entropy/SystemManagerServerInterface:
- reduce complexity of the queue processor as first step to debug multi-user threading issues on the daemon git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@2942 cd1c1023-2f26-0410-ae45-c471fc1f0318
This commit is contained in:
+19
-27
@@ -22725,8 +22725,6 @@ class SystemManagerExecutorServerRepositoryInterface:
|
||||
import pickle
|
||||
self.pickle = pickle
|
||||
|
||||
import entropyTools
|
||||
self.entropyTools = entropyTools
|
||||
self.SystemManagerExecutor = SystemManagerExecutorInstance
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
@@ -25278,7 +25276,6 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
self.handle_executor_command_classes_initialization()
|
||||
|
||||
self.QueueProcessor = None
|
||||
self.QueueProcessorParallel = None
|
||||
self.QueueLock = self.threading.Lock()
|
||||
self.PinboardLock = self.threading.Lock()
|
||||
self.do_ssl = do_ssl
|
||||
@@ -25318,7 +25315,7 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
)
|
||||
self.stdout_logging = stdout_logging
|
||||
self.fork_requests = fork_requests
|
||||
self.load_queue_processors()
|
||||
self.load_queue_processor()
|
||||
# here we can put anything that must be loaded before the queue processor execution
|
||||
self.play_queue()
|
||||
|
||||
@@ -25397,11 +25394,9 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
with self.PinboardLock:
|
||||
return self.PinboardData.copy()
|
||||
|
||||
def load_queue_processors(self):
|
||||
self.QueueProcessor = self.entropyTools.parallelTask(self.queue_processor, False)
|
||||
def load_queue_processor(self):
|
||||
self.QueueProcessor = self.entropyTools.parallelTask(self.queue_processor)
|
||||
self.QueueProcessor.start()
|
||||
self.QueueProcessorParallel = self.entropyTools.parallelTask(self.queue_processor, True)
|
||||
self.QueueProcessorParallel.start()
|
||||
|
||||
def get_stored_queue(self):
|
||||
return self.dumpTools.loadobj(self.queue_file)
|
||||
@@ -25570,14 +25565,14 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
return self.ManagerQueue['queue'].pop(queue_id), self.ManagerQueue['queue_order'].pop(idx)
|
||||
return None, None
|
||||
|
||||
def queue_processor(self, parallel_mode, fork_data = None):
|
||||
def _queue_copy_obj(self, obj):
|
||||
if isinstance(obj,(dict,set,frozenset,)):
|
||||
return obj.copy()
|
||||
elif isinstance(obj,(list,tuple,)):
|
||||
return obj[:]
|
||||
return obj
|
||||
|
||||
def copy_obj(obj):
|
||||
if isinstance(obj,(dict,set,frozenset,)):
|
||||
return obj.copy()
|
||||
elif isinstance(obj,(list,tuple,)):
|
||||
return obj[:]
|
||||
return obj
|
||||
def queue_processor(self, fork_data = None):
|
||||
|
||||
while 1:
|
||||
|
||||
@@ -25601,22 +25596,21 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
continue
|
||||
|
||||
with self.QueueLock:
|
||||
command_data = copy_obj(command_data)
|
||||
command_data = self._queue_copy_obj(command_data)
|
||||
command_data['processing_ts'] = "%s" % (self.get_ts(),)
|
||||
self.ManagerQueue['processing'][queue_id] = command_data
|
||||
self.ManagerQueue['processing_order'].append(queue_id)
|
||||
self.store_queue()
|
||||
|
||||
self.remove_queue_ext_rc(queue_id)
|
||||
# self.remove_queue_ext_rc(queue_id)
|
||||
try:
|
||||
if parallel_mode:
|
||||
t = self.entropyTools.parallelTask(self.queue_processor, False, fork_data = (command_data, queue_id,))
|
||||
if command_data.get('do_parallel') and not fork_data:
|
||||
t = self.entropyTools.parallelTask(self.queue_processor, fork_data = (command_data, queue_id,))
|
||||
t.start()
|
||||
continue
|
||||
else:
|
||||
done, result = self.SystemExecutor.execute_task(command_data)
|
||||
done, result = self.SystemExecutor.execute_task(command_data)
|
||||
except Exception, e:
|
||||
if self.QueueLock.locked():
|
||||
if self.QueueLock.locked() and not fork_data:
|
||||
self.QueueLock.release()
|
||||
self.entropyTools.printTraceback()
|
||||
done = False
|
||||
@@ -25626,14 +25620,14 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
|
||||
if command_data.has_key('extended_result') and done:
|
||||
try:
|
||||
command_data['result'], extended_result = copy_obj(result)
|
||||
command_data['result'], extended_result = self._queue_copy_obj(result)
|
||||
self.store_queue_ext_rc(queue_id, extended_result)
|
||||
except TypeError:
|
||||
done = False
|
||||
command_data['result'] = 'wrong tuple split from queue processor (1)'
|
||||
self.store_queue_ext_rc(queue_id, None)
|
||||
else:
|
||||
command_data['result'] = copy_obj(result)
|
||||
command_data['result'] = self._queue_copy_obj(result)
|
||||
|
||||
if not done:
|
||||
try:
|
||||
@@ -25682,7 +25676,7 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
if fork_data: break
|
||||
|
||||
except:
|
||||
if self.QueueLock.locked():
|
||||
if self.QueueLock.locked() and not fork_data:
|
||||
self.QueueLock.release()
|
||||
raise
|
||||
|
||||
@@ -25692,8 +25686,6 @@ class SystemManagerServerInterface(SocketHostInterface):
|
||||
SocketHostInterface.killall(self)
|
||||
if self.QueueProcessor != None:
|
||||
self.QueueProcessor.kill()
|
||||
if self.QueueProcessorParallel != None:
|
||||
self.QueueProcessorParallel.kill()
|
||||
|
||||
|
||||
class SystemManagerClientCommands(EntropySocketClientCommands):
|
||||
|
||||
Reference in New Issue
Block a user