Files
entropy/services/client-updates-daemon
Fabio Erculiani ab8dc91766 [services] client-updates-daemon: fix available updates signalling when Sulfur is running
Sulfur holds Entropy general lock, which caused the daemon to not
run its duties whenever the installed packages repository is changed
(due to pkg updates/install/removal). Just skip the lock check since
it is not going to alter the content of available repositories in
any case.
2010-07-27 15:52:49 +02:00

646 lines
22 KiB
Python

#!/usr/bin/python2 -O
# -*- coding: utf-8 -*-
"""
@author: Fabio Erculiani <lxnay@sabayon.org>
@contact: lxnay@sabayon.org
@copyright: Fabio Erculiani
@license: GPL-2
B{Entropy Package Manager Client service}.
"""
import os
import sys
# this makes the daemon to not write the entropy pid file
# avoiding to lock other instances
sys.argv.append('--no-pid-handling')
import time
import gobject
import dbus
import dbus.service
import dbus.mainloop.glib
import signal
from threading import Lock
DAEMON_DEBUG = False
if "--debug" in sys.argv:
DAEMON_DEBUG = True
# Entropy imports
sys.path.insert(0,'/usr/lib/entropy/libraries')
sys.path.insert(0,'/usr/lib/entropy/server')
sys.path.insert(0,'/usr/lib/entropy/client')
sys.path.insert(0,'../libraries')
sys.path.insert(0,'../server')
sys.path.insert(0,'../client')
from entropy.cache import EntropyCacher
# update default writeback timeout
EntropyCacher.WRITEBACK_TIMEOUT = 120
from entropy.misc import LogFile
from entropy.i18n import _
from entropy.exceptions import PermissionDenied, RepositoryError
import entropy.tools as entropy_tools
from entropy.client.interfaces import Client
from entropy.fetchers import UrlFetcher
from entropy.const import etpConst, const_setup_entropy_pid, \
const_remove_entropy_pid, const_convert_to_rawstring
from entropy.core.settings.base import SystemSettings as SysSet
from entropy.misc import ParallelTask
from entropy.output import TextInterface, nocolor
nocolor()
SYS_SETTINGS = SysSet()
TEXT = TextInterface()
DAEMON_LOGFILE = os.path.join(etpConst['syslogdir'],"client-updater.log")
DAEMON_LOG = LogFile(SYS_SETTINGS['system']['log_level']+1,
DAEMON_LOGFILE, header = "[client-updater]")
PREVIOUS_PROGRESS = ''
CHECK_DELAY_SECS = 3600*4
for xopt in sys.argv:
if xopt.startswith("--secs=") and (len(xopt.split("=")) > 1):
try:
delay_secs = int(xopt.split("=")[1])
CHECK_DELAY_SECS = delay_secs
except ValueError:
continue
def write_output(*args, **kwargs):
message = time.strftime('[%H:%M:%S %d/%m/%Y %Z]') + " " + args[0]
global PREVIOUS_PROGRESS
if PREVIOUS_PROGRESS == message:
return
PREVIOUS_PROGRESS = message
DAEMON_LOG.write(message)
DAEMON_LOG.flush()
if DAEMON_DEBUG:
TEXT.output(*args, **kwargs)
def install_exception_handler():
sys.excepthook = handle_exception
def uninstall_exception_handler():
sys.excepthook = sys.__excepthook__
def handle_exception(exc_class, exc_instance, exc_tb):
t_back = entropy_tools.get_traceback(tb_obj = exc_tb)
# restore original exception handler, to avoid loops
uninstall_exception_handler()
# write exception to log file
write_output(const_convert_to_rawstring(t_back))
raise exc_instance
install_exception_handler()
class DaemonUrlFetcher(UrlFetcher):
daemon_last_avg = 100
__average = 0
__downloadedsize = 0
__remotesize = 0
__datatransfer = 0
def handle_statistics(self, th_id, downloaded_size, total_size,
average, old_average, update_step, show_speed, data_transfer,
time_remaining, time_remaining_secs):
self.__average = average
self.__downloadedsize = downloaded_size
self.__remotesize = total_size
self.__datatransfer = data_transfer
def output(self):
myavg = abs(int(round(float(self.__average), 1)))
if abs((myavg - self.daemon_last_avg)) < 1:
return
if int(myavg) % 10 == 0:
DAEMON_LOG.write("fetch @ %s%s" % (myavg, "%",))
self.daemon_last_avg = myavg
class Entropy(Client):
def init_singleton(self):
Client.init_singleton(self, load_ugc = False,
url_fetcher = DaemonUrlFetcher, repo_validation = False)
# validate currently available repos
# manually, to not taint logs
self._validate_repositories(quiet = True)
self.output(
"Loading Entropy Updates daemon: check every %ss, logfile: %s" % (
CHECK_DELAY_SECS, DAEMON_LOGFILE,)
)
def output(self, *args, **kwargs):
return write_output(*args, **kwargs)
Client.__singleton_class__ = Entropy
class UpdatesDaemon(dbus.service.Object):
def __init__(self):
gobject.threads_init()
self.__alive = False
self.__is_working_mutex = Lock()
self.__updater = None
self.__entropy_cache_cleaner = None
self.__system_changes_checker = None
self.__client_ping_checker = None
self.__oncall_updater = None
self.__quit_service_wd = None
self.__quit_service_trigger = False
self.__trigger_oncall_updater = False
self.__trigger_disable_repos_update = False
self.__trigger_startup_check = False
self.__fetch_mutex = Lock()
self.__updates = []
self.__updates_atoms = None
self.__system_db_hash = None
self.__last_system_repo_checksum = None
self.__got_client_ping = True
# start dbus service
object_path = "/notifier"
dbus_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
system_bus = dbus.SystemBus(mainloop = dbus_loop)
name = dbus.service.BusName("org.entropy.Client", bus = system_bus)
dbus.service.Object.__init__ (self, system_bus, object_path)
write_output("__init__: dbus service loaded")
# this seems to avoid race conditions
# with dbus service not being available
time.sleep(2)
def start(self):
self.stop()
self.__alive = True
self.__updater = gobject.timeout_add_seconds(
CHECK_DELAY_SECS, self.run_fetcher)
self.__oncall_updater = gobject.timeout_add_seconds(
3, self.run_oncall_fetcher)
self.__quit_service_wd = gobject.timeout_add_seconds(
60, self.quit_service_watchdog)
self.__system_changes_checker = gobject.timeout_add_seconds(
120, self.check_system_changes)
self.__entropy_cache_cleaner = gobject.timeout_add_seconds(
3600, self.cleanup_entropy_cache)
if "--disable-timer" not in sys.argv:
self.__client_ping_checker = gobject.timeout_add_seconds(
120, self.check_client_ping)
def stop(self):
if self.__alive:
self.__alive = False
if self.__updater is not None:
gobject.source_remove(self.__updater)
if self.__system_changes_checker is not None:
gobject.source_remove(self.__system_changes_checker)
if self.__oncall_updater is not None:
gobject.source_remove(self.__oncall_updater)
if self.__quit_service_wd is not None:
gobject.source_remove(self.__quit_service_wd)
if self.__client_ping_checker is not None:
gobject.source_remove(self.__client_ping_checker)
if self.__entropy_cache_cleaner is not None:
gobject.source_remove(self.__entropy_cache_cleaner)
def do_alert(self, string, msg, urgency = "critical"):
write_output('alert: %s, %s, urgency: %s' % (string, msg, urgency,))
def is_system_on_batteries(self):
"""
Return whether System is running on batteries.
@return: True, if running on batteries
@rtype: bool
"""
ac_powa_exec = "/usr/bin/on_ac_power"
if not os.access(ac_powa_exec, os.X_OK):
return False
ex_rc = os.system(ac_powa_exec)
if ex_rc:
return True
return False
def check_client_ping(self):
if self.__got_client_ping:
self.__got_client_ping = False
else: # timeout!
if DAEMON_DEBUG:
write_output("check_client_ping: NO client ping, quitting")
self.__quit_service_trigger = True
return self.__alive
def cleanup_entropy_cache(self):
with self.__is_working_mutex:
entropy = None
try:
entropy = Entropy()
acquired = self.__acquire_entropy_locks(entropy,
pid_lock = False)
if not acquired:
return True # respawn later
if hasattr(entropy, 'clean_downloaded_packages'):
entropy.clean_downloaded_packages()
return False
finally:
if entropy is not None:
self.__release_entropy_locks(entropy)
entropy.shutdown()
def check_system_changes(self):
if self.__trigger_oncall_updater:
return self.__alive
changed = self._is_system_changed()
if changed:
# this will disable repositories download, we don't really want
# that in this case.
self.__trigger_disable_repos_update = True
# trigger check and push
self.__trigger_oncall_updater = True
# keep alive
return self.__alive
def run_oncall_fetcher(self):
if not self.__trigger_oncall_updater:
return self.__alive
with self.__fetch_mutex:
self.__trigger_oncall_updater = False
update_repos = True
if self.__trigger_disable_repos_update:
update_repos = False
self.__trigger_disable_repos_update = False
task = ParallelTask(self.__run_fetcher, update_repos = update_repos)
task.start()
#self.__run_fetcher()
return self.__alive
def quit_service_watchdog(self):
if self.__quit_service_trigger:
self.stop()
with self.__is_working_mutex:
entropy_tools.kill_threads()
raise SystemExit(0)
return self.__alive
def run_fetcher(self):
if self.is_system_on_batteries():
# running on batteries, then skip
return self.__alive
with self.__fetch_mutex:
task = ParallelTask(self.__run_fetcher)
task.start()
#self.__run_fetcher()
return self.__alive
def __run_sync(self, repos, entropy):
try:
repo_conn = entropy.Repositories(
repo_identifiers = repos, fetch_security = False,
entropy_updates_alert = True)
if DAEMON_DEBUG:
write_output("__run_sync: repository interface loaded")
except Exception, err:
if DAEMON_DEBUG:
write_output(
"__run_sync: Unhandled exception, error: %s" % (err,))
else:
# 128: sync error, something bad happened
# 2: repositories not available (all)
# 1: not able to update all the repositories
if DAEMON_DEBUG:
write_output("__run_sync: preparing to run sync")
rc_res = repo_conn.sync()
del repo_conn
if DAEMON_DEBUG:
write_output("__run_sync: sync done")
if rc_res == 1:
err = _("Not all the repositories have been fetched")
self.do_alert( _("Updates: repository issues"), err )
return 0 # fine anyway
elif rc_res == 2:
err = _("No repositories found online")
self.do_alert( _("Updates: repository issues"), err )
return 0 # try to calculate updates anyway
elif rc_res == 128:
err = _("Synchronization errors. Cannot update repositories.")
self.do_alert( _("Updates: sync issues"), err )
return rc_res
elif isinstance(rc_res, basestring):
self.do_alert( _("Updates: unhandled error"), rc_res )
return 1
return 0
def __acquire_entropy_locks(self, entropy, pid_lock = True):
if pid_lock:
acquired, locked = const_setup_entropy_pid(force_handling = True)
if (not acquired) or locked:
if DAEMON_DEBUG:
if locked:
write_output(
"__acquire_entropy_locks: app. locked, skipping")
else:
write_output(
"__acquire_entropy_locks: app. locked during acquire")
return False
# entropy resources locked?
locked = entropy.resources_locked()
if locked:
if DAEMON_DEBUG:
write_output(
"__acquire_entropy_locks: resources locked, skipping")
return False
# acquire resources lock
acquired = entropy.lock_resources()
if not acquired:
if DAEMON_DEBUG:
write_output(
"__acquire_entropy_locks: resources locked during acquire")
return False
return True
def __release_entropy_locks(self, entropy):
# unlock resources
entropy.unlock_resources()
# remove application lock
const_remove_entropy_pid()
def __run_fetcher(self, update_repos = True):
if self.__updater == None:
return 0
with self.__is_working_mutex:
rc_fetch = 0
entropy = None
try:
entropy = Entropy()
acquired = self.__acquire_entropy_locks(entropy,
pid_lock = update_repos)
if not acquired:
return rc_fetch
if DAEMON_DEBUG:
write_output("__run_fetcher: called %s" % (time.time(),))
if update_repos:
repos_to_up = self.get_repo_status()
if repos_to_up:
self.do_alert(
_("Repositories to update"),
unicode(repos_to_up),
urgency = 'critical'
)
if not self.__trigger_startup_check:
gobject.timeout_add(0, self.signal_updating)
repos = repos_to_up.keys()
rc_fetch = self.__run_sync(repos, entropy)
if rc_fetch != 0:
return rc_fetch
if DAEMON_DEBUG:
write_output("__run_fetcher: sync closed, rc: %s" % (
rc_fetch,))
else:
self.__trigger_startup_check = False
if DAEMON_DEBUG:
write_output("__run_fetcher: not syncing atm, "
"trigger startup check is ON, waiting next "
"round, repos: %s" % (repos_to_up,))
try:
update, remove, fine, spm_fine = \
entropy.calculate_updates(use_cache = False)
del fine, remove
except Exception, err:
entropy_tools.print_traceback(f = DAEMON_LOG)
msg = "%s: %s" % (_("Updates: error"), err,)
self.do_alert(_("Updates: error"), msg)
del self.__updates[:]
self.__updates_atoms = None
return 1
self.__system_db_hash = entropy.installed_repository().checksum(
do_order = True, strict = False)
self.__updates = update[:]
self.__updates_atoms = None
if update:
self.do_alert(
_("Updates available"),
"%s %d %s" % (
_("There are"), len(update),
_("updates available."),),
urgency = 'critical'
)
gobject.timeout_add(0, self.signal_updates)
else:
self.do_alert(
_("No updates"),
"%s" % (update,),
urgency = 'critical'
)
gobject.timeout_add(0, self.signal_updates)
return 0
finally:
if entropy is not None:
self.__release_entropy_locks(entropy)
# say goodbye
entropy.shutdown()
# compare repos status for updates
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'a{si}')
def get_repo_status(self):
repos = {}
# now get remote
sys_set = SysSet()
for repoid in sys_set['repositories']['available']:
repo_rev = Entropy.get_repository(repoid).revision(repoid)
online_rev = Entropy.get_repository(repoid).remote_revision(repoid)
if (online_rev == -1) or (repo_rev != online_rev):
if DAEMON_DEBUG:
write_output(
"get_repo_status: repo needs to be updated: %s" % (
repoid,))
sys_set._clear_repository_cache(repoid = repoid)
repos[repoid] = {
'local': repo_rev,
'remote': online_rev,
}
return repos
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def trigger_check(self):
self.__trigger_oncall_updater = True
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def trigger_startup_check(self):
self.__trigger_oncall_updater = True
self.__trigger_startup_check = True
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'av')
def get_updates(self):
entropy = None
try:
entropy = Entropy()
curr_hash = entropy.installed_repository().checksum(
do_order = True, strict = False)
if curr_hash == self.__system_db_hash:
return self.__updates
try:
update, remove, fine, spm_fine = \
entropy.calculate_updates(use_cache = False)
except Exception, err:
entropy_tools.print_traceback(f = DAEMON_LOG)
msg = "get_updates: %s: %s" % (_("Updates: error"), err,)
self.do_alert(_("Updates: error"), msg)
return self.__updates
self.__updates = update[:]
self.__system_db_hash = curr_hash
return self.__updates
finally:
if entropy is not None:
entropy.shutdown()
def _is_system_changed(self):
with self.__is_working_mutex:
entropy = None
try:
entropy = Entropy()
locked = entropy.resources_locked()
if locked:
if DAEMON_DEBUG:
write_output("_is_system_changed: resources locked!")
return False # resources are locked, nothing changed yet :P
last_checksum = self.__last_system_repo_checksum
cur_checksum = entropy.installed_repository().checksum(
strict = False, strings = True)
changed = last_checksum != cur_checksum
if DAEMON_DEBUG and changed:
write_output("_is_system_changed: system db checksum changed!")
self.__last_system_repo_checksum = cur_checksum
return changed
finally:
if entropy is not None:
# say goodbye
entropy.shutdown()
@dbus.service.method("org.entropy.Client", in_signature = '',
out_signature = 'b')
def is_system_changed(self):
return self._is_system_changed()
@dbus.service.method("org.entropy.Client", in_signature = '',
out_signature = 'av')
def get_updates_atoms(self):
if self.__updates_atoms is not None:
return self.__updates_atoms
with self.__is_working_mutex:
atoms = []
entropy = None
try:
entropy = Entropy()
self.__updates_atoms = []
for idpackage, repoid in self.__updates:
try:
dbc = entropy.open_repository(repoid)
atoms.append(dbc.retrieveAtom(idpackage))
except RepositoryError:
continue
self.__updates_atoms.extend(atoms)
return self.__updates_atoms
finally:
if entropy is not None:
entropy.shutdown()
@dbus.service.method("org.entropy.Client", in_signature = '',
out_signature = '')
def client_ping(self):
self.__got_client_ping = True
if DAEMON_DEBUG:
write_output("client_ping: got client ping")
# signal sent when updates are available for retrive
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updates(self):
if DAEMON_DEBUG:
write_output("signal_updates: updates available!")
return False
# signal sent when daemon is updating the repositories
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updating(self):
if DAEMON_DEBUG:
write_output("signal_updating: updating repos!")
return False
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
try:
ud_i = UpdatesDaemon()
except dbus.exceptions.DBusException:
raise SystemExit(1)
ud_i.start()
main_loop = gobject.MainLoop()
main_loop.run()
ud_i.stop()
raise SystemExit(0)