Entropy/SocketHostInterface:

- if HostInterface has ForkLock attribute that's a thread lock, use it when running in fork mode
Entropy/SystemManagerServerInterface:
- test a solution featuring a Fork Lock


git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@2947 cd1c1023-2f26-0410-ae45-c471fc1f0318
This commit is contained in:
lxnay
2009-01-26 13:17:08 +00:00
parent 13f77c64e1
commit 88f539e7e5

View File

@@ -13904,10 +13904,15 @@ class SocketHostInterface:
# not using spawnFunction because it causes some mess
# forking this way avoids having memory leaks
if self.server.processor.HostInterface.fork_requests:
l = None
if hasattr(self.server.processor.HostInterface,'ForkLock'):
x = getattr(self.server.processor.HostInterface,'ForkLock')
if hasattr(x,'acquire') and hasattr(x,'release'):
l = x; l.acquire()
my_timeout = self.server.processor.HostInterface.fork_request_timeout_seconds
pid = os.fork()
seconds = 0
if pid > 0:
if pid > 0: # parent here
# pid killer after timeout
passed_away = False
while 1:
@@ -13935,6 +13940,7 @@ class SocketHostInterface:
passed_away = True # in this way, the process table should be clean
continue
break
if l != None: l.release()
else:
self.do_handle()
os._exit(0)
@@ -24968,23 +24974,21 @@ class SystemManagerServerInterface(SocketHostInterface):
def docmd_get_queue(self, myargs):
with self.HostInterface.QueueLock:
self.HostInterface.load_queue()
myqueue = copy.deepcopy(self.HostInterface.ManagerQueue)
myqueue = copy.deepcopy(self.HostInterface.ManagerQueue)
extended = False
if myargs:
extended = myargs[0]
extended = False
if myargs:
extended = myargs[0]
if not extended:
for key in self.HostInterface.done_queue_keys:
for queue_id in myqueue.get(key):
item = myqueue[key].get(queue_id)
if not item.has_key('extended_result'):
continue
item['extended_result'] = None
if not extended:
for key in self.HostInterface.done_queue_keys:
for queue_id in myqueue.get(key):
item = myqueue[key].get(queue_id)
if not item.has_key('extended_result'):
continue
item['extended_result'] = None
return True, myqueue
return True, myqueue
def docmd_get_queue_item_by_id(self, myargs):
@@ -25268,6 +25272,7 @@ class SystemManagerServerInterface(SocketHostInterface):
self.QueueProcessor = None
self.QueueLock = self.threading.Lock()
self.PinboardLock = self.threading.Lock()
self.ForkLock = self.threading.Lock()
self.do_ssl = do_ssl
self.PinboardData = {}
@@ -25416,23 +25421,22 @@ class SystemManagerServerInterface(SocketHostInterface):
return self.datetime.fromtimestamp(time.time())
def swap_items_in_queue(self, queue_id1, queue_id2):
with self.QueueLock:
self.load_queue()
item1, key1 = self._get_item_by_queue_id(queue_id1)
item2, key2 = self._get_item_by_queue_id(queue_id2)
if key1 != key2:
return False
t_item = item1.copy()
item1.clear()
item1.update(item2)
item2.clear()
item2.update(t_item)
# fix the _order
queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1)
queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2)
self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2
self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1
self.store_queue()
self.load_queue()
item1, key1 = self._get_item_by_queue_id(queue_id1)
item2, key2 = self._get_item_by_queue_id(queue_id2)
if key1 != key2:
return False
t_item = item1.copy()
item1.clear()
item1.update(item2)
item2.clear()
item2.update(t_item)
# fix the _order
queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1)
queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2)
self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2
self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1
self.store_queue()
return True
@@ -25441,76 +25445,71 @@ class SystemManagerServerInterface(SocketHostInterface):
if function not in self.SystemExecutor.available_commands:
return -1
with self.QueueLock:
self.load_queue()
queue_id = self.generate_unique_queue_id()
if interactive:
self.ManagerQueueStdInOut[queue_id] = os.pipe()
myqueue_dict = {
'queue_id': queue_id,
'command_name': command_name,
'command_desc': self.valid_commands[command_name]['desc'],
'command_text': command_text,
'call': function,
'args': copy.deepcopy(args),
'kwargs': copy.deepcopy(kwargs),
'user_id': user_id,
'group_id': group_id,
'stdout': self.assign_unique_stdout_file(queue_id),
'queue_ts': "%s" % (self.get_ts(),),
'kill': False,
'processing_pid': None,
'do_parallel': do_parallel,
'interactive': False,
}
if extended_result:
myqueue_dict['extended_result'] = None
self.ManagerQueue['queue'][queue_id] = myqueue_dict
self.ManagerQueue['queue_order'].append(queue_id)
self.store_queue()
self.load_queue()
queue_id = self.generate_unique_queue_id()
if interactive:
self.ManagerQueueStdInOut[queue_id] = os.pipe()
myqueue_dict = {
'queue_id': queue_id,
'command_name': command_name,
'command_desc': self.valid_commands[command_name]['desc'],
'command_text': command_text,
'call': function,
'args': copy.deepcopy(args),
'kwargs': copy.deepcopy(kwargs),
'user_id': user_id,
'group_id': group_id,
'stdout': self.assign_unique_stdout_file(queue_id),
'queue_ts': "%s" % (self.get_ts(),),
'kill': False,
'processing_pid': None,
'do_parallel': do_parallel,
'interactive': False,
}
if extended_result:
myqueue_dict['extended_result'] = None
self.ManagerQueue['queue'][queue_id] = myqueue_dict
self.ManagerQueue['queue_order'].append(queue_id)
self.store_queue()
return queue_id
def remove_from_queue(self, queue_ids):
with self.QueueLock:
self.load_queue()
removed = False
for key in self.ManagerQueue:
if key not in self.dict_queue_keys:
self.load_queue()
removed = False
for key in self.ManagerQueue:
if key not in self.dict_queue_keys:
continue
for queue_id in queue_ids:
item = None
try:
item = self.ManagerQueue[key].pop(queue_id)
except KeyError:
continue
for queue_id in queue_ids:
item = None
try:
item = self.ManagerQueue[key].pop(queue_id)
except KeyError:
continue
if item:
self.flush_item(item, queue_id)
if queue_id in self.ManagerQueue[key+"_order"]:
self.ManagerQueue[key+"_order"].remove(queue_id)
removed = True
self.remove_queue_ext_rc(queue_id)
if removed: self.store_queue()
return removed
if item:
self.flush_item(item, queue_id)
if queue_id in self.ManagerQueue[key+"_order"]:
self.ManagerQueue[key+"_order"].remove(queue_id)
removed = True
self.remove_queue_ext_rc(queue_id)
if removed: self.store_queue()
return removed
def kill_processing_queue_id(self, queue_id):
with self.QueueLock:
self.load_queue()
item, key = self._get_item_by_queue_id(queue_id)
if key in self.processing_queue_keys:
item['kill'] = True
self.store_queue()
self.load_queue()
item, key = self._get_item_by_queue_id(queue_id)
if key in self.processing_queue_keys:
item['kill'] = True
self.store_queue()
def pause_queue(self):
with self.QueueLock:
self.load_queue()
self.ManagerQueue['pause'] = True
self.store_queue()
self.load_queue()
self.ManagerQueue['pause'] = True
self.store_queue()
def play_queue(self):
with self.QueueLock:
self.load_queue()
self.ManagerQueue['pause'] = False
self.store_queue()
self.load_queue()
self.ManagerQueue['pause'] = False
self.store_queue()
def flush_item(self, item, queue_id):
if not isinstance(item,dict):
@@ -25549,11 +25548,10 @@ class SystemManagerServerInterface(SocketHostInterface):
return queue_id
def get_item_by_queue_id(self, queue_id, copy = False):
with self.QueueLock:
self.load_queue()
item, key = self._get_item_by_queue_id(queue_id)
if copy: item = self._queue_copy_obj(item)
return item, key
self.load_queue()
item, key = self._get_item_by_queue_id(queue_id)
if copy: item = self._queue_copy_obj(item)
return item, key
def _get_item_by_queue_id(self, queue_id):
for key in self.dict_queue_keys:
@@ -25589,20 +25587,25 @@ class SystemManagerServerInterface(SocketHostInterface):
def _queue_processor(self, fork_data):
with self.QueueLock:
if fork_data:
command_data, queue_id = self._queue_copy_obj(fork_data)
else:
self.load_queue()
if self.ManagerQueue['pause']: return
if not self.ManagerQueue['queue_order']: return
command_data, queue_id = self._pop_item_from_queue()
if not command_data: return
command_data = self._queue_copy_obj(command_data)
command_data['processing_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['processing'][queue_id] = command_data
self.ManagerQueue['processing_order'].append(queue_id)
self.store_queue()
# queue processing is stopped until there's a process running
if self.ForkLock.locked(): return
with self.ForkLock:
with self.QueueLock:
if fork_data:
command_data, queue_id = self._queue_copy_obj(fork_data)
else:
self.load_queue()
if self.ManagerQueue['pause']: return
if not self.ManagerQueue['queue_order']: return
command_data, queue_id = self._pop_item_from_queue()
if not command_data: return
command_data = self._queue_copy_obj(command_data)
command_data['processing_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['processing'][queue_id] = command_data
self.ManagerQueue['processing_order'].append(queue_id)
self.store_queue()
self.remove_queue_ext_rc(queue_id)
try:
@@ -25628,52 +25631,53 @@ class SystemManagerServerInterface(SocketHostInterface):
else:
command_data['result'] = self._queue_copy_obj(result)
with self.QueueLock:
with self.ForkLock:
with self.QueueLock:
self.load_queue()
self.load_queue()
if not done:
try:
self.ManagerQueue['processing'].pop(queue_id)
except KeyError:
pass
if queue_id in self.ManagerQueue['processing_order']:
self.ManagerQueue['processing_order'].remove(queue_id)
command_data['errored_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['errored'][queue_id] = command_data
self.ManagerQueue['errored_order'].append(queue_id)
self.store_queue()
return
try:
done, cmd_result = result
except TypeError:
done = False
command_data['result'] = 'wrong tuple split from queue processor (2)'
if not done:
try:
self.ManagerQueue['processing'].pop(queue_id)
except KeyError:
pass
if queue_id in self.ManagerQueue['processing_order']:
self.ManagerQueue['processing_order'].remove(queue_id)
command_data['errored_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['errored'][queue_id] = command_data
self.ManagerQueue['errored_order'].append(queue_id)
self.store_queue()
return
if not done:
try:
self.ManagerQueue['processing'].pop(queue_id)
except KeyError:
pass
if queue_id in self.ManagerQueue['processing_order']:
self.ManagerQueue['processing_order'].remove(queue_id)
command_data['errored_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['errored'][queue_id] = command_data
self.ManagerQueue['errored_order'].append(queue_id)
command_data['processed_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['processed'][queue_id] = command_data
self.ManagerQueue['processed_order'].append(queue_id)
self.store_queue()
return
try:
done, cmd_result = result
except TypeError:
done = False
command_data['result'] = 'wrong tuple split from queue processor (2)'
if not done:
try:
self.ManagerQueue['processing'].pop(queue_id)
except KeyError:
pass
if queue_id in self.ManagerQueue['processing_order']:
self.ManagerQueue['processing_order'].remove(queue_id)
command_data['errored_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['errored'][queue_id] = command_data
self.ManagerQueue['errored_order'].append(queue_id)
self.store_queue()
return
try:
self.ManagerQueue['processing'].pop(queue_id)
except KeyError:
pass
if queue_id in self.ManagerQueue['processing_order']:
self.ManagerQueue['processing_order'].remove(queue_id)
command_data['processed_ts'] = "%s" % (self.get_ts(),)
self.ManagerQueue['processed'][queue_id] = command_data
self.ManagerQueue['processed_order'].append(queue_id)
self.store_queue()
def killall(self):