1
2 '''
3 # DESCRIPTION:
4 # Entropy Object Oriented Interface
5
6 Copyright (C) 2007-2009 Fabio Erculiani
7
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 '''
22 from __future__ import with_statement
23 import time
24 import os
25 import random
26 import subprocess
27 from entropy.services.interfaces import SocketHost
28 from entropy.const import etpConst, const_setup_perms
29 from entropy.output import TextInterface
30 from entropy.misc import ParallelTask
31
33
def __init__(self, SystemInterface, Entropy):
    """
    Prepare an executor that has no registered commands and no task
    result yet.

    @param SystemInterface: kept as self.SystemInterface
    @param Entropy: kept as self.Entropy
    """
    # imported at call time and exposed to the methods through self
    import entropy.tools as entropyTools
    self.entropyTools = entropyTools

    # collaborators handed in by the caller
    self.SystemInterface = SystemInterface
    self.Entropy = Entropy

    # initial (empty) state
    self.task_result = None
    self.available_commands = {}
41
44
46
def execute_task(self, command_data):
    """
    Run the command described by command_data inside a ParallelTask and
    wait until it is over, sending SIGKILL to the worker process when the
    live queue item asks for it.

    @param command_data: dict providing 'queue_id', 'args', 'kwargs' and
        'call' (a key of self.available_commands)
    @return: (False, 'no command') for an unknown command,
        (False, 'not enough args') when too few arguments are given,
        (False, 'killed by user') when the task has been killed,
        (True, <task return value>) otherwise
    """
    import signal

    queue_id = command_data['queue_id']
    call_args = command_data['args']
    call_kwargs = command_data['kwargs']
    command = self.available_commands.get(command_data['call'])

    if command is None:
        return False, 'no command'
    # the queue id is passed as an extra leading argument, hence the +1
    if len(call_args) + 1 < command['args']:
        return False, 'not enough args'

    # NOTE: this changes the list stored in command_data in place
    call_args.insert(0, queue_id)
    self.task_result = None
    task = ParallelTask(command['func'], *call_args, **call_kwargs)
    task.start()

    was_killed = False
    while task.isAlive():
        time.sleep(2)
        live_item, key = self.SystemInterface.get_item_by_queue_id(queue_id)
        if was_killed:
            # the signal is sent only once, then we just wait for the end
            continue
        if key != "processing" or not isinstance(live_item, dict):
            continue
        pid = live_item['processing_pid']
        if live_item['kill'] and (pid is not None):
            os.kill(pid, signal.SIGKILL)
            was_killed = True

    if was_killed:
        return False, 'killed by user'
    return True, task.get_rc()
74
76
80
82
def __init__(self, SystemManagerExecutorInstance, *args, **kwargs):
    """
    Keep a reference to the executor and publish the commands offered by
    this object.

    @param SystemManagerExecutorInstance: kept as self.SystemManagerExecutor
    @param args: accepted, not used here
    @param kwargs: accepted, not used here
    """
    self.SystemManagerExecutor = SystemManagerExecutorInstance

    # command name -> callable plus the 'args' count stored with it
    hello_world_entry = {
        'func': self.hello_world,
        'args': 0,
    }
    self.available_commands = {'hello_world': hello_world_entry}
