[RigoDaemon] add support for Action Queue "introspection" (in a MVCC way)

The new action_queue_items() is able to return the internal
Action Queue status without blocking while returning a consistent
state. This method will be used by Rigo to list the Action Queue
activity on the bottom notification box.
This commit is contained in:
Fabio Erculiani
2012-07-31 16:35:38 +02:00
parent b35c01dda8
commit b5b8d25297
4 changed files with 207 additions and 7 deletions
+151 -6
View File
@@ -25,7 +25,7 @@ import tempfile
import shutil
import subprocess
import copy
from threading import Lock, Timer, Semaphore
from threading import Lock, RLock, Timer, Semaphore
from collections import deque
# this makes the daemon to not write the entropy pid file
@@ -346,7 +346,33 @@ class ApplicationsTransaction(object):
def __init__(self):
self._transactions = {}
self._transactions_mutex = Lock()
self._transactions_mutex = RLock()
self._parent = None
def __enter__(self):
"""
Hold the mutex using the with statement.
"""
self._transactions_mutex.acquire()
def __exit__(self, exc_type, exc_value, tb):
"""
Release the mutex exiting from the with statement.
"""
self._transactions_mutex.release()
def set_parent(self, item):
"""
Set the parent Application. All the other Applications
listed here are belonging from this one.
"""
self._parent = item
def unset_parent(self):
"""
Unset the parent Application.
"""
self._parent = None
def set(self, package_id, repository_id, app_action):
"""
@@ -370,6 +396,14 @@ class ApplicationsTransaction(object):
"""
with self._transactions_mutex:
self._transactions.clear()
self.unset_parent()
def get_parent(self):
"""
Return the parent Application metadata, if any.
Otherwise return None.
"""
return self._parent
def get(self, package_id, repository_id):
"""
@@ -383,13 +417,26 @@ class ApplicationsTransaction(object):
return AppActions.IDLE
return tx
def all(self):
"""
Return the list of Applications currently listed
in ApplicationsTransactions.
Each list item is composed by the following tuple:
(package_id, repository_id, action)
"""
items = []
with self._transactions_mutex:
for (pkg_id, repository_id), action in self._transactions.items():
items.append((pkg_id, repository_id, action))
return items
class RigoDaemonService(dbus.service.Object):
BUS_NAME = DbusConfig.BUS_NAME
OBJECT_PATH = DbusConfig.OBJECT_PATH
API_VERSION = 5
API_VERSION = 6
"""
RigoDaemon is the dbus service Object in charge of executing
@@ -410,6 +457,7 @@ class RigoDaemonService(dbus.service.Object):
self._action = action
self._simulate = simulate
self._authorized = authorized
self._parent = False
@property
def pkg(self):
@@ -454,6 +502,21 @@ class RigoDaemonService(dbus.service.Object):
"""
self._authorized = val
def parent(self):
"""
Return True, if this Application is the one being
currently processed.
"""
return self._parent
def set_parent(self, parent):
"""
Set the current parent metadata State.
If parent is True, the Application is being
currently processed.
"""
self._parent = parent
def __str__(self):
"""
Show item in human readable way
@@ -472,6 +535,7 @@ class RigoDaemonService(dbus.service.Object):
def __init__(self, simulate, authorized):
self._simulate = simulate
self._authorized = authorized
self._parent = False
def simulate(self):
"""
@@ -479,6 +543,12 @@ class RigoDaemonService(dbus.service.Object):
"""
return self._simulate
def action(self):
"""
Return AppActions Action for this UpgradeActionQueueItem.
"""
return AppActions.UPGRADE
def authorized(self):
"""
Return True, if Action has been authorized.
@@ -491,6 +561,21 @@ class RigoDaemonService(dbus.service.Object):
"""
self._authorized = val
def parent(self):
"""
Return True, if this Action is the one being
currently processed.
"""
return self._parent
def set_parent(self, parent):
"""
Set the current parent metadata State.
If parent is True, the Action is being
currently processed.
"""
self._parent = parent
def __str__(self):
"""
Show item in human readable way
@@ -551,7 +636,14 @@ class RigoDaemonService(dbus.service.Object):
# active management.
self._settings_mgmt_serializer = Lock()
# this mutex is used when non threads-safe
# accesses are required, like when we're forced
# to iterate through the deque.
self._action_queue_mutex = Lock()
self._action_queue = deque()
# this should not be merged with action_queue_mutex
# because it masks clients from early action_queue_length = 0
# return values (see action_queue_length()).
self._action_queue_length_mutex = Lock()
self._action_queue_length = 0
self._action_queue_waiter = Semaphore(0)
@@ -1200,7 +1292,8 @@ class RigoDaemonService(dbus.service.Object):
break
try:
item = self._action_queue.popleft()
with self._action_queue_mutex:
item = self._action_queue.popleft()
except IndexError:
# no more items
write_output("_action_queue_worker_thread: "
@@ -1394,6 +1487,8 @@ class RigoDaemonService(dbus.service.Object):
outcome = AppTransactionOutcome.INTERNAL_ERROR
try:
item.set_parent(True)
self._txs.set_parent(item)
if is_app:
if action == AppActions.REMOVE:
@@ -1415,6 +1510,7 @@ class RigoDaemonService(dbus.service.Object):
return outcome
finally:
item.set_parent(False)
self._txs.reset()
def _process_upgrade_action(self, activity, simulate):
@@ -2513,7 +2609,8 @@ class RigoDaemonService(dbus.service.Object):
action,
simulate,
authorized)
self._action_queue.append(item)
with self._action_queue_mutex:
self._action_queue.append(item)
if authorized:
with self._action_queue_length_mutex:
self._action_queue_length += 1
@@ -2639,7 +2736,8 @@ class RigoDaemonService(dbus.service.Object):
pid, PolicyActions.UPGRADE_SYSTEM)
item = self.UpgradeActionQueueItem(
bool(simulate), authorized)
self._action_queue.append(item)
with self._action_queue_mutex:
self._action_queue.append(item)
if authorized:
with self._action_queue_length_mutex:
self._action_queue_length += 1
@@ -2687,6 +2785,53 @@ class RigoDaemonService(dbus.service.Object):
# might temporarily go to -1 ?
return max(0, self._action_queue_length)
@dbus.service.method(BUS_NAME, in_signature='',
out_signature='a(isssba(iss))')
def action_queue_items(self):
"""
Return a list of Applications that are currently on the
execution queue.
"""
write_output("action_queue_items called", debug=True)
parent = None
items = []
with self._action_queue_mutex:
write_output("action_queue_items mutex acquired", debug=True)
with self._txs:
write_output("action_queue_items txs mutex acquired",
debug=True)
parent = self._txs.get_parent()
write_output("action_queue_items: got parent %s" % (parent,),
debug=True)
if parent is not None:
all_txs = self._txs.all()
write_output("action_queue_items: got txs: %d" % (
len(all_txs),),
debug=True)
items.append((parent, all_txs))
# be fast here
for item in self._action_queue:
items.append((item, []))
def _item_map(item_parent):
item, children = item_parent
if isinstance(item, RigoDaemonService.UpgradeActionQueueItem):
return 0, "", "", item.action(), item.simulate(), children
pkg_id, repository_id = item.pkg
path, action, simulate = item.path(), item.action(), item.simulate()
if path is None:
path = ""
return pkg_id, repository_id, path, action, \
simulate, children
# ha ha! now try to guess what's doing...
items = list(filter(lambda x: x is not None, map(_item_map, items)))
return items
@dbus.service.method(BUS_NAME, in_signature='is',
out_signature='s')
def action(self, package_id, repository_id):
@@ -32,6 +32,10 @@
<arg name="length" type="i" direction="out"/>
</method>
<method name="action_queue_items">
<arg name="items" type="a(isssba(iss))" direction="out"/>
</method>
<method name="merge_configuration">
<arg name="source" type="s" direction="in"/>
<arg name="accepted" type="b" direction="out"/>
+1
View File
@@ -49,6 +49,7 @@ class AppActions:
INSTALL = "install"
REMOVE = "remove"
IDLE = "idle"
UPGRADE = "upgrade"
class AppTransactionStates:
+51 -1
View File
@@ -186,7 +186,7 @@ class RigoServiceController(GObject.Object):
_OLD_REPOSITORIES_SIGNAL = "old_repositories"
_NOTICEBOARDS_AVAILABLE_SIGNAL = "noticeboards_available"
_REPOS_SETTINGS_CHANGED_SIGNAL = "repositories_settings_changed"
_SUPPORTED_APIS = [5]
_SUPPORTED_APIS = [6]
def __init__(self, rigo_app, activity_rwsem,
entropy_client, entropy_ws):
@@ -1688,6 +1688,56 @@ class RigoServiceController(GObject.Object):
dbus_interface=self.DBUS_INTERFACE).action_queue_length()
return self._execute_mainloop(_action_queue_length)
def action_queue_items(self):
"""
Return the list of Application objects that are currently processed by
RigoDaemon. This is a kind of instant snapshot of what's going on there.
"""
def _action_queue_items():
return dbus.Interface(
self._entropy_bus,
dbus_interface=self.DBUS_INTERFACE).action_queue_items()
items = self._execute_mainloop(_action_queue_items)
apps = []
for item in items:
pkg_id, r_id, path, daemon_action, simulate, children = item
is_upgrade = False
if daemon_action == DaemonAppActions.UPGRADE:
# special case, system is upgrading, append
# the children list
is_upgrade = True
app_children = None
if children:
app_children = []
for _pkg_id, _r_id, _action in children:
_pkg_id = int(_pkg_id)
_r_id = self._dbus_to_unicode(_r_id)
app = Application(
self._entropy, self._entropy_ws,
self, (_pkg_id, _r_id))
if is_upgrade:
apps.append(app)
else:
app_children.append(app)
if is_upgrade:
continue
pkg_id = int(pkg_id)
r_id = self._dbus_to_unicode(r_id)
if path:
path = self._dbus_to_unicode(path)
else:
path = None
app = Application(self._entropy, self._entropy_ws,
self, (pkg_id, r_id),
package_path=path,
children=app_children)
apps.append(app)
return apps
def action(self, app):
"""
Return Application transaction state (RigoDaemon.AppAction enum