1
2 """
3
4 @author: Fabio Erculiani <lxnay@sabayonlinux.org>
5 @contact: lxnay@sabayonlinux.org
6 @copyright: Fabio Erculiani
7 @license: GPL-2
8
9 B{Entropy Services System Management Interface}.
10
11 """
12 from __future__ import with_statement
13 import time
14 import os
15 import random
16 import subprocess
17 from entropy.services.interfaces import SocketHost
18 from entropy.const import etpConst, const_setup_perms
19 from entropy.output import TextInterface
20 from entropy.misc import ParallelTask
21
23
24 - def __init__(self, SystemInterface, Entropy):
25 import entropy.tools as entropyTools
26 self.entropyTools = entropyTools
27 self.Entropy = Entropy
28 self.SystemInterface = SystemInterface
29 self.available_commands = {}
30 self.task_result = None
31
34
36
37 import signal
38 queue_id = command_data['queue_id']
39 args = command_data['args']
40 kwargs = command_data['kwargs']
41 data = self.available_commands.get(command_data['call'])
42
43 if data == None:
44 return False, 'no command'
45 elif len(args)+1 < data['args']:
46 return False, 'not enough args'
47
48 args.insert(0,queue_id)
49 self.task_result = None
50 t = ParallelTask(data['func'], *args, **kwargs)
51 t.start()
52 killed = False
53 while 1:
54 if not t.isAlive(): break
55 time.sleep(2)
56 live_item, key = self.SystemInterface.get_item_by_queue_id(queue_id)
57 if isinstance(live_item,dict) and (key == "processing") and (not killed):
58 if live_item['kill'] and (live_item['processing_pid'] != None):
59 os.kill(live_item['processing_pid'],signal.SIGKILL)
60 killed = True
61 if killed:
62 return False, 'killed by user'
63 return True, t.get_rc()
64
66
70
72
73 - def __init__(self, SystemManagerExecutorInstance, *args, **kwargs):
74 self.SystemManagerExecutor = SystemManagerExecutorInstance
75 self.available_commands = {
76 'hello_world': {
77 'func': self.hello_world,
78 'args': 0,
79 }
80 }
81
83 rc = subprocess.call('echo hello world', shell = True)
84 return True,rc
85
86
87 queue_file = 'system_manager_queue'
88 pinboard_file = "system_manager_pinboard"
89 queue_ext_rc_dir = "system_manager_rc"
90 STDOUT_STORAGE_DIR = os.path.join(etpConst['dumpstoragedir'],'system_manager_stdout')
91 - def __init__(self, EntropyInterface, do_ssl = False, stdout_logging = True, entropy_interface_kwargs = {}, **kwargs):
92
93 self.queue_loaded = False
94 from entropy.misc import TimeScheduled
95 self.TimeScheduled = TimeScheduled
96
97 import entropy.tools as entropyTools
98 import entropy.dump as dumpTools
99 import threading
100 self.entropyTools, self.dumpTools, self.threading = entropyTools, dumpTools, threading
101 from datetime import datetime
102 self.datetime = datetime
103 import copy
104 self.copy = copy
105 from entropy.services.system.commands import Base
106 self.setup_stdout_storage_dir()
107
108 if not kwargs.has_key('external_cmd_classes'):
109 kwargs['external_cmd_classes'] = []
110 kwargs['external_cmd_classes'].insert(0,Base)
111
112 self.Entropy = EntropyInterface(**entropy_interface_kwargs)
113 self.Text = TextInterface()
114 self.SystemExecutor = TaskExecutor(self, self.Entropy)
115
116 self.ExecutorCommandClasses = [(self.BuiltInSystemManagerExecutorCommands,[],{},)]
117 self.ExecutorCommandInstances = []
118 if kwargs.has_key('external_executor_cmd_classes'):
119 self.ExecutorCommandClasses += kwargs.pop('external_executor_cmd_classes')
120 self.handle_executor_command_classes_initialization()
121
122 self.QueueProcessor = None
123 self.QueueLock = self.threading.Lock()
124 self.PinboardLock = self.threading.Lock()
125 self.ForkLock = self.threading.Lock()
126 self.do_ssl = do_ssl
127
128 self.PinboardData = {}
129 self.load_pinboard()
130
131 self.done_queue_keys = ['processed','errored']
132 self.removable_queue_keys = ['processed','errored','queue']
133 self.processing_queue_keys = ['processing']
134 self.dict_queue_keys = ['queue','processing','processed','errored']
135 self.ManagerQueueStdInOut = {}
136 self.ManagerQueue = {
137 'queue': {},
138 'queue_order': [],
139 'processing': {},
140 'processing_order': [],
141 'processed': {},
142 'processed_order': [],
143 'errored' : {},
144 'errored_order': [],
145 'pause': True
146 }
147 self.load_queue()
148 self.queue_loaded = True
149 if self.ManagerQueue['processing'] or self.ManagerQueue['processing_order']:
150 self.ManagerQueue['processing'].clear()
151 del self.ManagerQueue['processing_order'][:]
152 self.store_queue()
153
154 SocketHost.__init__(
155 self,
156 self.FakeServiceInterface,
157 sock_output = self.Text,
158 ssl = do_ssl,
159 **kwargs
160 )
161 self.stdout_logging = stdout_logging
162
163
164
165 self.load_queue_processor()
166
167 self.play_queue()
168
170 if hasattr(self,'queue_loaded'):
171 if self.queue_loaded:
172 self.store_queue()
173
175 for myclass, args, kwargs in self.ExecutorCommandClasses:
176 myintf = myclass(self.SystemExecutor, *args,**kwargs)
177 if hasattr(myintf,'available_commands'):
178 self.SystemExecutor.register(myintf.available_commands)
179 self.ExecutorCommandInstances.append(myintf)
180 else:
181 del myintf
182
190
192 obj = self.get_stored_pinboard()
193 if isinstance(obj,dict):
194 self.PinboardData = obj
195 return True
196 return False
197
200
203
205 with self.PinboardLock:
206 mydata = {
207 'note': note,
208 'extended_text': extended_text,
209 'ts': self.get_ts(),
210 'done': False,
211 }
212 pinboard_id = self.get_pinboard_id()
213 self.PinboardData[pinboard_id] = mydata
214 self.store_pinboard()
215
217 with self.PinboardLock:
218 if self.PinboardData.has_key(pinboard_id):
219 self.PinboardData.pop(pinboard_id)
220 self.store_pinboard()
221 return True
222 return False
223
225 with self.PinboardLock:
226 if self.PinboardData.has_key(pinboard_id):
227 self.PinboardData[pinboard_id]['done'] = status
228 self.store_pinboard()
229 return True
230 return False
231
233 numbers = self.PinboardData.keys()
234 if numbers:
235 number = max(numbers)+1
236 else:
237 number = 1
238 return number
239
241 with self.PinboardLock:
242 return self.PinboardData.copy()
243
247
250
252 obj = self.get_stored_queue()
253 if isinstance(obj,dict):
254 self.ManagerQueue = obj
255 return True
256 return False
257
260
263
266
269
271 return self.datetime.fromtimestamp(time.time())
272
274 self.load_queue()
275 item1, key1 = self._get_item_by_queue_id(queue_id1)
276 item2, key2 = self._get_item_by_queue_id(queue_id2)
277 if key1 != key2:
278 return False
279 t_item = item1.copy()
280 item1.clear()
281 item1.update(item2)
282 item2.clear()
283 item2.update(t_item)
284
285 queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1)
286 queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2)
287 self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2
288 self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1
289 self.store_queue()
290 return True
291
292
293 - def add_to_queue(self, command_name, command_text, user_id, group_id, function, args, kwargs, do_parallel, extended_result, interactive = False):
294
295 if function not in self.SystemExecutor.available_commands:
296 return -1
297
298 self.load_queue()
299 queue_id = self.generate_unique_queue_id()
300 if interactive:
301 self.ManagerQueueStdInOut[queue_id] = os.pipe()
302 myqueue_dict = {
303 'queue_id': queue_id,
304 'command_name': command_name,
305 'command_desc': self.valid_commands[command_name]['desc'],
306 'command_text': command_text,
307 'call': function,
308 'args': self.copy.deepcopy(args),
309 'kwargs': self.copy.deepcopy(kwargs),
310 'user_id': user_id,
311 'group_id': group_id,
312 'stdout': self.assign_unique_stdout_file(queue_id),
313 'queue_ts': "%s" % (self.get_ts(),),
314 'kill': False,
315 'processing_pid': None,
316 'do_parallel': do_parallel,
317 'interactive': False,
318 }
319 if extended_result:
320 myqueue_dict['extended_result'] = None
321 self.ManagerQueue['queue'][queue_id] = myqueue_dict
322 self.ManagerQueue['queue_order'].append(queue_id)
323 self.store_queue()
324 return queue_id
325
327 self.load_queue()
328 removed = False
329 for key in self.ManagerQueue:
330 if key not in self.dict_queue_keys:
331 continue
332 for queue_id in queue_ids:
333 item = None
334 try:
335 item = self.ManagerQueue[key].pop(queue_id)
336 except KeyError:
337 continue
338 if item:
339 self.flush_item(item, queue_id)
340 if queue_id in self.ManagerQueue[key+"_order"]:
341 self.ManagerQueue[key+"_order"].remove(queue_id)
342 removed = True
343 self.remove_queue_ext_rc(queue_id)
344 if removed: self.store_queue()
345 return removed
346
348 self.load_queue()
349 item, key = self._get_item_by_queue_id(queue_id)
350 if key in self.processing_queue_keys:
351 item['kill'] = True
352 self.store_queue()
353
358
363
365 if not isinstance(item,dict):
366 return False
367 if item.has_key('stdout'):
368 stdout = item['stdout']
369 if (os.path.isfile(stdout) and os.access(stdout,os.W_OK)):
370 os.remove(stdout)
371 if item.has_key('interactive'):
372 if item['interactive'] and (queue_id in self.ManagerQueueStdInOut):
373 stdin, stdout = self.ManagerQueueStdInOut.pop(queue_id)
374 os.close(stdin)
375 os.close(stdout)
376 return True
377
379 stdout = os.path.join(self.STDOUT_STORAGE_DIR,"%d.%s" % (queue_id,"stdout",))
380 if os.path.isfile(stdout):
381 os.remove(stdout)
382 count = 0
383 orig_stdout = stdout
384 while os.path.lexists(stdout):
385 count += 1
386 stdout = "%s.%d" % (orig_stdout,count,)
387 return stdout
388
390 current_ids = set()
391 for key in self.ManagerQueue:
392 if not key.endswith("_order"):
393 continue
394 current_ids |= set(self.ManagerQueue[key])
395 while 1:
396 try:
397 queue_id = abs(hash(os.urandom(1)))
398 except NotImplementedError:
399 random.seed()
400 queue_id = random.randint(1000000000000000000,9999999999999999999)
401 if queue_id not in current_ids:
402 return queue_id
403
405 self.load_queue()
406 item, key = self._get_item_by_queue_id(queue_id)
407 if copy: item = self._queue_copy_obj(item)
408 return item, key
409
411 for key in self.dict_queue_keys:
412 item = self.ManagerQueue[key].get(queue_id)
413 if item != None:
414 return item, key
415 return None, None
416
418 try:
419 if self.ManagerQueue['queue_order']:
420 queue_id = self.ManagerQueue['queue_order'].pop(0)
421 return self.ManagerQueue['queue'].pop(queue_id), queue_id
422 except (IndexError,KeyError,):
423 self.entropyTools.print_traceback()
424 return None, None
425
427 if isinstance(obj,(dict,set,frozenset,)):
428 return obj.copy()
429 elif isinstance(obj,(list,tuple,)):
430 return obj[:]
431 return obj
432
434
435 try:
436 self._queue_processor(fork_data)
437 except:
438 if self.QueueLock.locked() and not fork_data:
439 self.QueueLock.release()
440 raise
441
443
444
445 if self.ForkLock.locked(): return
446
447 with self.ForkLock:
448 with self.QueueLock:
449
450 if fork_data:
451 command_data, queue_id = self._queue_copy_obj(fork_data)
452 else:
453 self.load_queue()
454 if self.ManagerQueue['pause']: return
455 if not self.ManagerQueue['queue_order']: return
456 command_data, queue_id = self._pop_item_from_queue()
457 if not command_data: return
458 command_data = self._queue_copy_obj(command_data)
459 command_data['processing_ts'] = "%s" % (self.get_ts(),)
460 self.ManagerQueue['processing'][queue_id] = command_data
461 self.ManagerQueue['processing_order'].append(queue_id)
462 self.store_queue()
463
464 self.remove_queue_ext_rc(queue_id)
465 try:
466 if command_data.get('do_parallel') and not fork_data:
467 t = ParallelTask(self.queue_processor, fork_data = (command_data, queue_id,))
468 t.start()
469 return
470 done, result = self.SystemExecutor.execute_task(command_data)
471 except Exception, e:
472 if self.QueueLock.locked(): self.QueueLock.release()
473 self.entropyTools.print_traceback()
474 done = False
475 result = (False, unicode(e),)
476
477 if command_data.has_key('extended_result') and done:
478 try:
479 command_data['result'], extended_result = self._queue_copy_obj(result)
480 self.store_queue_ext_rc(queue_id, extended_result)
481 except TypeError:
482 done = False
483 command_data['result'] = 'wrong tuple split from queue processor (1)'
484 self.store_queue_ext_rc(queue_id, None)
485 else:
486 command_data['result'] = self._queue_copy_obj(result)
487
488 with self.ForkLock:
489 with self.QueueLock:
490
491 self.load_queue()
492
493 if not done:
494 try:
495 self.ManagerQueue['processing'].pop(queue_id)
496 except KeyError:
497 pass
498 if queue_id in self.ManagerQueue['processing_order']:
499 self.ManagerQueue['processing_order'].remove(queue_id)
500 command_data['errored_ts'] = "%s" % (self.get_ts(),)
501 self.ManagerQueue['errored'][queue_id] = command_data
502 self.ManagerQueue['errored_order'].append(queue_id)
503 self.store_queue()
504 return
505
506 try:
507 done, cmd_result = result
508 except TypeError:
509 done = False
510 command_data['result'] = 'wrong tuple split from queue processor (2)'
511
512 if not done:
513 try:
514 self.ManagerQueue['processing'].pop(queue_id)
515 except KeyError:
516 pass
517 if queue_id in self.ManagerQueue['processing_order']:
518 self.ManagerQueue['processing_order'].remove(queue_id)
519 command_data['errored_ts'] = "%s" % (self.get_ts(),)
520 self.ManagerQueue['errored'][queue_id] = command_data
521 self.ManagerQueue['errored_order'].append(queue_id)
522 self.store_queue()
523 return
524
525 try:
526 self.ManagerQueue['processing'].pop(queue_id)
527 except KeyError:
528 pass
529 if queue_id in self.ManagerQueue['processing_order']:
530 self.ManagerQueue['processing_order'].remove(queue_id)
531 command_data['processed_ts'] = "%s" % (self.get_ts(),)
532 self.ManagerQueue['processed'][queue_id] = command_data
533 self.ManagerQueue['processed_order'].append(queue_id)
534 self.store_queue()
535
536
541