91
93 rc = subprocess.call('echo hello world', shell = True)
94 return True,rc
95
96
# Names for the stored queue, the stored pinboard and the external results
# directory. They are not referenced by the code visible here -- presumably
# used by the store/load helpers; TODO confirm.
queue_file = 'system_manager_queue'
pinboard_file = "system_manager_pinboard"
queue_ext_rc_dir = "system_manager_rc"
# Directory in which the "<queue_id>.stdout" file of every queue item is placed
STDOUT_STORAGE_DIR = os.path.join(etpConst['dumpstoragedir'],'system_manager_stdout')
def __init__(self, EntropyInterface, do_ssl = False, stdout_logging = True, entropy_interface_kwargs = None, **kwargs):
    """
    Build the server: executor, pinboard, queue, socket host and finally
    the queue processor.

    @param EntropyInterface: callable, invoked with entropy_interface_kwargs
        to build self.Entropy
    @param do_ssl: stored as self.do_ssl and passed to SocketHost.__init__
        as "ssl"
    @param stdout_logging: stored as self.stdout_logging
    @param entropy_interface_kwargs: keyword arguments for EntropyInterface,
        None (default) means no arguments
    @param kwargs: forwarded to SocketHost.__init__; two keys are handled
        here: 'external_cmd_classes' (Base is always put in front of it) and
        'external_executor_cmd_classes' (removed from kwargs and appended
        to self.ExecutorCommandClasses)
    """
    # a None default avoids sharing one dict object between all the calls
    if entropy_interface_kwargs is None:
        entropy_interface_kwargs = {}

    self.queue_loaded = False
    from entropy.misc import TimeScheduled
    self.TimeScheduled = TimeScheduled

    import entropy.tools as entropyTools
    import entropy.dump as dumpTools
    import threading
    self.entropyTools, self.dumpTools, self.threading = entropyTools, dumpTools, threading
    from datetime import datetime
    self.datetime = datetime
    import copy
    self.copy = copy
    from entropy.services.system.commands import Base
    self.setup_stdout_storage_dir()

    # Base goes first; a new list is built so that the list object owned
    # by the caller is left untouched
    kwargs['external_cmd_classes'] = [Base] + list(kwargs.get('external_cmd_classes', []))

    self.Entropy = EntropyInterface(**entropy_interface_kwargs)
    self.Text = TextInterface()
    self.SystemExecutor = TaskExecutor(self, self.Entropy)

    # (class, args, kwargs) triples, instantiated right below
    self.ExecutorCommandClasses = [(self.BuiltInSystemManagerExecutorCommands,[],{},)]
    self.ExecutorCommandInstances = []
    if 'external_executor_cmd_classes' in kwargs:
        self.ExecutorCommandClasses += kwargs.pop('external_executor_cmd_classes')
    self.handle_executor_command_classes_initialization()

    self.QueueProcessor = None
    self.QueueLock = self.threading.Lock()
    self.PinboardLock = self.threading.Lock()
    self.ForkLock = self.threading.Lock()
    self.do_ssl = do_ssl

    self.PinboardData = {}
    self.load_pinboard()

    self.done_queue_keys = ['processed','errored']
    self.removable_queue_keys = ['processed','errored','queue']
    self.processing_queue_keys = ['processing']
    self.dict_queue_keys = ['queue','processing','processed','errored']
    self.ManagerQueueStdInOut = {}
    # default queue, replaced by load_queue() when a stored one is found
    self.ManagerQueue = {
        'queue': {},
        'queue_order': [],
        'processing': {},
        'processing_order': [],
        'processed': {},
        'processed_order': [],
        'errored' : {},
        'errored_order': [],
        'pause': True
    }
    self.load_queue()
    self.queue_loaded = True
    # items still marked as processing at load time are dropped
    if self.ManagerQueue['processing'] or self.ManagerQueue['processing_order']:
        self.ManagerQueue['processing'].clear()
        del self.ManagerQueue['processing_order'][:]
        self.store_queue()

    SocketHost.__init__(
        self,
        self.FakeServiceInterface,
        sock_output = self.Text,
        ssl = do_ssl,
        **kwargs
    )
    self.stdout_logging = stdout_logging

    self.load_queue_processor()

    self.play_queue()
178
180 if hasattr(self,'queue_loaded'):
181 if self.queue_loaded:
182 self.store_queue()
183
def handle_executor_command_classes_initialization(self):
    """
    Instantiate every (class, args, kwargs) entry of
    self.ExecutorCommandClasses. Instances exposing "available_commands"
    get registered on self.SystemExecutor and are collected in
    self.ExecutorCommandInstances, the other ones are discarded.
    """
    for cmd_class, cls_args, cls_kwargs in self.ExecutorCommandClasses:
        instance = cmd_class(self.SystemExecutor, *cls_args, **cls_kwargs)
        if not hasattr(instance, 'available_commands'):
            # nothing to register for this one
            continue
        self.SystemExecutor.register(instance.available_commands)
        self.ExecutorCommandInstances.append(instance)
192
200
def load_pinboard(self):
    """
    Replace self.PinboardData with the stored pinboard.

    @return: True when the stored object is a dict and has been loaded,
        False otherwise (self.PinboardData is left as it was)
    """
    stored = self.get_stored_pinboard()
    if not isinstance(stored, dict):
        return False
    self.PinboardData = stored
    return True
