diff --git a/rigo/RigoDaemon/app.py b/rigo/RigoDaemon/app.py index 740623c58..3479b69e0 100755 --- a/rigo/RigoDaemon/app.py +++ b/rigo/RigoDaemon/app.py @@ -25,7 +25,7 @@ import tempfile import shutil import subprocess import copy -from threading import Lock, Timer, Semaphore +from threading import Lock, RLock, Timer, Semaphore from collections import deque # this makes the daemon to not write the entropy pid file @@ -346,7 +346,33 @@ class ApplicationsTransaction(object): def __init__(self): self._transactions = {} - self._transactions_mutex = Lock() + self._transactions_mutex = RLock() + self._parent = None + + def __enter__(self): + """ + Hold the mutex using the with statement. + """ + self._transactions_mutex.acquire() + + def __exit__(self, exc_type, exc_value, tb): + """ + Release the mutex exiting from the with statement. + """ + self._transactions_mutex.release() + + def set_parent(self, item): + """ + Set the parent Application. All the other Applications + listed here are belonging from this one. + """ + self._parent = item + + def unset_parent(self): + """ + Unset the parent Application. + """ + self._parent = None def set(self, package_id, repository_id, app_action): """ @@ -370,6 +396,14 @@ class ApplicationsTransaction(object): """ with self._transactions_mutex: self._transactions.clear() + self.unset_parent() + + def get_parent(self): + """ + Return the parent Application metadata, if any. + Otherwise return None. + """ + return self._parent def get(self, package_id, repository_id): """ @@ -383,13 +417,26 @@ class ApplicationsTransaction(object): return AppActions.IDLE return tx + def all(self): + """ + Return the list of Applications currently listed + in ApplicationsTransactions. + Each list item is composed by the following tuple: + (package_id, repository_id, action) + """ + items = [] + with self._transactions_mutex: + for (pkg_id, repository_id), action in self._transactions.items(): + items.append((pkg_id, repository_id, action)) + return items + class RigoDaemonService(dbus.service.Object): BUS_NAME = DbusConfig.BUS_NAME OBJECT_PATH = DbusConfig.OBJECT_PATH - API_VERSION = 5 + API_VERSION = 6 """ RigoDaemon is the dbus service Object in charge of executing @@ -410,6 +457,7 @@ class RigoDaemonService(dbus.service.Object): self._action = action self._simulate = simulate self._authorized = authorized + self._parent = False @property def pkg(self): @@ -454,6 +502,21 @@ class RigoDaemonService(dbus.service.Object): """ self._authorized = val + def parent(self): + """ + Return True, if this Application is the one being + currently processed. + """ + return self._parent + + def set_parent(self, parent): + """ + Set the current parent metadata State. + If parent is True, the Application is being + currently processed. + """ + self._parent = parent + def __str__(self): """ Show item in human readable way @@ -472,6 +535,7 @@ class RigoDaemonService(dbus.service.Object): def __init__(self, simulate, authorized): self._simulate = simulate self._authorized = authorized + self._parent = False def simulate(self): """ @@ -479,6 +543,12 @@ class RigoDaemonService(dbus.service.Object): """ return self._simulate + def action(self): + """ + Return AppActions Action for this UpgradeActionQueueItem. + """ + return AppActions.UPGRADE + def authorized(self): """ Return True, if Action has been authorized. @@ -491,6 +561,21 @@ class RigoDaemonService(dbus.service.Object): """ self._authorized = val + def parent(self): + """ + Return True, if this Action is the one being + currently processed. + """ + return self._parent + + def set_parent(self, parent): + """ + Set the current parent metadata State. + If parent is True, the Action is being + currently processed. + """ + self._parent = parent + def __str__(self): """ Show item in human readable way @@ -551,7 +636,14 @@ class RigoDaemonService(dbus.service.Object): # active management. self._settings_mgmt_serializer = Lock() + # this mutex is used when non threads-safe + # accesses are required, like when we're forced + # to iterate through the deque. + self._action_queue_mutex = Lock() self._action_queue = deque() + # this should not be merged with action_queue_mutex + # because it masks clients from early action_queue_length = 0 + # return values (see action_queue_length()). self._action_queue_length_mutex = Lock() self._action_queue_length = 0 self._action_queue_waiter = Semaphore(0) @@ -1200,7 +1292,8 @@ class RigoDaemonService(dbus.service.Object): break try: - item = self._action_queue.popleft() + with self._action_queue_mutex: + item = self._action_queue.popleft() except IndexError: # no more items write_output("_action_queue_worker_thread: " @@ -1394,6 +1487,8 @@ class RigoDaemonService(dbus.service.Object): outcome = AppTransactionOutcome.INTERNAL_ERROR try: + item.set_parent(True) + self._txs.set_parent(item) if is_app: if action == AppActions.REMOVE: @@ -1415,6 +1510,7 @@ class RigoDaemonService(dbus.service.Object): return outcome finally: + item.set_parent(False) self._txs.reset() def _process_upgrade_action(self, activity, simulate): @@ -2513,7 +2609,8 @@ class RigoDaemonService(dbus.service.Object): action, simulate, authorized) - self._action_queue.append(item) + with self._action_queue_mutex: + self._action_queue.append(item) if authorized: with self._action_queue_length_mutex: self._action_queue_length += 1 @@ -2639,7 +2736,8 @@ class RigoDaemonService(dbus.service.Object): pid, PolicyActions.UPGRADE_SYSTEM) item = self.UpgradeActionQueueItem( bool(simulate), authorized) - self._action_queue.append(item) + with self._action_queue_mutex: + self._action_queue.append(item) if authorized: with self._action_queue_length_mutex: self._action_queue_length += 1 @@ -2687,6 +2785,53 @@ class RigoDaemonService(dbus.service.Object): # might temporarily go to -1 ? return max(0, self._action_queue_length) + @dbus.service.method(BUS_NAME, in_signature='', + out_signature='a(isssba(iss))') + def action_queue_items(self): + """ + Return a list of Applications that are currently on the + execution queue. + """ + write_output("action_queue_items called", debug=True) + parent = None + items = [] + + with self._action_queue_mutex: + write_output("action_queue_items mutex acquired", debug=True) + with self._txs: + write_output("action_queue_items txs mutex acquired", + debug=True) + + parent = self._txs.get_parent() + write_output("action_queue_items: got parent %s" % (parent,), + debug=True) + + if parent is not None: + all_txs = self._txs.all() + write_output("action_queue_items: got txs: %d" % ( + len(all_txs),), + debug=True) + items.append((parent, all_txs)) + # be fast here + for item in self._action_queue: + items.append((item, [])) + + def _item_map(item_parent): + item, children = item_parent + if isinstance(item, RigoDaemonService.UpgradeActionQueueItem): + return 0, "", "", item.action(), item.simulate(), children + + pkg_id, repository_id = item.pkg + path, action, simulate = item.path(), item.action(), item.simulate() + if path is None: + path = "" + return pkg_id, repository_id, path, action, \ + simulate, children + + # ha ha! now try to guess what's doing... + items = list(filter(lambda x: x is not None, map(_item_map, items))) + return items + @dbus.service.method(BUS_NAME, in_signature='is', out_signature='s') def action(self, package_id, repository_id): diff --git a/rigo/RigoDaemon/dbus/org.sabayon.Rigo.xml b/rigo/RigoDaemon/dbus/org.sabayon.Rigo.xml index a538274bb..effaaf7a2 100644 --- a/rigo/RigoDaemon/dbus/org.sabayon.Rigo.xml +++ b/rigo/RigoDaemon/dbus/org.sabayon.Rigo.xml @@ -32,6 +32,10 @@ + + + + diff --git a/rigo/RigoDaemon/enums.py b/rigo/RigoDaemon/enums.py index a607a29bf..24f1feefa 100644 --- a/rigo/RigoDaemon/enums.py +++ b/rigo/RigoDaemon/enums.py @@ -49,6 +49,7 @@ class AppActions: INSTALL = "install" REMOVE = "remove" IDLE = "idle" + UPGRADE = "upgrade" class AppTransactionStates: diff --git a/rigo/rigo/controllers/daemon.py b/rigo/rigo/controllers/daemon.py index 0a7528b83..0786e1152 100644 --- a/rigo/rigo/controllers/daemon.py +++ b/rigo/rigo/controllers/daemon.py @@ -186,7 +186,7 @@ class RigoServiceController(GObject.Object): _OLD_REPOSITORIES_SIGNAL = "old_repositories" _NOTICEBOARDS_AVAILABLE_SIGNAL = "noticeboards_available" _REPOS_SETTINGS_CHANGED_SIGNAL = "repositories_settings_changed" - _SUPPORTED_APIS = [5] + _SUPPORTED_APIS = [6] def __init__(self, rigo_app, activity_rwsem, entropy_client, entropy_ws): @@ -1688,6 +1688,56 @@ class RigoServiceController(GObject.Object): dbus_interface=self.DBUS_INTERFACE).action_queue_length() return self._execute_mainloop(_action_queue_length) + def action_queue_items(self): + """ + Return the list of Application objects that are currently processed by + RigoDaemon. This is a kind of instant snapshot of what's going on there. + """ + def _action_queue_items(): + return dbus.Interface( + self._entropy_bus, + dbus_interface=self.DBUS_INTERFACE).action_queue_items() + items = self._execute_mainloop(_action_queue_items) + + apps = [] + for item in items: + pkg_id, r_id, path, daemon_action, simulate, children = item + is_upgrade = False + if daemon_action == DaemonAppActions.UPGRADE: + # special case, system is upgrading, append + # the children list + is_upgrade = True + + app_children = None + if children: + app_children = [] + for _pkg_id, _r_id, _action in children: + _pkg_id = int(_pkg_id) + _r_id = self._dbus_to_unicode(_r_id) + app = Application( + self._entropy, self._entropy_ws, + self, (_pkg_id, _r_id)) + if is_upgrade: + apps.append(app) + else: + app_children.append(app) + + if is_upgrade: + continue + + pkg_id = int(pkg_id) + r_id = self._dbus_to_unicode(r_id) + if path: + path = self._dbus_to_unicode(path) + else: + path = None + app = Application(self._entropy, self._entropy_ws, + self, (pkg_id, r_id), + package_path=path, + children=app_children) + apps.append(app) + return apps + def action(self, app): """ Return Application transaction state (RigoDaemon.AppAction enum