diff --git a/libraries/entropy.py b/libraries/entropy.py index d1bffbbfd..fd77e85f7 100644 --- a/libraries/entropy.py +++ b/libraries/entropy.py @@ -13904,10 +13904,15 @@ class SocketHostInterface: # not using spawnFunction because it causes some mess # forking this way avoids having memory leaks if self.server.processor.HostInterface.fork_requests: + l = None + if hasattr(self.server.processor.HostInterface,'ForkLock'): + x = getattr(self.server.processor.HostInterface,'ForkLock') + if hasattr(x,'acquire') and hasattr(x,'release'): + l = x; l.acquire() my_timeout = self.server.processor.HostInterface.fork_request_timeout_seconds pid = os.fork() seconds = 0 - if pid > 0: + if pid > 0: # parent here # pid killer after timeout passed_away = False while 1: @@ -13935,6 +13940,7 @@ class SocketHostInterface: passed_away = True # in this way, the process table should be clean continue break + if l != None: l.release() else: self.do_handle() os._exit(0) @@ -24968,23 +24974,21 @@ class SystemManagerServerInterface(SocketHostInterface): def docmd_get_queue(self, myargs): - with self.HostInterface.QueueLock: - self.HostInterface.load_queue() - myqueue = copy.deepcopy(self.HostInterface.ManagerQueue) + myqueue = copy.deepcopy(self.HostInterface.ManagerQueue) - extended = False - if myargs: - extended = myargs[0] + extended = False + if myargs: + extended = myargs[0] - if not extended: - for key in self.HostInterface.done_queue_keys: - for queue_id in myqueue.get(key): - item = myqueue[key].get(queue_id) - if not item.has_key('extended_result'): - continue - item['extended_result'] = None + if not extended: + for key in self.HostInterface.done_queue_keys: + for queue_id in myqueue.get(key): + item = myqueue[key].get(queue_id) + if not item.has_key('extended_result'): + continue + item['extended_result'] = None - return True, myqueue + return True, myqueue def docmd_get_queue_item_by_id(self, myargs): @@ -25268,6 +25272,7 @@ class SystemManagerServerInterface(SocketHostInterface): self.QueueProcessor = None self.QueueLock = self.threading.Lock() self.PinboardLock = self.threading.Lock() + self.ForkLock = self.threading.Lock() self.do_ssl = do_ssl self.PinboardData = {} @@ -25416,23 +25421,22 @@ class SystemManagerServerInterface(SocketHostInterface): return self.datetime.fromtimestamp(time.time()) def swap_items_in_queue(self, queue_id1, queue_id2): - with self.QueueLock: - self.load_queue() - item1, key1 = self._get_item_by_queue_id(queue_id1) - item2, key2 = self._get_item_by_queue_id(queue_id2) - if key1 != key2: - return False - t_item = item1.copy() - item1.clear() - item1.update(item2) - item2.clear() - item2.update(t_item) - # fix the _order - queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1) - queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2) - self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2 - self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1 - self.store_queue() + self.load_queue() + item1, key1 = self._get_item_by_queue_id(queue_id1) + item2, key2 = self._get_item_by_queue_id(queue_id2) + if key1 != key2: + return False + t_item = item1.copy() + item1.clear() + item1.update(item2) + item2.clear() + item2.update(t_item) + # fix the _order + queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1) + queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2) + self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2 + self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1 + self.store_queue() return True @@ -25441,76 +25445,71 @@ class SystemManagerServerInterface(SocketHostInterface): if function not in self.SystemExecutor.available_commands: return -1 - with self.QueueLock: - self.load_queue() - queue_id = self.generate_unique_queue_id() - if interactive: - self.ManagerQueueStdInOut[queue_id] = os.pipe() - myqueue_dict = { - 'queue_id': queue_id, - 'command_name': command_name, - 'command_desc': self.valid_commands[command_name]['desc'], - 'command_text': command_text, - 'call': function, - 'args': copy.deepcopy(args), - 'kwargs': copy.deepcopy(kwargs), - 'user_id': user_id, - 'group_id': group_id, - 'stdout': self.assign_unique_stdout_file(queue_id), - 'queue_ts': "%s" % (self.get_ts(),), - 'kill': False, - 'processing_pid': None, - 'do_parallel': do_parallel, - 'interactive': False, - } - if extended_result: - myqueue_dict['extended_result'] = None - self.ManagerQueue['queue'][queue_id] = myqueue_dict - self.ManagerQueue['queue_order'].append(queue_id) - self.store_queue() + self.load_queue() + queue_id = self.generate_unique_queue_id() + if interactive: + self.ManagerQueueStdInOut[queue_id] = os.pipe() + myqueue_dict = { + 'queue_id': queue_id, + 'command_name': command_name, + 'command_desc': self.valid_commands[command_name]['desc'], + 'command_text': command_text, + 'call': function, + 'args': copy.deepcopy(args), + 'kwargs': copy.deepcopy(kwargs), + 'user_id': user_id, + 'group_id': group_id, + 'stdout': self.assign_unique_stdout_file(queue_id), + 'queue_ts': "%s" % (self.get_ts(),), + 'kill': False, + 'processing_pid': None, + 'do_parallel': do_parallel, + 'interactive': False, + } + if extended_result: + myqueue_dict['extended_result'] = None + self.ManagerQueue['queue'][queue_id] = myqueue_dict + self.ManagerQueue['queue_order'].append(queue_id) + self.store_queue() return queue_id def remove_from_queue(self, queue_ids): - with self.QueueLock: - self.load_queue() - removed = False - for key in self.ManagerQueue: - if key not in self.dict_queue_keys: + self.load_queue() + removed = False + for key in self.ManagerQueue: + if key not in self.dict_queue_keys: + continue + for queue_id in queue_ids: + item = None + try: + item = self.ManagerQueue[key].pop(queue_id) + except KeyError: continue - for queue_id in queue_ids: - item = None - try: - item = self.ManagerQueue[key].pop(queue_id) - except KeyError: - continue - if item: - self.flush_item(item, queue_id) - if queue_id in self.ManagerQueue[key+"_order"]: - self.ManagerQueue[key+"_order"].remove(queue_id) - removed = True - self.remove_queue_ext_rc(queue_id) - if removed: self.store_queue() - return removed + if item: + self.flush_item(item, queue_id) + if queue_id in self.ManagerQueue[key+"_order"]: + self.ManagerQueue[key+"_order"].remove(queue_id) + removed = True + self.remove_queue_ext_rc(queue_id) + if removed: self.store_queue() + return removed def kill_processing_queue_id(self, queue_id): - with self.QueueLock: - self.load_queue() - item, key = self._get_item_by_queue_id(queue_id) - if key in self.processing_queue_keys: - item['kill'] = True - self.store_queue() + self.load_queue() + item, key = self._get_item_by_queue_id(queue_id) + if key in self.processing_queue_keys: + item['kill'] = True + self.store_queue() def pause_queue(self): - with self.QueueLock: - self.load_queue() - self.ManagerQueue['pause'] = True - self.store_queue() + self.load_queue() + self.ManagerQueue['pause'] = True + self.store_queue() def play_queue(self): - with self.QueueLock: - self.load_queue() - self.ManagerQueue['pause'] = False - self.store_queue() + self.load_queue() + self.ManagerQueue['pause'] = False + self.store_queue() def flush_item(self, item, queue_id): if not isinstance(item,dict): @@ -25549,11 +25548,10 @@ class SystemManagerServerInterface(SocketHostInterface): return queue_id def get_item_by_queue_id(self, queue_id, copy = False): - with self.QueueLock: - self.load_queue() - item, key = self._get_item_by_queue_id(queue_id) - if copy: item = self._queue_copy_obj(item) - return item, key + self.load_queue() + item, key = self._get_item_by_queue_id(queue_id) + if copy: item = self._queue_copy_obj(item) + return item, key def _get_item_by_queue_id(self, queue_id): for key in self.dict_queue_keys: @@ -25589,20 +25587,25 @@ class SystemManagerServerInterface(SocketHostInterface): def _queue_processor(self, fork_data): - with self.QueueLock: - if fork_data: - command_data, queue_id = self._queue_copy_obj(fork_data) - else: - self.load_queue() - if self.ManagerQueue['pause']: return - if not self.ManagerQueue['queue_order']: return - command_data, queue_id = self._pop_item_from_queue() - if not command_data: return - command_data = self._queue_copy_obj(command_data) - command_data['processing_ts'] = "%s" % (self.get_ts(),) - self.ManagerQueue['processing'][queue_id] = command_data - self.ManagerQueue['processing_order'].append(queue_id) - self.store_queue() + # queue processing is stopped until there's a process running + if self.ForkLock.locked(): return + + with self.ForkLock: + with self.QueueLock: + + if fork_data: + command_data, queue_id = self._queue_copy_obj(fork_data) + else: + self.load_queue() + if self.ManagerQueue['pause']: return + if not self.ManagerQueue['queue_order']: return + command_data, queue_id = self._pop_item_from_queue() + if not command_data: return + command_data = self._queue_copy_obj(command_data) + command_data['processing_ts'] = "%s" % (self.get_ts(),) + self.ManagerQueue['processing'][queue_id] = command_data + self.ManagerQueue['processing_order'].append(queue_id) + self.store_queue() self.remove_queue_ext_rc(queue_id) try: @@ -25628,52 +25631,53 @@ class SystemManagerServerInterface(SocketHostInterface): else: command_data['result'] = self._queue_copy_obj(result) - with self.QueueLock: + with self.ForkLock: + with self.QueueLock: - self.load_queue() + self.load_queue() + + if not done: + try: + self.ManagerQueue['processing'].pop(queue_id) + except KeyError: + pass + if queue_id in self.ManagerQueue['processing_order']: + self.ManagerQueue['processing_order'].remove(queue_id) + command_data['errored_ts'] = "%s" % (self.get_ts(),) + self.ManagerQueue['errored'][queue_id] = command_data + self.ManagerQueue['errored_order'].append(queue_id) + self.store_queue() + return + + try: + done, cmd_result = result + except TypeError: + done = False + command_data['result'] = 'wrong tuple split from queue processor (2)' + + if not done: + try: + self.ManagerQueue['processing'].pop(queue_id) + except KeyError: + pass + if queue_id in self.ManagerQueue['processing_order']: + self.ManagerQueue['processing_order'].remove(queue_id) + command_data['errored_ts'] = "%s" % (self.get_ts(),) + self.ManagerQueue['errored'][queue_id] = command_data + self.ManagerQueue['errored_order'].append(queue_id) + self.store_queue() + return - if not done: try: self.ManagerQueue['processing'].pop(queue_id) except KeyError: pass if queue_id in self.ManagerQueue['processing_order']: self.ManagerQueue['processing_order'].remove(queue_id) - command_data['errored_ts'] = "%s" % (self.get_ts(),) - self.ManagerQueue['errored'][queue_id] = command_data - self.ManagerQueue['errored_order'].append(queue_id) + command_data['processed_ts'] = "%s" % (self.get_ts(),) + self.ManagerQueue['processed'][queue_id] = command_data + self.ManagerQueue['processed_order'].append(queue_id) self.store_queue() - return - - try: - done, cmd_result = result - except TypeError: - done = False - command_data['result'] = 'wrong tuple split from queue processor (2)' - - if not done: - try: - self.ManagerQueue['processing'].pop(queue_id) - except KeyError: - pass - if queue_id in self.ManagerQueue['processing_order']: - self.ManagerQueue['processing_order'].remove(queue_id) - command_data['errored_ts'] = "%s" % (self.get_ts(),) - self.ManagerQueue['errored'][queue_id] = command_data - self.ManagerQueue['errored_order'].append(queue_id) - self.store_queue() - return - - try: - self.ManagerQueue['processing'].pop(queue_id) - except KeyError: - pass - if queue_id in self.ManagerQueue['processing_order']: - self.ManagerQueue['processing_order'].remove(queue_id) - command_data['processed_ts'] = "%s" % (self.get_ts(),) - self.ManagerQueue['processed'][queue_id] = command_data - self.ManagerQueue['processed_order'].append(queue_id) - self.store_queue() def killall(self):