207
210
213
215 with self.PinboardLock:
216 mydata = {
217 'note': note,
218 'extended_text': extended_text,
219 'ts': self.get_ts(),
220 'done': False,
221 }
222 pinboard_id = self.get_pinboard_id()
223 self.PinboardData[pinboard_id] = mydata
224 self.store_pinboard()
225
227 with self.PinboardLock:
228 if self.PinboardData.has_key(pinboard_id):
229 self.PinboardData.pop(pinboard_id)
230 self.store_pinboard()
231 return True
232 return False
233
235 with self.PinboardLock:
236 if self.PinboardData.has_key(pinboard_id):
237 self.PinboardData[pinboard_id]['done'] = status
238 self.store_pinboard()
239 return True
240 return False
241
def get_pinboard_id(self):
    """
    Return the next pinboard id: 1 for an empty pinboard, otherwise the
    highest key of self.PinboardData plus one.
    """
    used_ids = self.PinboardData.keys()
    if not used_ids:
        return 1
    return max(used_ids) + 1
249
251 with self.PinboardLock:
252 return self.PinboardData.copy()
253
257
260
def load_queue(self):
    """
    Replace self.ManagerQueue with the stored queue.

    @return: True when the stored object is a dict and has been loaded,
        False otherwise (self.ManagerQueue is left as it was)
    """
    stored = self.get_stored_queue()
    if not isinstance(stored, dict):
        return False
    self.ManagerQueue = stored
    return True
267
270
273
276
279
def get_ts(self):
    """
    Return the current time, built by self.datetime.fromtimestamp() from
    time.time().
    """
    now = time.time()
    return self.datetime.fromtimestamp(now)
282
284 self.load_queue()
285 item1, key1 = self._get_item_by_queue_id(queue_id1)
286 item2, key2 = self._get_item_by_queue_id(queue_id2)
287 if key1 != key2:
288 return False
289 t_item = item1.copy()
290 item1.clear()
291 item1.update(item2)
292 item2.clear()
293 item2.update(t_item)
294
295 queue_id1_idx = self.ManagerQueue[key1+"_order"].index(queue_id1)
296 queue_id2_idx = self.ManagerQueue[key2+"_order"].index(queue_id2)
297 self.ManagerQueue[key1+"_order"][queue_id1_idx] = queue_id2
298 self.ManagerQueue[key2+"_order"][queue_id2_idx] = queue_id1
299 self.store_queue()
300 return True
301
302
def add_to_queue(self, command_name, command_text, user_id, group_id, function, args, kwargs, do_parallel, extended_result, interactive = False):
    """
    Append a new item to the pending queue and store the queue.

    @param command_name: key of self.valid_commands, used to look up the
        command description
    @param command_text: stored in the item as 'command_text'
    @param user_id: stored in the item as 'user_id'
    @param group_id: stored in the item as 'group_id'
    @param function: executor command name, must be a key of
        self.SystemExecutor.available_commands
    @param args: positional arguments of the command (deep-copied)
    @param kwargs: keyword arguments of the command (deep-copied)
    @param do_parallel: stored in the item as 'do_parallel'
    @param extended_result: when true, the item gets an 'extended_result'
        key set to None
    @param interactive: when true, an os.pipe() pair is kept in
        self.ManagerQueueStdInOut under the new queue id and the item is
        flagged as interactive
    @return: the new queue id, -1 when function is not an available command
    """
    if function not in self.SystemExecutor.available_commands:
        return -1

    self.load_queue()
    queue_id = self.generate_unique_queue_id()
    if interactive:
        self.ManagerQueueStdInOut[queue_id] = os.pipe()
    myqueue_dict = {
        'queue_id': queue_id,
        'command_name': command_name,
        'command_desc': self.valid_commands[command_name]['desc'],
        'command_text': command_text,
        'call': function,
        'args': self.copy.deepcopy(args),
        'kwargs': self.copy.deepcopy(kwargs),
        'user_id': user_id,
        'group_id': group_id,
        'stdout': self.assign_unique_stdout_file(queue_id),
        'queue_ts': "%s" % (self.get_ts(),),
        'kill': False,
        'processing_pid': None,
        'do_parallel': do_parallel,
        # must mirror the argument: flush_item() only closes the pipe of
        # items flagged as interactive, a constant False leaks both ends
        'interactive': bool(interactive),
    }
    if extended_result:
        myqueue_dict['extended_result'] = None
    self.ManagerQueue['queue'][queue_id] = myqueue_dict
    self.ManagerQueue['queue_order'].append(queue_id)
    self.store_queue()
    return queue_id
