Package entropy :: Package services :: Package system :: Module interfaces

Source Code for Module entropy.services.system.interfaces

  1  # -*- coding: utf-8 -*- 
  2  ''' 
  3      # DESCRIPTION: 
  4      # Entropy Object Oriented Interface 
  5   
  6      Copyright (C) 2007-2009 Fabio Erculiani 
  7   
  8      This program is free software; you can redistribute it and/or modify 
  9      it under the terms of the GNU General Public License as published by 
 10      the Free Software Foundation; either version 2 of the License, or 
 11      (at your option) any later version. 
 12   
 13      This program is distributed in the hope that it will be useful, 
 14      but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16      GNU General Public License for more details. 
 17   
 18      You should have received a copy of the GNU General Public License 
 19      along with this program; if not, write to the Free Software 
 20      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 21  ''' 
 22  from __future__ import with_statement 
 23  import time 
 24  import os 
 25  import random 
 26  import subprocess 
 27  from entropy.services.interfaces import SocketHost 
 28  from entropy.const import etpConst, const_setup_perms 
 29  from entropy.output import TextInterface 
 30  from entropy.misc import ParallelTask 
 31   
32 -class TaskExecutor:
33
34 - def __init__(self, SystemInterface, Entropy):
35 import entropy.tools as entropyTools 36 self.entropyTools = entropyTools 37 self.Entropy = Entropy 38 self.SystemInterface = SystemInterface 39 self.available_commands = {} 40 self.task_result = None
41
42 - def register(self, available_commands):
44
45 - def execute_task(self, command_data):
46 47 import signal 48 queue_id = command_data['queue_id'] 49 args = command_data['args'] 50 kwargs = command_data['kwargs'] 51 data = self.available_commands.get(command_data['call']) 52 53 if data == None: 54 return False, 'no command' 55 elif len(args)+1 < data['args']: 56 return False, 'not enough args' 57 58 args.insert(0,queue_id) 59 self.task_result = None 60 t = ParallelTask(data['func'], *args, **kwargs) 61 t.start() 62 killed = False 63 while 1: 64 if not t.isAlive(): break 65 time.sleep(2) 66 live_item, key = self.SystemInterface.get_item_by_queue_id(queue_id) 67 if isinstance(live_item,dict) and (key == "processing") and (not killed): 68 if live_item['kill'] and (live_item['processing_pid'] != None): 69 os.kill(live_item['processing_pid'],signal.SIGKILL) 70 killed = True 71 if killed: 72 return False, 'killed by user' 73 return True, t.get_rc()
74
75 -class Server(SocketHost):
76
78 - def __init__(self, *args, **kwargs):
79 pass
80
82
83 - def __init__(self, SystemManagerExecutorInstance, *args, **kwargs):
84 self.SystemManagerExecutor = SystemManagerExecutorInstance 85 self.available_commands = { 86 'hello_world': { 87 'func': self.hello_world, 88 'args': 0, 89 } 90 }
91
92 - def hello_world(self):
93 rc = subprocess.call('echo hello world', shell = True) 94 return True,rc
95 96 97 queue_file = 'system_manager_queue' 98 pinboard_file = "system_manager_pinboard" 99 queue_ext_rc_dir = "system_manager_rc" 100 STDOUT_STORAGE_DIR = os.path.join(etpConst['dumpstoragedir'],'system_manager_stdout')
101 - def __init__(self, EntropyInterface, do_ssl = False, stdout_logging = True, entropy_interface_kwargs = {}, **kwargs):
102 103 self.queue_loaded = False 104 from entropy.misc import TimeScheduled 105 self.TimeScheduled = TimeScheduled 106 107 import entropy.tools as entropyTools 108 import entropy.dump as dumpTools 109 import threading 110 self.entropyTools, self.dumpTools, self.threading = entropyTools, dumpTools, threading 111 from datetime import datetime 112 self.datetime = datetime 113 import copy 114 self.copy = copy 115 from entropy.services.system.commands import Base 116 self.setup_stdout_storage_dir() 117 118 if not kwargs.has_key('external_cmd_classes'): 119 kwargs['external_cmd_classes'] = [] 120 kwargs['external_cmd_classes'].insert(0,Base) 121 122 self.Entropy = EntropyInterface(**entropy_interface_kwargs) 123 self.Text = TextInterface() 124 self.SystemExecutor = TaskExecutor(self, self.Entropy) 125 126 self.ExecutorCommandClasses = [(self.BuiltInSystemManagerExecutorCommands,[],{},)] 127 self.ExecutorCommandInstances = [] 128 if kwargs.has_key('external_executor_cmd_classes'): 129 self.ExecutorCommandClasses += kwargs.pop('external_executor_cmd_classes') 130 self.handle_executor_command_classes_initialization() 131 132 self.QueueProcessor = None 133 self.QueueLock = self.threading.Lock() 134 self.PinboardLock = self.threading.Lock() 135 self.ForkLock = self.threading.Lock() 136 self.do_ssl = do_ssl 137 138 self.PinboardData = {} 139 self.load_pinboard() 140 141 self.done_queue_keys = ['processed','errored'] 142 self.removable_queue_keys = ['processed','errored','queue'] 143 self.processing_queue_keys = ['processing'] 144 self.dict_queue_keys = ['queue','processing','processed','errored'] 145 self.ManagerQueueStdInOut = {} 146 self.ManagerQueue = { 147 'queue': {}, 148 'queue_order': [], 149 'processing': {}, 150 'processing_order': [], 151 'processed': {}, 152 'processed_order': [], 153 'errored' : {}, 154 'errored_order': [], 155 'pause': True 156 } 157 self.load_queue() 158 self.queue_loaded = True 159 if self.ManagerQueue['processing'] or self.ManagerQueue['processing_order']: 160 self.ManagerQueue['processing'].clear() 161 del self.ManagerQueue['processing_order'][:] 162 self.store_queue() 163 164 SocketHost.__init__( 165 self, 166 self.FakeServiceInterface, 167 sock_output = self.Text, 168 ssl = do_ssl, 169 **kwargs 170 ) 171 self.stdout_logging = stdout_logging 172 # no way, we MUST fork requests, otherwise weird things will happen when more than 173 # one user is connected 174 # self.fork_requests = False 175 self.load_queue_processor() 176 # here we can put anything that must be loaded before the queue processor execution 177 self.play_queue()
178
179 - def __del__(self):
180 if hasattr(self,'queue_loaded'): 181 if self.queue_loaded: 182 self.store_queue()
183
185 for myclass, args, kwargs in self.ExecutorCommandClasses: 186 myintf = myclass(self.SystemExecutor, *args,**kwargs) 187 if hasattr(myintf,'available_commands'): 188 self.SystemExecutor.register(myintf.available_commands) 189 self.ExecutorCommandInstances.append(myintf) 190 else: 191 del myintf
192
193 - def setup_stdout_storage_dir(self):
194 if os.path.isfile(self.STDOUT_STORAGE_DIR) or os.path.islink(self.STDOUT_STORAGE_DIR): 195 os.remove(self.STDOUT_STORAGE_DIR) 196 if not os.path.isdir(self.STDOUT_STORAGE_DIR): 197 os.makedirs(self.STDOUT_STORAGE_DIR,0775) 198 if etpConst['entropygid'] != None: 199 const_setup_perms(self.STDOUT_STORAGE_DIR,etpConst['entropygid'])
200
201 - def load_pinboard(self):
202 obj = self.get_stored_pinboard() 203 if isinstance(obj,dict): 204 self.PinboardData = obj 205 return True 206 return False
207
208 - def get_stored_pinboard(self):
209 return self.dumpTools.loadobj(self.pinboard_file)
210
211 - def store_pinboard(self):
212 self.dumpTools.dumpobj(self.pinboard_file, self.PinboardData)
213
214 - def add_to_pinboard(self, note, extended_text):
215 with self.PinboardLock: 216 mydata = { 217 'note': note, 218 'extended_text': extended_text, 219 'ts': self.get_ts(), 220 'done': False, 221 } 222 pinboard_id = self.get_pinboard_id() 223 self.PinboardData[pinboard_id] = mydata 224 self.store_pinboard()
225
226 - def remove_from_pinboard(self, pinboard_id):
227 with self.PinboardLock: 228 if self.PinboardData.has_key(pinboard_id): 229 self.PinboardData.pop(pinboard_id) 230 self.store_pinboard() 231 return True 232 return False
233
234 - def set_pinboard_item_status(self, pinboard_id, status):
235 with self.PinboardLock: 236 if self.PinboardData.has_key(pinboard_id): 237 self.PinboardData[pinboard_id]['done'] = status 238 self.store_pinboard() 239 return True 240 return False
241
242 - def get_pinboard_id(self):
243 numbers = self.PinboardData.keys() 244 if numbers: 245 number = max(numbers)+1 246 else: 247 number = 1 248 return number
249
250 - def get_pinboard_data(self):
251 with self.PinboardLock: 252 return self.PinboardData.copy()
253
254 - def load_queue_processor(self):
255 self.QueueProcessor = self.TimeScheduled(2, self.queue_processor) 256 self.QueueProcessor.start()
257
258 - def get_stored_queue(self):
259 return self.dumpTools.loadobj(self.queue_file)
260
261 - def load_queue(self):
262 obj = self.get_stored_queue() 263 if isinstance(obj,dict): 264 self.ManagerQueue = obj 265 return True 266 return False
267
268 - def store_queue(self):
269 self.dumpTools.dumpobj(self.queue_file, self.ManagerQueue)
270
271 - def load_queue_ext_rc(self, queue_id):
272 return self.dumpTools.loadobj(os.path.join(self.queue_ext_rc_dir,str(queue_id)))
273
274 - def store_queue_ext_rc(self, queue_id, rc):
275 return self.dumpTools.dumpobj(os.path.join(self.queue_ext_rc_dir,str(queue_id)), rc)
276
277 - def remove_queue_ext_rc(self, queue_id):
278 return self.dumpTools.removeobj(os.path.join(self.queue_ext_rc_dir,str(queue_id)))
279
280 - def get_ts(self):
281 return self.datetime.fromtimestamp(time.time())
282
283 - def swap_items_in_queue(self, queue_id1, queue_id2):
284 self.load_queue() 285 item1, key1 = self._get_item_by_queue_id(queue_id1) 286 item2, key2 = self._get_item_by_queue_id(queue_id2) 287 if key1 != key2: 288 return False 289 t_item = item1.copy() 290 item1.clear() 291 item1.update(item2) 292 item2.clear() 293 item2.update(t_item) 294 # fix the _order 295 queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1) 296 queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2) 297 self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2 298 self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1 299 self.store_queue() 300 return True
301 302
303 - def add_to_queue(self, command_name, command_text, user_id, group_id, function, args, kwargs, do_parallel, extended_result, interactive = False):
304 305 if function not in self.SystemExecutor.available_commands: 306 return -1 307 308 self.load_queue() 309 queue_id = self.generate_unique_queue_id() 310 if interactive: 311 self.ManagerQueueStdInOut[queue_id] = os.pipe() 312 myqueue_dict = { 313 'queue_id': queue_id, 314 'command_name': command_name, 315 'command_desc': self.valid_commands[command_name]['desc'], 316 'command_text': command_text, 317 'call': function, 318 'args': self.copy.deepcopy(args), 319 'kwargs': self.copy.deepcopy(kwargs), 320 'user_id': user_id, 321 'group_id': group_id, 322 'stdout': self.assign_unique_stdout_file(queue_id), 323 'queue_ts': "%s" % (self.get_ts(),), 324 'kill': False, 325 'processing_pid': None, 326 'do_parallel': do_parallel, 327 'interactive': False, 328 } 329 if extended_result: 330 myqueue_dict['extended_result'] = None 331 self.ManagerQueue['queue'][queue_id] = myqueue_dict 332 self.ManagerQueue['queue_order'].append(queue_id) 333 self.store_queue() 334 return queue_id
335
336 - def remove_from_queue(self, queue_ids):
337 self.load_queue() 338 removed = False 339 for key in self.ManagerQueue: 340 if key not in self.dict_queue_keys: 341 continue 342 for queue_id in queue_ids: 343 item = None 344 try: 345 item = self.ManagerQueue[key].pop(queue_id) 346 except KeyError: 347 continue 348 if item: 349 self.flush_item(item, queue_id) 350 if queue_id in self.ManagerQueue[key+"_order"]: 351 self.ManagerQueue[key+"_order"].remove(queue_id) 352 removed = True 353 self.remove_queue_ext_rc(queue_id) 354 if removed: self.store_queue() 355 return removed
356
357 - def kill_processing_queue_id(self, queue_id):
358 self.load_queue() 359 item, key = self._get_item_by_queue_id(queue_id) 360 if key in self.processing_queue_keys: 361 item['kill'] = True 362 self.store_queue()
363
364 - def pause_queue(self):
365 self.load_queue() 366 self.ManagerQueue['pause'] = True 367 self.store_queue()
368
369 - def play_queue(self):
370 self.load_queue() 371 self.ManagerQueue['pause'] = False 372 self.store_queue()
373
374 - def flush_item(self, item, queue_id):
375 if not isinstance(item,dict): 376 return False 377 if item.has_key('stdout'): 378 stdout = item['stdout'] 379 if (os.path.isfile(stdout) and os.access(stdout,os.W_OK)): 380 os.remove(stdout) 381 if item.has_key('interactive'): 382 if item['interactive'] and (queue_id in self.ManagerQueueStdInOut): 383 stdin, stdout = self.ManagerQueueStdInOut.pop(queue_id) 384 os.close(stdin) 385 os.close(stdout) 386 return True
387
388 - def assign_unique_stdout_file(self, queue_id):
389 stdout = os.path.join(self.STDOUT_STORAGE_DIR,"%d.%s" % (queue_id,"stdout",)) 390 if os.path.isfile(stdout): 391 os.remove(stdout) 392 count = 0 393 orig_stdout = stdout 394 while os.path.lexists(stdout): 395 count += 1 396 stdout = "%s.%d" % (orig_stdout,count,) 397 return stdout
398
399 - def generate_unique_queue_id(self):
400 current_ids = set() 401 for key in self.ManagerQueue: 402 if not key.endswith("_order"): 403 continue 404 current_ids |= set(self.ManagerQueue[key]) 405 while 1: 406 try: 407 queue_id = abs(hash(os.urandom(1))) 408 except NotImplementedError: 409 random.seed() 410 queue_id = random.randint(1000000000000000000,9999999999999999999) 411 if queue_id not in current_ids: 412 return queue_id
413
414 - def get_item_by_queue_id(self, queue_id, copy = False):
415 self.load_queue() 416 item, key = self._get_item_by_queue_id(queue_id) 417 if copy: item = self._queue_copy_obj(item) 418 return item, key
419
420 - def _get_item_by_queue_id(self, queue_id):
421 for key in self.dict_queue_keys: 422 item = self.ManagerQueue[key].get(queue_id) 423 if item != None: 424 return item, key 425 return None, None
426
427 - def _pop_item_from_queue(self):
428 try: 429 if self.ManagerQueue['queue_order']: 430 queue_id = self.ManagerQueue['queue_order'].pop(0) 431 return self.ManagerQueue['queue'].pop(queue_id), queue_id 432 except (IndexError,KeyError,): 433 self.entropyTools.print_traceback() 434 return None, None
435
436 - def _queue_copy_obj(self, obj):
437 if isinstance(obj,(dict,set,frozenset,)): 438 return obj.copy() 439 elif isinstance(obj,(list,tuple,)): 440 return obj[:] 441 return obj
442
443 - def queue_processor(self, fork_data = None):
444 445 try: 446 self._queue_processor(fork_data) 447 except: 448 if self.QueueLock.locked() and not fork_data: 449 self.QueueLock.release() 450 raise
451
452 - def _queue_processor(self, fork_data):
453 454 # queue processing is stopped until there's a process running 455 if self.ForkLock.locked(): return 456 457 with self.ForkLock: 458 with self.QueueLock: 459 460 if fork_data: 461 command_data, queue_id = self._queue_copy_obj(fork_data) 462 else: 463 self.load_queue() 464 if self.ManagerQueue['pause']: return 465 if not self.ManagerQueue['queue_order']: return 466 command_data, queue_id = self._pop_item_from_queue() 467 if not command_data: return 468 command_data = self._queue_copy_obj(command_data) 469 command_data['processing_ts'] = "%s" % (self.get_ts(),) 470 self.ManagerQueue['processing'][queue_id] = command_data 471 self.ManagerQueue['processing_order'].append(queue_id) 472 self.store_queue() 473 474 self.remove_queue_ext_rc(queue_id) 475 try: 476 if command_data.get('do_parallel') and not fork_data: 477 t = ParallelTask(self.queue_processor, fork_data = (command_data, queue_id,)) 478 t.start() 479 return 480 done, result = self.SystemExecutor.execute_task(command_data) 481 except Exception, e: 482 if self.QueueLock.locked(): self.QueueLock.release() 483 self.entropyTools.print_traceback() 484 done = False 485 result = (False, unicode(e),) 486 487 if command_data.has_key('extended_result') and done: 488 try: 489 command_data['result'], extended_result = self._queue_copy_obj(result) 490 self.store_queue_ext_rc(queue_id, extended_result) 491 except TypeError: 492 done = False 493 command_data['result'] = 'wrong tuple split from queue processor (1)' 494 self.store_queue_ext_rc(queue_id, None) 495 else: 496 command_data['result'] = self._queue_copy_obj(result) 497 498 with self.ForkLock: 499 with self.QueueLock: 500 501 self.load_queue() 502 503 if not done: 504 try: 505 self.ManagerQueue['processing'].pop(queue_id) 506 except KeyError: 507 pass 508 if queue_id in self.ManagerQueue['processing_order']: 509 self.ManagerQueue['processing_order'].remove(queue_id) 510 command_data['errored_ts'] = "%s" % (self.get_ts(),) 511 self.ManagerQueue['errored'][queue_id] = command_data 512 self.ManagerQueue['errored_order'].append(queue_id) 513 self.store_queue() 514 return 515 516 try: 517 done, cmd_result = result 518 except TypeError: 519 done = False 520 command_data['result'] = 'wrong tuple split from queue processor (2)' 521 522 if not done: 523 try: 524 self.ManagerQueue['processing'].pop(queue_id) 525 except KeyError: 526 pass 527 if queue_id in self.ManagerQueue['processing_order']: 528 self.ManagerQueue['processing_order'].remove(queue_id) 529 command_data['errored_ts'] = "%s" % (self.get_ts(),) 530 self.ManagerQueue['errored'][queue_id] = command_data 531 self.ManagerQueue['errored_order'].append(queue_id) 532 self.store_queue() 533 return 534 535 try: 536 self.ManagerQueue['processing'].pop(queue_id) 537 except KeyError: 538 pass 539 if queue_id in self.ManagerQueue['processing_order']: 540 self.ManagerQueue['processing_order'].remove(queue_id) 541 command_data['processed_ts'] = "%s" % (self.get_ts(),) 542 self.ManagerQueue['processed'][queue_id] = command_data 543 self.ManagerQueue['processed_order'].append(queue_id) 544 self.store_queue()
545 546
547 - def killall(self):
548 SocketHost.killall(self) 549 if self.QueueProcessor != None: 550 self.QueueProcessor.kill()
551