Package entropy :: Package services :: Package system :: Module interfaces

Source Code for Module entropy.services.system.interfaces

  1  # -*- coding: utf-8 -*- 
  2  """ 
  3   
  4      @author: Fabio Erculiani <lxnay@sabayonlinux.org> 
  5      @contact: lxnay@sabayonlinux.org 
  6      @copyright: Fabio Erculiani 
  7      @license: GPL-2 
  8   
  9      B{Entropy Services System Management Interface}. 
 10   
 11  """ 
 12  from __future__ import with_statement 
 13  import time 
 14  import os 
 15  import random 
 16  import subprocess 
 17  from entropy.services.interfaces import SocketHost 
 18  from entropy.const import etpConst, const_setup_perms 
 19  from entropy.output import TextInterface 
 20  from entropy.misc import ParallelTask 
 21   
22 -class TaskExecutor:
23
24 - def __init__(self, SystemInterface, Entropy):
25 import entropy.tools as entropyTools 26 self.entropyTools = entropyTools 27 self.Entropy = Entropy 28 self.SystemInterface = SystemInterface 29 self.available_commands = {} 30 self.task_result = None
31
32 - def register(self, available_commands):
34
35 - def execute_task(self, command_data):
36 37 import signal 38 queue_id = command_data['queue_id'] 39 args = command_data['args'] 40 kwargs = command_data['kwargs'] 41 data = self.available_commands.get(command_data['call']) 42 43 if data == None: 44 return False, 'no command' 45 elif len(args)+1 < data['args']: 46 return False, 'not enough args' 47 48 args.insert(0,queue_id) 49 self.task_result = None 50 t = ParallelTask(data['func'], *args, **kwargs) 51 t.start() 52 killed = False 53 while 1: 54 if not t.isAlive(): break 55 time.sleep(2) 56 live_item, key = self.SystemInterface.get_item_by_queue_id(queue_id) 57 if isinstance(live_item,dict) and (key == "processing") and (not killed): 58 if live_item['kill'] and (live_item['processing_pid'] != None): 59 os.kill(live_item['processing_pid'],signal.SIGKILL) 60 killed = True 61 if killed: 62 return False, 'killed by user' 63 return True, t.get_rc()
64
65 -class Server(SocketHost):
66
68 - def __init__(self, *args, **kwargs):
69 pass
70
72
73 - def __init__(self, SystemManagerExecutorInstance, *args, **kwargs):
74 self.SystemManagerExecutor = SystemManagerExecutorInstance 75 self.available_commands = { 76 'hello_world': { 77 'func': self.hello_world, 78 'args': 0, 79 } 80 }
81
82 - def hello_world(self):
83 rc = subprocess.call('echo hello world', shell = True) 84 return True,rc
85 86 87 queue_file = 'system_manager_queue' 88 pinboard_file = "system_manager_pinboard" 89 queue_ext_rc_dir = "system_manager_rc" 90 STDOUT_STORAGE_DIR = os.path.join(etpConst['dumpstoragedir'],'system_manager_stdout')
91 - def __init__(self, EntropyInterface, do_ssl = False, stdout_logging = True, entropy_interface_kwargs = {}, **kwargs):
92 93 self.queue_loaded = False 94 from entropy.misc import TimeScheduled 95 self.TimeScheduled = TimeScheduled 96 97 import entropy.tools as entropyTools 98 import entropy.dump as dumpTools 99 import threading 100 self.entropyTools, self.dumpTools, self.threading = entropyTools, dumpTools, threading 101 from datetime import datetime 102 self.datetime = datetime 103 import copy 104 self.copy = copy 105 from entropy.services.system.commands import Base 106 self.setup_stdout_storage_dir() 107 108 if not kwargs.has_key('external_cmd_classes'): 109 kwargs['external_cmd_classes'] = [] 110 kwargs['external_cmd_classes'].insert(0,Base) 111 112 self.Entropy = EntropyInterface(**entropy_interface_kwargs) 113 self.Text = TextInterface() 114 self.SystemExecutor = TaskExecutor(self, self.Entropy) 115 116 self.ExecutorCommandClasses = [(self.BuiltInSystemManagerExecutorCommands,[],{},)] 117 self.ExecutorCommandInstances = [] 118 if kwargs.has_key('external_executor_cmd_classes'): 119 self.ExecutorCommandClasses += kwargs.pop('external_executor_cmd_classes') 120 self.handle_executor_command_classes_initialization() 121 122 self.QueueProcessor = None 123 self.QueueLock = self.threading.Lock() 124 self.PinboardLock = self.threading.Lock() 125 self.ForkLock = self.threading.Lock() 126 self.do_ssl = do_ssl 127 128 self.PinboardData = {} 129 self.load_pinboard() 130 131 self.done_queue_keys = ['processed','errored'] 132 self.removable_queue_keys = ['processed','errored','queue'] 133 self.processing_queue_keys = ['processing'] 134 self.dict_queue_keys = ['queue','processing','processed','errored'] 135 self.ManagerQueueStdInOut = {} 136 self.ManagerQueue = { 137 'queue': {}, 138 'queue_order': [], 139 'processing': {}, 140 'processing_order': [], 141 'processed': {}, 142 'processed_order': [], 143 'errored' : {}, 144 'errored_order': [], 145 'pause': True 146 } 147 self.load_queue() 148 self.queue_loaded = True 149 if self.ManagerQueue['processing'] or self.ManagerQueue['processing_order']: 150 self.ManagerQueue['processing'].clear() 151 del self.ManagerQueue['processing_order'][:] 152 self.store_queue() 153 154 SocketHost.__init__( 155 self, 156 self.FakeServiceInterface, 157 sock_output = self.Text, 158 ssl = do_ssl, 159 **kwargs 160 ) 161 self.stdout_logging = stdout_logging 162 # no way, we MUST fork requests, otherwise weird things will happen when more than 163 # one user is connected 164 # self.fork_requests = False 165 self.load_queue_processor() 166 # here we can put anything that must be loaded before the queue processor execution 167 self.play_queue()
168
169 - def __del__(self):
170 if hasattr(self,'queue_loaded'): 171 if self.queue_loaded: 172 self.store_queue()
173
175 for myclass, args, kwargs in self.ExecutorCommandClasses: 176 myintf = myclass(self.SystemExecutor, *args,**kwargs) 177 if hasattr(myintf,'available_commands'): 178 self.SystemExecutor.register(myintf.available_commands) 179 self.ExecutorCommandInstances.append(myintf) 180 else: 181 del myintf
182
183 - def setup_stdout_storage_dir(self):
184 if os.path.isfile(self.STDOUT_STORAGE_DIR) or os.path.islink(self.STDOUT_STORAGE_DIR): 185 os.remove(self.STDOUT_STORAGE_DIR) 186 if not os.path.isdir(self.STDOUT_STORAGE_DIR): 187 os.makedirs(self.STDOUT_STORAGE_DIR,0775) 188 if etpConst['entropygid'] != None: 189 const_setup_perms(self.STDOUT_STORAGE_DIR,etpConst['entropygid'])
190
191 - def load_pinboard(self):
192 obj = self.get_stored_pinboard() 193 if isinstance(obj,dict): 194 self.PinboardData = obj 195 return True 196 return False
197
198 - def get_stored_pinboard(self):
199 return self.dumpTools.loadobj(self.pinboard_file)
200
201 - def store_pinboard(self):
202 self.dumpTools.dumpobj(self.pinboard_file, self.PinboardData)
203
204 - def add_to_pinboard(self, note, extended_text):
205 with self.PinboardLock: 206 mydata = { 207 'note': note, 208 'extended_text': extended_text, 209 'ts': self.get_ts(), 210 'done': False, 211 } 212 pinboard_id = self.get_pinboard_id() 213 self.PinboardData[pinboard_id] = mydata 214 self.store_pinboard()
215
216 - def remove_from_pinboard(self, pinboard_id):
217 with self.PinboardLock: 218 if self.PinboardData.has_key(pinboard_id): 219 self.PinboardData.pop(pinboard_id) 220 self.store_pinboard() 221 return True 222 return False
223
224 - def set_pinboard_item_status(self, pinboard_id, status):
225 with self.PinboardLock: 226 if self.PinboardData.has_key(pinboard_id): 227 self.PinboardData[pinboard_id]['done'] = status 228 self.store_pinboard() 229 return True 230 return False
231
232 - def get_pinboard_id(self):
233 numbers = self.PinboardData.keys() 234 if numbers: 235 number = max(numbers)+1 236 else: 237 number = 1 238 return number
239
240 - def get_pinboard_data(self):
241 with self.PinboardLock: 242 return self.PinboardData.copy()
243
244 - def load_queue_processor(self):
245 self.QueueProcessor = self.TimeScheduled(2, self.queue_processor) 246 self.QueueProcessor.start()
247
248 - def get_stored_queue(self):
249 return self.dumpTools.loadobj(self.queue_file)
250
251 - def load_queue(self):
252 obj = self.get_stored_queue() 253 if isinstance(obj,dict): 254 self.ManagerQueue = obj 255 return True 256 return False
257
258 - def store_queue(self):
259 self.dumpTools.dumpobj(self.queue_file, self.ManagerQueue)
260
261 - def load_queue_ext_rc(self, queue_id):
262 return self.dumpTools.loadobj(os.path.join(self.queue_ext_rc_dir,str(queue_id)))
263
264 - def store_queue_ext_rc(self, queue_id, rc):
265 return self.dumpTools.dumpobj(os.path.join(self.queue_ext_rc_dir,str(queue_id)), rc)
266
267 - def remove_queue_ext_rc(self, queue_id):
268 return self.dumpTools.removeobj(os.path.join(self.queue_ext_rc_dir,str(queue_id)))
269
270 - def get_ts(self):
271 return self.datetime.fromtimestamp(time.time())
272
273 - def swap_items_in_queue(self, queue_id1, queue_id2):
274 self.load_queue() 275 item1, key1 = self._get_item_by_queue_id(queue_id1) 276 item2, key2 = self._get_item_by_queue_id(queue_id2) 277 if key1 != key2: 278 return False 279 t_item = item1.copy() 280 item1.clear() 281 item1.update(item2) 282 item2.clear() 283 item2.update(t_item) 284 # fix the _order 285 queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1) 286 queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2) 287 self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2 288 self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1 289 self.store_queue() 290 return True
291 292
293 - def add_to_queue(self, command_name, command_text, user_id, group_id, function, args, kwargs, do_parallel, extended_result, interactive = False):
294 295 if function not in self.SystemExecutor.available_commands: 296 return -1 297 298 self.load_queue() 299 queue_id = self.generate_unique_queue_id() 300 if interactive: 301 self.ManagerQueueStdInOut[queue_id] = os.pipe() 302 myqueue_dict = { 303 'queue_id': queue_id, 304 'command_name': command_name, 305 'command_desc': self.valid_commands[command_name]['desc'], 306 'command_text': command_text, 307 'call': function, 308 'args': self.copy.deepcopy(args), 309 'kwargs': self.copy.deepcopy(kwargs), 310 'user_id': user_id, 311 'group_id': group_id, 312 'stdout': self.assign_unique_stdout_file(queue_id), 313 'queue_ts': "%s" % (self.get_ts(),), 314 'kill': False, 315 'processing_pid': None, 316 'do_parallel': do_parallel, 317 'interactive': False, 318 } 319 if extended_result: 320 myqueue_dict['extended_result'] = None 321 self.ManagerQueue['queue'][queue_id] = myqueue_dict 322 self.ManagerQueue['queue_order'].append(queue_id) 323 self.store_queue() 324 return queue_id
325
326 - def remove_from_queue(self, queue_ids):
327 self.load_queue() 328 removed = False 329 for key in self.ManagerQueue: 330 if key not in self.dict_queue_keys: 331 continue 332 for queue_id in queue_ids: 333 item = None 334 try: 335 item = self.ManagerQueue[key].pop(queue_id) 336 except KeyError: 337 continue 338 if item: 339 self.flush_item(item, queue_id) 340 if queue_id in self.ManagerQueue[key+"_order"]: 341 self.ManagerQueue[key+"_order"].remove(queue_id) 342 removed = True 343 self.remove_queue_ext_rc(queue_id) 344 if removed: self.store_queue() 345 return removed
346
347 - def kill_processing_queue_id(self, queue_id):
348 self.load_queue() 349 item, key = self._get_item_by_queue_id(queue_id) 350 if key in self.processing_queue_keys: 351 item['kill'] = True 352 self.store_queue()
353
354 - def pause_queue(self):
355 self.load_queue() 356 self.ManagerQueue['pause'] = True 357 self.store_queue()
358
359 - def play_queue(self):
360 self.load_queue() 361 self.ManagerQueue['pause'] = False 362 self.store_queue()
363
364 - def flush_item(self, item, queue_id):
365 if not isinstance(item,dict): 366 return False 367 if item.has_key('stdout'): 368 stdout = item['stdout'] 369 if (os.path.isfile(stdout) and os.access(stdout,os.W_OK)): 370 os.remove(stdout) 371 if item.has_key('interactive'): 372 if item['interactive'] and (queue_id in self.ManagerQueueStdInOut): 373 stdin, stdout = self.ManagerQueueStdInOut.pop(queue_id) 374 os.close(stdin) 375 os.close(stdout) 376 return True
377
378 - def assign_unique_stdout_file(self, queue_id):
379 stdout = os.path.join(self.STDOUT_STORAGE_DIR,"%d.%s" % (queue_id,"stdout",)) 380 if os.path.isfile(stdout): 381 os.remove(stdout) 382 count = 0 383 orig_stdout = stdout 384 while os.path.lexists(stdout): 385 count += 1 386 stdout = "%s.%d" % (orig_stdout,count,) 387 return stdout
388
389 - def generate_unique_queue_id(self):
390 current_ids = set() 391 for key in self.ManagerQueue: 392 if not key.endswith("_order"): 393 continue 394 current_ids |= set(self.ManagerQueue[key]) 395 while 1: 396 try: 397 queue_id = abs(hash(os.urandom(1))) 398 except NotImplementedError: 399 random.seed() 400 queue_id = random.randint(1000000000000000000,9999999999999999999) 401 if queue_id not in current_ids: 402 return queue_id
403
404 - def get_item_by_queue_id(self, queue_id, copy = False):
405 self.load_queue() 406 item, key = self._get_item_by_queue_id(queue_id) 407 if copy: item = self._queue_copy_obj(item) 408 return item, key
409
410 - def _get_item_by_queue_id(self, queue_id):
411 for key in self.dict_queue_keys: 412 item = self.ManagerQueue[key].get(queue_id) 413 if item != None: 414 return item, key 415 return None, None
416
417 - def _pop_item_from_queue(self):
418 try: 419 if self.ManagerQueue['queue_order']: 420 queue_id = self.ManagerQueue['queue_order'].pop(0) 421 return self.ManagerQueue['queue'].pop(queue_id), queue_id 422 except (IndexError,KeyError,): 423 self.entropyTools.print_traceback() 424 return None, None
425
426 - def _queue_copy_obj(self, obj):
427 if isinstance(obj,(dict,set,frozenset,)): 428 return obj.copy() 429 elif isinstance(obj,(list,tuple,)): 430 return obj[:] 431 return obj
432
433 - def queue_processor(self, fork_data = None):
434 435 try: 436 self._queue_processor(fork_data) 437 except: 438 if self.QueueLock.locked() and not fork_data: 439 self.QueueLock.release() 440 raise
441
442 - def _queue_processor(self, fork_data):
443 444 # queue processing is stopped until there's a process running 445 if self.ForkLock.locked(): return 446 447 with self.ForkLock: 448 with self.QueueLock: 449 450 if fork_data: 451 command_data, queue_id = self._queue_copy_obj(fork_data) 452 else: 453 self.load_queue() 454 if self.ManagerQueue['pause']: return 455 if not self.ManagerQueue['queue_order']: return 456 command_data, queue_id = self._pop_item_from_queue() 457 if not command_data: return 458 command_data = self._queue_copy_obj(command_data) 459 command_data['processing_ts'] = "%s" % (self.get_ts(),) 460 self.ManagerQueue['processing'][queue_id] = command_data 461 self.ManagerQueue['processing_order'].append(queue_id) 462 self.store_queue() 463 464 self.remove_queue_ext_rc(queue_id) 465 try: 466 if command_data.get('do_parallel') and not fork_data: 467 t = ParallelTask(self.queue_processor, fork_data = (command_data, queue_id,)) 468 t.start() 469 return 470 done, result = self.SystemExecutor.execute_task(command_data) 471 except Exception, e: 472 if self.QueueLock.locked(): self.QueueLock.release() 473 self.entropyTools.print_traceback() 474 done = False 475 result = (False, unicode(e),) 476 477 if command_data.has_key('extended_result') and done: 478 try: 479 command_data['result'], extended_result = self._queue_copy_obj(result) 480 self.store_queue_ext_rc(queue_id, extended_result) 481 except TypeError: 482 done = False 483 command_data['result'] = 'wrong tuple split from queue processor (1)' 484 self.store_queue_ext_rc(queue_id, None) 485 else: 486 command_data['result'] = self._queue_copy_obj(result) 487 488 with self.ForkLock: 489 with self.QueueLock: 490 491 self.load_queue() 492 493 if not done: 494 try: 495 self.ManagerQueue['processing'].pop(queue_id) 496 except KeyError: 497 pass 498 if queue_id in self.ManagerQueue['processing_order']: 499 self.ManagerQueue['processing_order'].remove(queue_id) 500 command_data['errored_ts'] = "%s" % (self.get_ts(),) 501 self.ManagerQueue['errored'][queue_id] = command_data 502 self.ManagerQueue['errored_order'].append(queue_id) 503 self.store_queue() 504 return 505 506 try: 507 done, cmd_result = result 508 except TypeError: 509 done = False 510 command_data['result'] = 'wrong tuple split from queue processor (2)' 511 512 if not done: 513 try: 514 self.ManagerQueue['processing'].pop(queue_id) 515 except KeyError: 516 pass 517 if queue_id in self.ManagerQueue['processing_order']: 518 self.ManagerQueue['processing_order'].remove(queue_id) 519 command_data['errored_ts'] = "%s" % (self.get_ts(),) 520 self.ManagerQueue['errored'][queue_id] = command_data 521 self.ManagerQueue['errored_order'].append(queue_id) 522 self.store_queue() 523 return 524 525 try: 526 self.ManagerQueue['processing'].pop(queue_id) 527 except KeyError: 528 pass 529 if queue_id in self.ManagerQueue['processing_order']: 530 self.ManagerQueue['processing_order'].remove(queue_id) 531 command_data['processed_ts'] = "%s" % (self.get_ts(),) 532 self.ManagerQueue['processed'][queue_id] = command_data 533 self.ManagerQueue['processed_order'].append(queue_id) 534 self.store_queue()
535 536
537 - def killall(self):
538 SocketHost.killall(self) 539 if self.QueueProcessor != None: 540 self.QueueProcessor.kill()
541