335
337 self.load_queue()
338 removed = False
339 for key in self.ManagerQueue:
340 if key not in self.dict_queue_keys:
341 continue
342 for queue_id in queue_ids:
343 item = None
344 try:
345 item = self.ManagerQueue[key].pop(queue_id)
346 except KeyError:
347 continue
348 if item:
349 self.flush_item(item, queue_id)
350 if queue_id in self.ManagerQueue[key+"_order"]:
351 self.ManagerQueue[key+"_order"].remove(queue_id)
352 removed = True
353 self.remove_queue_ext_rc(queue_id)
354 if removed: self.store_queue()
355 return removed
356
358 self.load_queue()
359 item, key = self._get_item_by_queue_id(queue_id)
360 if key in self.processing_queue_keys:
361 item['kill'] = True
362 self.store_queue()
363
368
373
def flush_item(self, item, queue_id):
    """
    Release the resources bound to a queue item: its stdout file and,
    for interactive items, the pipe kept in self.ManagerQueueStdInOut.

    @param item: queue item (dict); anything else is refused
    @param queue_id: id of the item, key of self.ManagerQueueStdInOut
    @return: False when item is not a dict, True otherwise
    """
    if not isinstance(item, dict):
        return False

    if 'stdout' in item:
        stdout_path = item['stdout']
        # only remove regular files we are allowed to write to
        if os.path.isfile(stdout_path) and os.access(stdout_path, os.W_OK):
            os.remove(stdout_path)

    if 'interactive' in item:
        if item['interactive'] and (queue_id in self.ManagerQueueStdInOut):
            # the stored value is the descriptor pair returned by os.pipe()
            read_fd, write_fd = self.ManagerQueueStdInOut.pop(queue_id)
            os.close(read_fd)
            os.close(write_fd)

    return True
387
def assign_unique_stdout_file(self, queue_id):
    """
    Return a path inside self.STDOUT_STORAGE_DIR for the stdout of the
    given queue item. A regular file already sitting at
    "<queue_id>.stdout" is removed; if something still exists at that
    path, ".1", ".2", ... is appended until a free path is found.

    @param queue_id: integer queue id
    @return: the path (the file itself is not created here)
    """
    base_path = os.path.join(self.STDOUT_STORAGE_DIR, "%d.stdout" % (queue_id,))
    if os.path.isfile(base_path):
        os.remove(base_path)

    candidate = base_path
    suffix = 0
    # lexists() also catches directories and dangling symlinks
    while os.path.lexists(candidate):
        suffix += 1
        candidate = "%s.%d" % (base_path, suffix,)
    return candidate
398
def generate_unique_queue_id(self):
    """
    Return a random non-negative integer that is not listed in any
    "*_order" list of self.ManagerQueue.

    The id is drawn from 8 random bytes. Hashing a single random byte, as
    done before, gives at most 256 different values, so the loop could
    not end once all of them were in use.
    """
    import binascii

    current_ids = set()
    for key in self.ManagerQueue:
        if not key.endswith("_order"):
            continue
        current_ids.update(self.ManagerQueue[key])

    while 1:
        try:
            # top bit dropped: the id stays within 63 bits
            queue_id = int(binascii.hexlify(os.urandom(8)), 16) >> 1
        except NotImplementedError:
            # no OS randomness source available
            random.seed()
            queue_id = random.randint(1000000000000000000,9999999999999999999)
        if queue_id not in current_ids:
            return queue_id
413
415 self.load_queue()
416 item, key = self._get_item_by_queue_id(queue_id)
417 if copy: item = self._queue_copy_obj(item)
418 return item, key
419
def _get_item_by_queue_id(self, queue_id):
    """
    Look for queue_id in the queue sections named by self.dict_queue_keys,
    in that order, without reloading the queue.

    @param queue_id: id of the wanted item
    @return: (item, section name) for the first hit, (None, None) when
        the id is unknown
    """
    for section in self.dict_queue_keys:
        found = self.ManagerQueue[section].get(queue_id)
        if found is None:
            continue
        return found, section
    return None, None
