From cfbb0909c5f144e478de9e829560c5bda8b7e094 Mon Sep 17 00:00:00 2001 From: Fabio Erculiani Date: Sun, 10 Oct 2010 17:55:21 +0200 Subject: [PATCH] [services] add client-updates-daemon wrapper (which sets LC_*), move real executable to /usr/libexec, close bug #1880 --- Makefile | 2 + services/client-updates-daemon | 701 +------------------------------ services/entropy-updates-service | 697 ++++++++++++++++++++++++++++++ 3 files changed, 705 insertions(+), 695 deletions(-) mode change 100644 => 100755 services/client-updates-daemon create mode 100755 services/entropy-updates-service diff --git a/Makefile b/Makefile index cf81ed0e9..961166643 100644 --- a/Makefile +++ b/Makefile @@ -95,9 +95,11 @@ updates-daemon-install: mkdir -p $(DESTDIR)/etc/dbus-1/system.d/ mkdir -p $(DESTDIR)$(PREFIX)/sbin/ + mkdir -p $(DESTDIR)$(PREFIX)/libexec/ mkdir -p $(DESTDIR)$(PREFIX)/share/dbus-1/system-services/ mkdir -p $(DESTDIR)$(PREFIX)/share/dbus-1/interfaces/ install -m 744 services/client-updates-daemon $(DESTDIR)$(PREFIX)/sbin/ + install -m 744 services/entropy-updates-service $(DESTDIR)$(PREFIX)/libexec/ install -m 644 misc/dbus/system.d/org.entropy.Client.conf $(DESTDIR)/etc/dbus-1/system.d/ install -m 644 misc/dbus/system-services/org.entropy.Client.service $(DESTDIR)$(PREFIX)/share/dbus-1/system-services/ install -m 644 misc/dbus/interfaces/org.entropy.Client.xml $(DESTDIR)$(PREFIX)/share/dbus-1/interfaces/ diff --git a/services/client-updates-daemon b/services/client-updates-daemon old mode 100644 new mode 100755 index 546ef2e1c..caf50d208 --- a/services/client-updates-daemon +++ b/services/client-updates-daemon @@ -1,697 +1,8 @@ -#!/usr/bin/python2 -O -# -*- coding: utf-8 -*- -""" +#!/bin/sh - @author: Fabio Erculiani - @contact: lxnay@sabayon.org - @copyright: Fabio Erculiani - @license: GPL-2 +lang_id="en_US.UTF-8" +export LC_ALL="${lang_id}" +export LANG="${lang_id}" +export LANGUAGE="${lang_id}" - B{Entropy Package Manager Client service}. - -""" -import os -import sys -# this makes the daemon to not write the entropy pid file -# avoiding to lock other instances -sys.argv.append('--no-pid-handling') -import time -import gobject -import dbus -import dbus.service -import dbus.mainloop.glib -import signal -from threading import Lock, RLock - -DAEMON_DEBUG = False -if "--debug" in sys.argv: - DAEMON_DEBUG = True - -# Entropy imports -sys.path.insert(0,'/usr/lib/entropy/libraries') -sys.path.insert(0,'/usr/lib/entropy/server') -sys.path.insert(0,'/usr/lib/entropy/client') -sys.path.insert(0,'../libraries') -sys.path.insert(0,'../server') -sys.path.insert(0,'../client') - -from entropy.cache import EntropyCacher -# update default writeback timeout -EntropyCacher.WRITEBACK_TIMEOUT = 120 - -from entropy.misc import LogFile -from entropy.i18n import _ -from entropy.exceptions import PermissionDenied, RepositoryError -import entropy.tools as entropy_tools -from entropy.client.interfaces import Client -from entropy.fetchers import UrlFetcher -from entropy.const import etpConst, const_setup_entropy_pid, \ - const_remove_entropy_pid, const_convert_to_rawstring, \ - const_drop_privileges, const_regain_privileges -from entropy.core.settings.base import SystemSettings as SysSet -from entropy.misc import ParallelTask -from entropy.output import TextInterface, nocolor -nocolor() - - -SYS_SETTINGS = SysSet() -TEXT = TextInterface() -DAEMON_LOGFILE = os.path.join(etpConst['syslogdir'],"client-updater.log") -DAEMON_LOG = LogFile(SYS_SETTINGS['system']['log_level']+1, - DAEMON_LOGFILE, header = "[client-updater]") -PREVIOUS_PROGRESS = '' -CHECK_DELAY_SECS = 3600*4 -for xopt in sys.argv: - if xopt.startswith("--secs=") and (len(xopt.split("=")) > 1): - try: - delay_secs = int(xopt.split("=")[1]) - CHECK_DELAY_SECS = delay_secs - except ValueError: - continue - -def write_output(*args, **kwargs): - message = time.strftime('[%H:%M:%S %d/%m/%Y %Z]') + " " + args[0] - global PREVIOUS_PROGRESS - if PREVIOUS_PROGRESS == message: - return - PREVIOUS_PROGRESS = message - DAEMON_LOG.write(message) - DAEMON_LOG.flush() - if DAEMON_DEBUG: - TEXT.output(*args, **kwargs) - -def install_exception_handler(): - sys.excepthook = handle_exception - -def uninstall_exception_handler(): - sys.excepthook = sys.__excepthook__ - -def handle_exception(exc_class, exc_instance, exc_tb): - - t_back = entropy_tools.get_traceback(tb_obj = exc_tb) - - # restore original exception handler, to avoid loops - uninstall_exception_handler() - # write exception to log file - write_output(const_convert_to_rawstring(t_back)) - raise exc_instance - -install_exception_handler() - -class Privileges: - - def __init__(self, drop_privs = True): - self.__drop_privs = drop_privs - self.__drop_privs_lock = RLock() - self.__with_stmt = 0 - - def __enter__(self): - """ - Hold the lock. - """ - self.__drop_privs_lock.acquire() - if self.__with_stmt < 1: - self.regain() - self.__with_stmt += 1 - - def __exit__(self, exc_type, exc_value, traceback): - """ - Drop the lock. - """ - if self.__with_stmt == 1: - self.drop() - self.__with_stmt -= 1 - self.__drop_privs_lock.release() - - def drop(self): - """ - Drop process privileges. Setting unpriv_gid to etpConst['entropygid'] - makes Entropy UGC/Data cache handling working. - """ - if self.__drop_privs: - with self.__drop_privs_lock: - const_drop_privileges(unpriv_gid = etpConst['entropygid']) - - def regain(self): - """ - Regain previously dropped process privileges. - """ - if self.__drop_privs: - with self.__drop_privs_lock: - const_regain_privileges() - -class DaemonUrlFetcher(UrlFetcher): - - daemon_last_avg = 100 - __average = 0 - __downloadedsize = 0 - __remotesize = 0 - __datatransfer = 0 - - def handle_statistics(self, th_id, downloaded_size, total_size, - average, old_average, update_step, show_speed, data_transfer, - time_remaining, time_remaining_secs): - self.__average = average - self.__downloadedsize = downloaded_size - self.__remotesize = total_size - self.__datatransfer = data_transfer - - def update(self): - myavg = abs(int(round(float(self.__average), 1))) - if abs((myavg - self.daemon_last_avg)) < 1: - return - if int(myavg) % 10 == 0: - DAEMON_LOG.write("fetch @ %s%s" % (myavg, "%",)) - self.daemon_last_avg = myavg - - -class Entropy(Client): - - def init_singleton(self): - Client.init_singleton(self, load_ugc = False, - url_fetcher = DaemonUrlFetcher, repo_validation = False) - # validate currently available repos - # manually, to not taint logs - self._validate_repositories(quiet = True) - self.output( - "Loading Entropy Updates daemon: check every %ss, logfile: %s" % ( - CHECK_DELAY_SECS, DAEMON_LOGFILE,) - ) - - def output(self, *args, **kwargs): - return write_output(*args, **kwargs) -Client.__singleton_class__ = Entropy - -class UpdatesDaemon(dbus.service.Object): - - def __init__(self): - - gobject.threads_init() - self._privileges = Privileges() - self._privileges.drop() - self.__alive = False - self.__is_working_mutex = Lock() - self.__updater = None - self.__entropy_cache_cleaner = None - self.__system_changes_checker = None - self.__client_ping_checker = None - self.__oncall_updater = None - self.__quit_service_wd = None - self.__quit_service_trigger = False - self.__trigger_oncall_updater = False - self.__trigger_disable_repos_update = False - self.__trigger_startup_check = False - self.__fetch_mutex = Lock() - self.__updates = [] - self.__updates_atoms = None - self.__system_db_hash = None - self.__last_system_repo_checksum = None - self.__got_client_ping = True - - # start dbus service - object_path = "/notifier" - dbus_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default = True) - system_bus = dbus.SystemBus(mainloop = dbus_loop) - name = dbus.service.BusName("org.entropy.Client", bus = system_bus) - dbus.service.Object.__init__ (self, system_bus, object_path) - write_output("__init__: dbus service loaded") - - # this seems to avoid race conditions - # with dbus service not being available - time.sleep(2) - - - def start(self): - self.stop() - self.__alive = True - self.__updater = gobject.timeout_add_seconds( - CHECK_DELAY_SECS, self.run_fetcher) - self.__oncall_updater = gobject.timeout_add_seconds( - 3, self.run_oncall_fetcher) - self.__quit_service_wd = gobject.timeout_add_seconds( - 60, self.quit_service_watchdog) - self.__system_changes_checker = gobject.timeout_add_seconds( - 120, self.check_system_changes) - self.__entropy_cache_cleaner = gobject.timeout_add_seconds( - 3600, self.cleanup_entropy_cache) - if "--disable-timer" not in sys.argv: - self.__client_ping_checker = gobject.timeout_add_seconds( - 120, self.check_client_ping) - - def stop(self): - if self.__alive: - self.__alive = False - - if self.__updater is not None: - gobject.source_remove(self.__updater) - - if self.__system_changes_checker is not None: - gobject.source_remove(self.__system_changes_checker) - - if self.__oncall_updater is not None: - gobject.source_remove(self.__oncall_updater) - - if self.__quit_service_wd is not None: - gobject.source_remove(self.__quit_service_wd) - - if self.__client_ping_checker is not None: - gobject.source_remove(self.__client_ping_checker) - - if self.__entropy_cache_cleaner is not None: - gobject.source_remove(self.__entropy_cache_cleaner) - - def do_alert(self, string, msg, urgency = "critical"): - write_output('alert: %s, %s, urgency: %s' % (string, msg, urgency,)) - - def is_system_on_batteries(self): - """ - Return whether System is running on batteries. - - @return: True, if running on batteries - @rtype: bool - """ - ac_powa_exec = "/usr/bin/on_ac_power" - if not os.access(ac_powa_exec, os.X_OK): - return False - ex_rc = os.system(ac_powa_exec) - if ex_rc: - return True - return False - - def check_client_ping(self): - if self.__got_client_ping: - self.__got_client_ping = False - else: # timeout! - if DAEMON_DEBUG: - write_output("check_client_ping: NO client ping, quitting") - self.__quit_service_trigger = True - return self.__alive - - def cleanup_entropy_cache(self): - - with self.__is_working_mutex: - - with self._privileges: - entropy = None - try: - - entropy = Entropy() - acquired = self.__acquire_entropy_locks(entropy, - pid_lock = False) - if not acquired: - return True # respawn later - - if hasattr(entropy, 'clean_downloaded_packages'): - entropy.clean_downloaded_packages() - return False - - finally: - if entropy is not None: - self.__release_entropy_locks(entropy, pid_lock = False) - entropy.shutdown() - - def check_system_changes(self): - if self.__trigger_oncall_updater: - return self.__alive - - changed = self._is_system_changed() - if changed: - # this will disable repositories download, we don't really want - # that in this case. - self.__trigger_disable_repos_update = True - # trigger check and push - self.__trigger_oncall_updater = True - # keep alive - return self.__alive - - def run_oncall_fetcher(self): - if not self.__trigger_oncall_updater: - return self.__alive - with self.__fetch_mutex: - self.__trigger_oncall_updater = False - update_repos = True - if self.__trigger_disable_repos_update: - update_repos = False - self.__trigger_disable_repos_update = False - task = ParallelTask(self.__run_fetcher, update_repos = update_repos) - task.start() - #self.__run_fetcher() - return self.__alive - - def quit_service_watchdog(self): - if self.__quit_service_trigger: - self.stop() - with self.__is_working_mutex: - entropy_tools.kill_threads() - raise SystemExit(0) - return self.__alive - - def run_fetcher(self): - - if self.is_system_on_batteries(): - # running on batteries, then skip - return self.__alive - - with self.__fetch_mutex: - task = ParallelTask(self.__run_fetcher) - task.start() - #self.__run_fetcher() - return self.__alive - - def __run_sync(self, repos, entropy): - - try: - repo_conn = entropy.Repositories( - repo_identifiers = repos, fetch_security = False, - entropy_updates_alert = True) - if DAEMON_DEBUG: - write_output("__run_sync: repository interface loaded") - except Exception, err: - if DAEMON_DEBUG: - write_output( - "__run_sync: Unhandled exception, error: %s" % (err,)) - else: - - # 128: sync error, something bad happened - # 2: repositories not available (all) - # 1: not able to update all the repositories - if DAEMON_DEBUG: - write_output("__run_sync: preparing to run sync") - rc_res = repo_conn.sync() - del repo_conn - if DAEMON_DEBUG: - write_output("__run_sync: sync done") - - if rc_res == 1: - err = _("Not all the repositories have been fetched") - self.do_alert( _("Updates: repository issues"), err ) - return 0 # fine anyway - elif rc_res == 2: - err = _("No repositories found online") - self.do_alert( _("Updates: repository issues"), err ) - return 0 # try to calculate updates anyway - elif rc_res == 128: - err = _("Synchronization errors. Cannot update repositories.") - self.do_alert( _("Updates: sync issues"), err ) - return rc_res - elif isinstance(rc_res, basestring): - self.do_alert( _("Updates: unhandled error"), rc_res ) - return 1 - - return 0 - - def __acquire_entropy_locks(self, entropy, pid_lock = True): - - if pid_lock: - acquired, locked = const_setup_entropy_pid(force_handling = True) - if (not acquired) or locked: - if DAEMON_DEBUG: - if locked: - write_output( - "__acquire_entropy_locks: app. locked, skipping") - else: - write_output( - "__acquire_entropy_locks: app. locked during acquire") - return False - - # entropy resources locked? - locked = entropy.resources_locked() - if locked: - if DAEMON_DEBUG: - write_output( - "__acquire_entropy_locks: resources locked, skipping") - return False - - # acquire resources lock - acquired = entropy.lock_resources() - if not acquired: - if DAEMON_DEBUG: - write_output( - "__acquire_entropy_locks: resources locked during acquire") - return False - - return True - - def __release_entropy_locks(self, entropy, pid_lock = True): - # unlock resources - entropy.unlock_resources() - if pid_lock: - # remove application lock - const_remove_entropy_pid() - - def __run_fetcher(self, update_repos = True): - - if self.__updater == None: - return 0 - - with self.__is_working_mutex: - - with self._privileges: - - rc_fetch = 0 - entropy = None - - try: - - entropy = Entropy() - acquired = self.__acquire_entropy_locks(entropy, - pid_lock = update_repos) - if not acquired: - return rc_fetch - - if DAEMON_DEBUG: - write_output("__run_fetcher: called %s" % ( - time.time(),)) - - if update_repos: - repos_to_up = self.get_repo_status() - if repos_to_up: - - self.do_alert( - _("Repositories to update"), - unicode(repos_to_up), - urgency = 'critical' - ) - - if not self.__trigger_startup_check: - gobject.timeout_add(0, self.signal_updating) - repos = repos_to_up.keys() - rc_fetch = self.__run_sync(repos, entropy) - if rc_fetch != 0: - return rc_fetch - if DAEMON_DEBUG: - write_output("__run_fetcher: sync closed, rc: %s" % ( - rc_fetch,)) - else: - self.__trigger_startup_check = False - if DAEMON_DEBUG: - write_output("__run_fetcher: not syncing atm, " - "trigger startup check is ON, waiting next " - "round, repos: %s" % (repos_to_up,)) - - try: - update, remove, fine, spm_fine = \ - entropy.calculate_updates(use_cache = False) - del fine, remove - except Exception, err: - entropy_tools.print_traceback(f = DAEMON_LOG) - msg = "%s: %s" % (_("Updates: error"), err,) - self.do_alert(_("Updates: error"), msg) - del self.__updates[:] - self.__updates_atoms = None - return 1 - - self.__system_db_hash = \ - entropy.installed_repository().checksum( - do_order = True, strict = False) - self.__updates = update[:] - self.__updates_atoms = None - - if update: - self.do_alert( - _("Updates available"), - "%s %d %s" % ( - _("There are"), len(update), - _("updates available."),), - urgency = 'critical' - ) - gobject.timeout_add(0, self.signal_updates) - else: - self.do_alert( - _("No updates"), - "%s" % (update,), - urgency = 'critical' - ) - gobject.timeout_add(0, self.signal_updates) - - return 0 - - finally: - if entropy is not None: - self.__release_entropy_locks(entropy, - pid_lock = update_repos) - # say goodbye - entropy.shutdown() - - # compare repos status for updates - @dbus.service.method ( "org.entropy.Client", in_signature = '', - out_signature = 'a{si}') - def get_repo_status(self): - - repos = {} - # now get remote - sys_set = SysSet() - for repoid in sys_set['repositories']['available']: - - repo_rev = Entropy.get_repository(repoid).revision(repoid) - online_rev = Entropy.get_repository(repoid).remote_revision(repoid) - if (online_rev == -1) or (repo_rev != online_rev): - - if DAEMON_DEBUG: - write_output( - "get_repo_status: repo needs to be updated: %s" % ( - repoid,)) - - sys_set._clear_repository_cache(repoid = repoid) - repos[repoid] = { - 'local': repo_rev, - 'remote': online_rev, - } - - return repos - - @dbus.service.method ( "org.entropy.Client", in_signature = '', - out_signature = '') - def trigger_check(self): - self.__trigger_oncall_updater = True - - @dbus.service.method ( "org.entropy.Client", in_signature = '', - out_signature = '') - def trigger_startup_check(self): - self.__trigger_oncall_updater = True - self.__trigger_startup_check = True - - @dbus.service.method ( "org.entropy.Client", in_signature = '', - out_signature = 'av') - def get_updates(self): - - entropy = None - try: - - entropy = Entropy() - curr_hash = entropy.installed_repository().checksum( - do_order = True, strict = False) - if curr_hash == self.__system_db_hash: - return self.__updates - - try: - update, remove, fine, spm_fine = \ - entropy.calculate_updates(use_cache = False) - except Exception, err: - entropy_tools.print_traceback(f = DAEMON_LOG) - msg = "get_updates: %s: %s" % (_("Updates: error"), err,) - self.do_alert(_("Updates: error"), msg) - return self.__updates - - self.__updates = update[:] - self.__system_db_hash = curr_hash - return self.__updates - - finally: - if entropy is not None: - entropy.shutdown() - - def _is_system_changed(self): - with self.__is_working_mutex: - - entropy = None - try: - - entropy = Entropy() - locked = entropy.resources_locked() - if locked: - if DAEMON_DEBUG: - write_output("_is_system_changed: resources locked!") - return False # resources are locked, nothing changed yet :P - - last_checksum = self.__last_system_repo_checksum - cur_checksum = entropy.installed_repository().checksum( - strict = False, strings = True) - - changed = last_checksum != cur_checksum - if DAEMON_DEBUG and changed: - write_output("_is_system_changed: system db checksum changed!") - self.__last_system_repo_checksum = cur_checksum - return changed - - finally: - if entropy is not None: - # say goodbye - entropy.shutdown() - - @dbus.service.method("org.entropy.Client", in_signature = '', - out_signature = 'b') - def is_system_changed(self): - return self._is_system_changed() - - @dbus.service.method("org.entropy.Client", in_signature = '', - out_signature = 'av') - def get_updates_atoms(self): - - if self.__updates_atoms is not None: - return self.__updates_atoms - - with self.__is_working_mutex: - atoms = [] - entropy = None - try: - entropy = Entropy() - self.__updates_atoms = [] - for idpackage, repoid in self.__updates: - try: - dbc = entropy.open_repository(repoid) - atoms.append(dbc.retrieveAtom(idpackage)) - except RepositoryError: - continue - self.__updates_atoms.extend(atoms) - return self.__updates_atoms - - finally: - if entropy is not None: - entropy.shutdown() - - - @dbus.service.method("org.entropy.Client", in_signature = '', - out_signature = '') - def client_ping(self): - self.__got_client_ping = True - if DAEMON_DEBUG: - write_output("client_ping: got client ping") - - # signal sent when updates are available for retrive - @dbus.service.signal(dbus_interface = 'org.entropy.Client', - signature = '') - def signal_updates(self): - if DAEMON_DEBUG: - write_output("signal_updates: updates available!") - return False - - # signal sent when daemon is updating the repositories - @dbus.service.signal(dbus_interface = 'org.entropy.Client', - signature = '') - def signal_updating(self): - if DAEMON_DEBUG: - write_output("signal_updating: updating repos!") - return False - -if __name__ == "__main__": - signal.signal(signal.SIGINT, signal.SIG_DFL) - try: - ud_i = UpdatesDaemon() - except dbus.exceptions.DBusException: - raise SystemExit(1) - ud_i.start() - main_loop = gobject.MainLoop() - main_loop.run() - ud_i.stop() - raise SystemExit(0) +exec /usr/libexec/entropy-updates-service ${@} diff --git a/services/entropy-updates-service b/services/entropy-updates-service new file mode 100755 index 000000000..546ef2e1c --- /dev/null +++ b/services/entropy-updates-service @@ -0,0 +1,697 @@ +#!/usr/bin/python2 -O +# -*- coding: utf-8 -*- +""" + + @author: Fabio Erculiani + @contact: lxnay@sabayon.org + @copyright: Fabio Erculiani + @license: GPL-2 + + B{Entropy Package Manager Client service}. + +""" +import os +import sys +# this makes the daemon to not write the entropy pid file +# avoiding to lock other instances +sys.argv.append('--no-pid-handling') +import time +import gobject +import dbus +import dbus.service +import dbus.mainloop.glib +import signal +from threading import Lock, RLock + +DAEMON_DEBUG = False +if "--debug" in sys.argv: + DAEMON_DEBUG = True + +# Entropy imports +sys.path.insert(0,'/usr/lib/entropy/libraries') +sys.path.insert(0,'/usr/lib/entropy/server') +sys.path.insert(0,'/usr/lib/entropy/client') +sys.path.insert(0,'../libraries') +sys.path.insert(0,'../server') +sys.path.insert(0,'../client') + +from entropy.cache import EntropyCacher +# update default writeback timeout +EntropyCacher.WRITEBACK_TIMEOUT = 120 + +from entropy.misc import LogFile +from entropy.i18n import _ +from entropy.exceptions import PermissionDenied, RepositoryError +import entropy.tools as entropy_tools +from entropy.client.interfaces import Client +from entropy.fetchers import UrlFetcher +from entropy.const import etpConst, const_setup_entropy_pid, \ + const_remove_entropy_pid, const_convert_to_rawstring, \ + const_drop_privileges, const_regain_privileges +from entropy.core.settings.base import SystemSettings as SysSet +from entropy.misc import ParallelTask +from entropy.output import TextInterface, nocolor +nocolor() + + +SYS_SETTINGS = SysSet() +TEXT = TextInterface() +DAEMON_LOGFILE = os.path.join(etpConst['syslogdir'],"client-updater.log") +DAEMON_LOG = LogFile(SYS_SETTINGS['system']['log_level']+1, + DAEMON_LOGFILE, header = "[client-updater]") +PREVIOUS_PROGRESS = '' +CHECK_DELAY_SECS = 3600*4 +for xopt in sys.argv: + if xopt.startswith("--secs=") and (len(xopt.split("=")) > 1): + try: + delay_secs = int(xopt.split("=")[1]) + CHECK_DELAY_SECS = delay_secs + except ValueError: + continue + +def write_output(*args, **kwargs): + message = time.strftime('[%H:%M:%S %d/%m/%Y %Z]') + " " + args[0] + global PREVIOUS_PROGRESS + if PREVIOUS_PROGRESS == message: + return + PREVIOUS_PROGRESS = message + DAEMON_LOG.write(message) + DAEMON_LOG.flush() + if DAEMON_DEBUG: + TEXT.output(*args, **kwargs) + +def install_exception_handler(): + sys.excepthook = handle_exception + +def uninstall_exception_handler(): + sys.excepthook = sys.__excepthook__ + +def handle_exception(exc_class, exc_instance, exc_tb): + + t_back = entropy_tools.get_traceback(tb_obj = exc_tb) + + # restore original exception handler, to avoid loops + uninstall_exception_handler() + # write exception to log file + write_output(const_convert_to_rawstring(t_back)) + raise exc_instance + +install_exception_handler() + +class Privileges: + + def __init__(self, drop_privs = True): + self.__drop_privs = drop_privs + self.__drop_privs_lock = RLock() + self.__with_stmt = 0 + + def __enter__(self): + """ + Hold the lock. + """ + self.__drop_privs_lock.acquire() + if self.__with_stmt < 1: + self.regain() + self.__with_stmt += 1 + + def __exit__(self, exc_type, exc_value, traceback): + """ + Drop the lock. + """ + if self.__with_stmt == 1: + self.drop() + self.__with_stmt -= 1 + self.__drop_privs_lock.release() + + def drop(self): + """ + Drop process privileges. Setting unpriv_gid to etpConst['entropygid'] + makes Entropy UGC/Data cache handling working. + """ + if self.__drop_privs: + with self.__drop_privs_lock: + const_drop_privileges(unpriv_gid = etpConst['entropygid']) + + def regain(self): + """ + Regain previously dropped process privileges. + """ + if self.__drop_privs: + with self.__drop_privs_lock: + const_regain_privileges() + +class DaemonUrlFetcher(UrlFetcher): + + daemon_last_avg = 100 + __average = 0 + __downloadedsize = 0 + __remotesize = 0 + __datatransfer = 0 + + def handle_statistics(self, th_id, downloaded_size, total_size, + average, old_average, update_step, show_speed, data_transfer, + time_remaining, time_remaining_secs): + self.__average = average + self.__downloadedsize = downloaded_size + self.__remotesize = total_size + self.__datatransfer = data_transfer + + def update(self): + myavg = abs(int(round(float(self.__average), 1))) + if abs((myavg - self.daemon_last_avg)) < 1: + return + if int(myavg) % 10 == 0: + DAEMON_LOG.write("fetch @ %s%s" % (myavg, "%",)) + self.daemon_last_avg = myavg + + +class Entropy(Client): + + def init_singleton(self): + Client.init_singleton(self, load_ugc = False, + url_fetcher = DaemonUrlFetcher, repo_validation = False) + # validate currently available repos + # manually, to not taint logs + self._validate_repositories(quiet = True) + self.output( + "Loading Entropy Updates daemon: check every %ss, logfile: %s" % ( + CHECK_DELAY_SECS, DAEMON_LOGFILE,) + ) + + def output(self, *args, **kwargs): + return write_output(*args, **kwargs) +Client.__singleton_class__ = Entropy + +class UpdatesDaemon(dbus.service.Object): + + def __init__(self): + + gobject.threads_init() + self._privileges = Privileges() + self._privileges.drop() + self.__alive = False + self.__is_working_mutex = Lock() + self.__updater = None + self.__entropy_cache_cleaner = None + self.__system_changes_checker = None + self.__client_ping_checker = None + self.__oncall_updater = None + self.__quit_service_wd = None + self.__quit_service_trigger = False + self.__trigger_oncall_updater = False + self.__trigger_disable_repos_update = False + self.__trigger_startup_check = False + self.__fetch_mutex = Lock() + self.__updates = [] + self.__updates_atoms = None + self.__system_db_hash = None + self.__last_system_repo_checksum = None + self.__got_client_ping = True + + # start dbus service + object_path = "/notifier" + dbus_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default = True) + system_bus = dbus.SystemBus(mainloop = dbus_loop) + name = dbus.service.BusName("org.entropy.Client", bus = system_bus) + dbus.service.Object.__init__ (self, system_bus, object_path) + write_output("__init__: dbus service loaded") + + # this seems to avoid race conditions + # with dbus service not being available + time.sleep(2) + + + def start(self): + self.stop() + self.__alive = True + self.__updater = gobject.timeout_add_seconds( + CHECK_DELAY_SECS, self.run_fetcher) + self.__oncall_updater = gobject.timeout_add_seconds( + 3, self.run_oncall_fetcher) + self.__quit_service_wd = gobject.timeout_add_seconds( + 60, self.quit_service_watchdog) + self.__system_changes_checker = gobject.timeout_add_seconds( + 120, self.check_system_changes) + self.__entropy_cache_cleaner = gobject.timeout_add_seconds( + 3600, self.cleanup_entropy_cache) + if "--disable-timer" not in sys.argv: + self.__client_ping_checker = gobject.timeout_add_seconds( + 120, self.check_client_ping) + + def stop(self): + if self.__alive: + self.__alive = False + + if self.__updater is not None: + gobject.source_remove(self.__updater) + + if self.__system_changes_checker is not None: + gobject.source_remove(self.__system_changes_checker) + + if self.__oncall_updater is not None: + gobject.source_remove(self.__oncall_updater) + + if self.__quit_service_wd is not None: + gobject.source_remove(self.__quit_service_wd) + + if self.__client_ping_checker is not None: + gobject.source_remove(self.__client_ping_checker) + + if self.__entropy_cache_cleaner is not None: + gobject.source_remove(self.__entropy_cache_cleaner) + + def do_alert(self, string, msg, urgency = "critical"): + write_output('alert: %s, %s, urgency: %s' % (string, msg, urgency,)) + + def is_system_on_batteries(self): + """ + Return whether System is running on batteries. + + @return: True, if running on batteries + @rtype: bool + """ + ac_powa_exec = "/usr/bin/on_ac_power" + if not os.access(ac_powa_exec, os.X_OK): + return False + ex_rc = os.system(ac_powa_exec) + if ex_rc: + return True + return False + + def check_client_ping(self): + if self.__got_client_ping: + self.__got_client_ping = False + else: # timeout! + if DAEMON_DEBUG: + write_output("check_client_ping: NO client ping, quitting") + self.__quit_service_trigger = True + return self.__alive + + def cleanup_entropy_cache(self): + + with self.__is_working_mutex: + + with self._privileges: + entropy = None + try: + + entropy = Entropy() + acquired = self.__acquire_entropy_locks(entropy, + pid_lock = False) + if not acquired: + return True # respawn later + + if hasattr(entropy, 'clean_downloaded_packages'): + entropy.clean_downloaded_packages() + return False + + finally: + if entropy is not None: + self.__release_entropy_locks(entropy, pid_lock = False) + entropy.shutdown() + + def check_system_changes(self): + if self.__trigger_oncall_updater: + return self.__alive + + changed = self._is_system_changed() + if changed: + # this will disable repositories download, we don't really want + # that in this case. + self.__trigger_disable_repos_update = True + # trigger check and push + self.__trigger_oncall_updater = True + # keep alive + return self.__alive + + def run_oncall_fetcher(self): + if not self.__trigger_oncall_updater: + return self.__alive + with self.__fetch_mutex: + self.__trigger_oncall_updater = False + update_repos = True + if self.__trigger_disable_repos_update: + update_repos = False + self.__trigger_disable_repos_update = False + task = ParallelTask(self.__run_fetcher, update_repos = update_repos) + task.start() + #self.__run_fetcher() + return self.__alive + + def quit_service_watchdog(self): + if self.__quit_service_trigger: + self.stop() + with self.__is_working_mutex: + entropy_tools.kill_threads() + raise SystemExit(0) + return self.__alive + + def run_fetcher(self): + + if self.is_system_on_batteries(): + # running on batteries, then skip + return self.__alive + + with self.__fetch_mutex: + task = ParallelTask(self.__run_fetcher) + task.start() + #self.__run_fetcher() + return self.__alive + + def __run_sync(self, repos, entropy): + + try: + repo_conn = entropy.Repositories( + repo_identifiers = repos, fetch_security = False, + entropy_updates_alert = True) + if DAEMON_DEBUG: + write_output("__run_sync: repository interface loaded") + except Exception, err: + if DAEMON_DEBUG: + write_output( + "__run_sync: Unhandled exception, error: %s" % (err,)) + else: + + # 128: sync error, something bad happened + # 2: repositories not available (all) + # 1: not able to update all the repositories + if DAEMON_DEBUG: + write_output("__run_sync: preparing to run sync") + rc_res = repo_conn.sync() + del repo_conn + if DAEMON_DEBUG: + write_output("__run_sync: sync done") + + if rc_res == 1: + err = _("Not all the repositories have been fetched") + self.do_alert( _("Updates: repository issues"), err ) + return 0 # fine anyway + elif rc_res == 2: + err = _("No repositories found online") + self.do_alert( _("Updates: repository issues"), err ) + return 0 # try to calculate updates anyway + elif rc_res == 128: + err = _("Synchronization errors. Cannot update repositories.") + self.do_alert( _("Updates: sync issues"), err ) + return rc_res + elif isinstance(rc_res, basestring): + self.do_alert( _("Updates: unhandled error"), rc_res ) + return 1 + + return 0 + + def __acquire_entropy_locks(self, entropy, pid_lock = True): + + if pid_lock: + acquired, locked = const_setup_entropy_pid(force_handling = True) + if (not acquired) or locked: + if DAEMON_DEBUG: + if locked: + write_output( + "__acquire_entropy_locks: app. locked, skipping") + else: + write_output( + "__acquire_entropy_locks: app. locked during acquire") + return False + + # entropy resources locked? + locked = entropy.resources_locked() + if locked: + if DAEMON_DEBUG: + write_output( + "__acquire_entropy_locks: resources locked, skipping") + return False + + # acquire resources lock + acquired = entropy.lock_resources() + if not acquired: + if DAEMON_DEBUG: + write_output( + "__acquire_entropy_locks: resources locked during acquire") + return False + + return True + + def __release_entropy_locks(self, entropy, pid_lock = True): + # unlock resources + entropy.unlock_resources() + if pid_lock: + # remove application lock + const_remove_entropy_pid() + + def __run_fetcher(self, update_repos = True): + + if self.__updater == None: + return 0 + + with self.__is_working_mutex: + + with self._privileges: + + rc_fetch = 0 + entropy = None + + try: + + entropy = Entropy() + acquired = self.__acquire_entropy_locks(entropy, + pid_lock = update_repos) + if not acquired: + return rc_fetch + + if DAEMON_DEBUG: + write_output("__run_fetcher: called %s" % ( + time.time(),)) + + if update_repos: + repos_to_up = self.get_repo_status() + if repos_to_up: + + self.do_alert( + _("Repositories to update"), + unicode(repos_to_up), + urgency = 'critical' + ) + + if not self.__trigger_startup_check: + gobject.timeout_add(0, self.signal_updating) + repos = repos_to_up.keys() + rc_fetch = self.__run_sync(repos, entropy) + if rc_fetch != 0: + return rc_fetch + if DAEMON_DEBUG: + write_output("__run_fetcher: sync closed, rc: %s" % ( + rc_fetch,)) + else: + self.__trigger_startup_check = False + if DAEMON_DEBUG: + write_output("__run_fetcher: not syncing atm, " + "trigger startup check is ON, waiting next " + "round, repos: %s" % (repos_to_up,)) + + try: + update, remove, fine, spm_fine = \ + entropy.calculate_updates(use_cache = False) + del fine, remove + except Exception, err: + entropy_tools.print_traceback(f = DAEMON_LOG) + msg = "%s: %s" % (_("Updates: error"), err,) + self.do_alert(_("Updates: error"), msg) + del self.__updates[:] + self.__updates_atoms = None + return 1 + + self.__system_db_hash = \ + entropy.installed_repository().checksum( + do_order = True, strict = False) + self.__updates = update[:] + self.__updates_atoms = None + + if update: + self.do_alert( + _("Updates available"), + "%s %d %s" % ( + _("There are"), len(update), + _("updates available."),), + urgency = 'critical' + ) + gobject.timeout_add(0, self.signal_updates) + else: + self.do_alert( + _("No updates"), + "%s" % (update,), + urgency = 'critical' + ) + gobject.timeout_add(0, self.signal_updates) + + return 0 + + finally: + if entropy is not None: + self.__release_entropy_locks(entropy, + pid_lock = update_repos) + # say goodbye + entropy.shutdown() + + # compare repos status for updates + @dbus.service.method ( "org.entropy.Client", in_signature = '', + out_signature = 'a{si}') + def get_repo_status(self): + + repos = {} + # now get remote + sys_set = SysSet() + for repoid in sys_set['repositories']['available']: + + repo_rev = Entropy.get_repository(repoid).revision(repoid) + online_rev = Entropy.get_repository(repoid).remote_revision(repoid) + if (online_rev == -1) or (repo_rev != online_rev): + + if DAEMON_DEBUG: + write_output( + "get_repo_status: repo needs to be updated: %s" % ( + repoid,)) + + sys_set._clear_repository_cache(repoid = repoid) + repos[repoid] = { + 'local': repo_rev, + 'remote': online_rev, + } + + return repos + + @dbus.service.method ( "org.entropy.Client", in_signature = '', + out_signature = '') + def trigger_check(self): + self.__trigger_oncall_updater = True + + @dbus.service.method ( "org.entropy.Client", in_signature = '', + out_signature = '') + def trigger_startup_check(self): + self.__trigger_oncall_updater = True + self.__trigger_startup_check = True + + @dbus.service.method ( "org.entropy.Client", in_signature = '', + out_signature = 'av') + def get_updates(self): + + entropy = None + try: + + entropy = Entropy() + curr_hash = entropy.installed_repository().checksum( + do_order = True, strict = False) + if curr_hash == self.__system_db_hash: + return self.__updates + + try: + update, remove, fine, spm_fine = \ + entropy.calculate_updates(use_cache = False) + except Exception, err: + entropy_tools.print_traceback(f = DAEMON_LOG) + msg = "get_updates: %s: %s" % (_("Updates: error"), err,) + self.do_alert(_("Updates: error"), msg) + return self.__updates + + self.__updates = update[:] + self.__system_db_hash = curr_hash + return self.__updates + + finally: + if entropy is not None: + entropy.shutdown() + + def _is_system_changed(self): + with self.__is_working_mutex: + + entropy = None + try: + + entropy = Entropy() + locked = entropy.resources_locked() + if locked: + if DAEMON_DEBUG: + write_output("_is_system_changed: resources locked!") + return False # resources are locked, nothing changed yet :P + + last_checksum = self.__last_system_repo_checksum + cur_checksum = entropy.installed_repository().checksum( + strict = False, strings = True) + + changed = last_checksum != cur_checksum + if DAEMON_DEBUG and changed: + write_output("_is_system_changed: system db checksum changed!") + self.__last_system_repo_checksum = cur_checksum + return changed + + finally: + if entropy is not None: + # say goodbye + entropy.shutdown() + + @dbus.service.method("org.entropy.Client", in_signature = '', + out_signature = 'b') + def is_system_changed(self): + return self._is_system_changed() + + @dbus.service.method("org.entropy.Client", in_signature = '', + out_signature = 'av') + def get_updates_atoms(self): + + if self.__updates_atoms is not None: + return self.__updates_atoms + + with self.__is_working_mutex: + atoms = [] + entropy = None + try: + entropy = Entropy() + self.__updates_atoms = [] + for idpackage, repoid in self.__updates: + try: + dbc = entropy.open_repository(repoid) + atoms.append(dbc.retrieveAtom(idpackage)) + except RepositoryError: + continue + self.__updates_atoms.extend(atoms) + return self.__updates_atoms + + finally: + if entropy is not None: + entropy.shutdown() + + + @dbus.service.method("org.entropy.Client", in_signature = '', + out_signature = '') + def client_ping(self): + self.__got_client_ping = True + if DAEMON_DEBUG: + write_output("client_ping: got client ping") + + # signal sent when updates are available for retrive + @dbus.service.signal(dbus_interface = 'org.entropy.Client', + signature = '') + def signal_updates(self): + if DAEMON_DEBUG: + write_output("signal_updates: updates available!") + return False + + # signal sent when daemon is updating the repositories + @dbus.service.signal(dbus_interface = 'org.entropy.Client', + signature = '') + def signal_updating(self): + if DAEMON_DEBUG: + write_output("signal_updating: updating repos!") + return False + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signal.SIG_DFL) + try: + ud_i = UpdatesDaemon() + except dbus.exceptions.DBusException: + raise SystemExit(1) + ud_i.start() + main_loop = gobject.MainLoop() + main_loop.run() + ud_i.stop() + raise SystemExit(0)