426
def _pop_item_from_queue(self):
    """
    Take the first id out of the 'queue_order' list and remove the
    matching item from 'queue'.

    @return: (item, queue id); (None, None) when nothing is pending or
        when the lookup fails (the traceback is printed in that case)
    """
    try:
        pending = self.ManagerQueue['queue_order']
        if pending:
            next_id = pending.pop(0)
            item = self.ManagerQueue['queue'].pop(next_id)
            return item, next_id
    except (IndexError, KeyError,):
        self.entropyTools.print_traceback()
    return None, None
435
def _queue_copy_obj(self, obj):
    """
    Return a shallow copy of obj when it is a list, tuple, dict, set or
    frozenset; any other object is returned unchanged.

    @param obj: object to copy
    @return: the shallow copy, or obj itself
    """
    if isinstance(obj, (list, tuple,)):
        return obj[:]
    if isinstance(obj, (dict, set, frozenset,)):
        return obj.copy()
    return obj
442
444
445 try:
446 self._queue_processor(fork_data)
447 except:
448 if self.QueueLock.locked() and not fork_data:
449 self.QueueLock.release()
450 raise
451
453
454
455 if self.ForkLock.locked(): return
456
457 with self.ForkLock:
458 with self.QueueLock:
459
460 if fork_data:
461 command_data, queue_id = self._queue_copy_obj(fork_data)
462 else:
463 self.load_queue()
464 if self.ManagerQueue['pause']: return
465 if not self.ManagerQueue['queue_order']: return
466 command_data, queue_id = self._pop_item_from_queue()
467 if not command_data: return
468 command_data = self._queue_copy_obj(command_data)
469 command_data['processing_ts'] = "%s" % (self.get_ts(),)
470 self.ManagerQueue['processing'][queue_id] = command_data
471 self.ManagerQueue['processing_order'].append(queue_id)
472 self.store_queue()
473
474 self.remove_queue_ext_rc(queue_id)
475 try:
476 if command_data.get('do_parallel') and not fork_data:
477 t = ParallelTask(self.queue_processor, fork_data = (command_data, queue_id,))
478 t.start()
479 return
480 done, result = self.SystemExecutor.execute_task(command_data)
481 except Exception, e:
482 if self.QueueLock.locked(): self.QueueLock.release()
483 self.entropyTools.print_traceback()
484 done = False
485 result = (False, unicode(e),)
486
487 if command_data.has_key('extended_result') and done:
488 try:
489 command_data['result'], extended_result = self._queue_copy_obj(result)
490 self.store_queue_ext_rc(queue_id, extended_result)
491 except TypeError:
492 done = False
493 command_data['result'] = 'wrong tuple split from queue processor (1)'
494 self.store_queue_ext_rc(queue_id, None)
495 else:
496 command_data['result'] = self._queue_copy_obj(result)
497
498 with self.ForkLock:
499 with self.QueueLock:
500
501 self.load_queue()
502
503 if not done:
504 try:
505 self.ManagerQueue['processing'].pop(queue_id)
506 except KeyError:
507 pass
508 if queue_id in self.ManagerQueue['processing_order']:
509 self.ManagerQueue['processing_order'].remove(queue_id)
510 command_data['errored_ts'] = "%s" % (self.get_ts(),)
511 self.ManagerQueue['errored'][queue_id] = command_data
512 self.ManagerQueue['errored_order'].append(queue_id)
513 self.store_queue()
514 return
515
516 try:
517 done, cmd_result = result
518 except TypeError:
519 done = False
520 command_data['result'] = 'wrong tuple split from queue processor (2)'
521
522 if not done:
523 try:
524 self.ManagerQueue['processing'].pop(queue_id)
525 except KeyError:
526 pass
527 if queue_id in self.ManagerQueue['processing_order']:
528 self.ManagerQueue['processing_order'].remove(queue_id)
529 command_data['errored_ts'] = "%s" % (self.get_ts(),)
530 self.ManagerQueue['errored'][queue_id] = command_data
531 self.ManagerQueue['errored_order'].append(queue_id)
532 self.store_queue()
533 return
534
535 try:
536 self.ManagerQueue['processing'].pop(queue_id)
537 except KeyError:
538 pass
539 if queue_id in self.ManagerQueue['processing_order']:
540 self.ManagerQueue['processing_order'].remove(queue_id)
541 command_data['processed_ts'] = "%s" % (self.get_ts(),)
542 self.ManagerQueue['processed'][queue_id] = command_data
543 self.ManagerQueue['processed_order'].append(queue_id)
544 self.store_queue()
545
546
551