Files
entropy/services/client-updates-daemon

565 lines
18 KiB
Python
Executable File

#!/usr/bin/python2 -O
"""
@author: Fabio Erculiani <lxnay@sabayonlinux.org>
@contact: lxnay@sabayonlinux.org
@copyright: Fabio Erculiani
@license: GPL-2
B{Entropy Package Manager Client service}.
"""
import os
import sys
# this makes the daemon to not write the entropy pid file
# avoiding to lock other instances
sys.argv.append('--no-pid-handling')
import time
import gobject
import dbus
import dbus.service
import dbus.mainloop.glib
import signal
from threading import Lock
DAEMON_DEBUG = False
if "--debug" in sys.argv:
DAEMON_DEBUG = True
# Entropy imports
sys.path.insert(0,'/usr/lib/entropy/libraries')
sys.path.insert(0,'/usr/lib/entropy/server')
sys.path.insert(0,'/usr/lib/entropy/client')
sys.path.insert(0,'../libraries')
sys.path.insert(0,'../server')
sys.path.insert(0,'../client')
if DAEMON_DEBUG:
sys.path.insert(0,'/home/fabio/entropy/libraries')
from entropy.misc import LogFile
from entropy.i18n import _
from entropy.exceptions import PermissionDenied, RepositoryError, \
MissingParameter
import entropy.tools as entropyTools
from entropy.client.interfaces import Client
from entropy.transceivers import UrlFetcher
from entropy.const import etpConst, const_setup_entropy_pid, \
const_remove_entropy_pid
from entropy.core.settings.base import SystemSettings as SysSet
from entropy.misc import ParallelTask
from entropy.output import TextInterface, nocolor
nocolor()
SYS_SETTINGS = SysSet()
TEXT = TextInterface()
DAEMON_LOGFILE = os.path.join(etpConst['syslogdir'],"client-updater.log")
DAEMON_LOG = LogFile(SYS_SETTINGS['system']['log_level']+1,
DAEMON_LOGFILE, header = "[client-updater]")
PREVIOUS_PROGRESS = ''
CHECK_DELAY_SECS = 3600*4
for xopt in sys.argv:
if xopt.startswith("--secs=") and (len(xopt.split("=")) > 1):
try:
delay_secs = int(xopt.split("=")[1])
CHECK_DELAY_SECS = delay_secs
except ValueError:
continue
def write_output(*args, **kwargs):
message = time.strftime('[%H:%M:%S %d/%m/%Y %Z]') + " " + args[0]
global PREVIOUS_PROGRESS
if PREVIOUS_PROGRESS == message:
return
PREVIOUS_PROGRESS = message
DAEMON_LOG.write(message)
DAEMON_LOG.flush()
if DAEMON_DEBUG:
TEXT.updateProgress(*args, **kwargs)
class DaemonUrlFetcher(UrlFetcher):
daemon_last_avg = 100
__average = 0
__downloadedsize = 0
__remotesize = 0
__datatransfer = 0
def handle_statistics(self, th_id, downloaded_size, total_size,
average, old_average, update_step, show_speed, data_transfer,
time_remaining, time_remaining_secs):
self.__average = average
self.__downloadedsize = downloaded_size
self.__remotesize = total_size
self.__datatransfer = data_transfer
def updateProgress(self):
myavg = abs(int(round(float(self.__average), 1)))
if abs((myavg - self.daemon_last_avg)) < 1:
return
if int(myavg) % 10 == 0:
DAEMON_LOG.write("fetch @ %s%s" % (myavg, "%",))
self.daemon_last_avg = myavg
class Entropy(Client):
def init_singleton(self):
Client.init_singleton(self, load_ugc = False,
url_fetcher = DaemonUrlFetcher, repo_validation = False,
xcache = False)
# validate currently available repos
# manually, to not taint logs
self.validate_repositories(quiet = True)
self.updateProgress(
"Loading Entropy Updates daemon: check every %ss, logfile: %s" % (
CHECK_DELAY_SECS, DAEMON_LOGFILE,)
)
# reset application lock status
self.application_lock_check(silent = True)
def updateProgress(self, *args, **kwargs):
return write_output(*args, **kwargs)
class UpdatesDaemon(dbus.service.Object):
def __init__(self):
gobject.threads_init()
if not entropyTools.is_user_in_entropy_group():
raise PermissionDenied('insufficient permissions')
self.__alive = False
self.__is_working_mutex = Lock()
self.__updater = None
self.__system_changes_checker = None
self.__oncall_updater = None
self.__quit_service_wd = None
self.__quit_service_trigger = False
self.__trigger_oncall_updater = False
self.__fetch_mutex = Lock()
self.__updates = []
self.__updates_atoms = []
self.__system_db_hash = None
self.__last_system_db_mtime = None
# start dbus service
object_path = "/notifier"
dbus_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
system_bus = dbus.SystemBus(mainloop = dbus_loop)
name = dbus.service.BusName("org.entropy.Client", bus = system_bus)
dbus.service.Object.__init__ (self, system_bus, object_path)
write_output("__init__: dbus service loaded")
# this seems to avoid race conditions
# with dbus service not being available
time.sleep(2)
def start(self):
self.stop()
self.__alive = True
self.__updater = gobject.timeout_add(
CHECK_DELAY_SECS*1000, self.run_fetcher)
self.__oncall_updater = gobject.timeout_add(
1000, self.run_oncall_fetcher)
self.__quit_service_wd = gobject.timeout_add(
2000, self.quit_service_watchdog)
self.__system_changes_checker = gobject.timeout_add(
60*1000, self.check_system_changes)
def stop(self):
if self.__alive:
self.__alive = False
if self.__updater != None:
gobject.source_remove(self.__updater)
if self.__system_changes_checker != None:
gobject.source_remove(self.__system_changes_checker)
if self.__oncall_updater != None:
gobject.source_remove(self.__oncall_updater)
if self.__quit_service_wd != None:
gobject.source_remove(self.__quit_service_wd)
def do_alert(self, string, msg, urgency = "critical"):
write_output('alert: %s, %s, urgency: %s' % (string, msg, urgency,))
def is_system_on_batteries(self):
"""
Return whether System is running on batteries.
@return: True, if running on batteries
@rtype: bool
"""
ac_powa_exec = "/usr/bin/on_ac_power"
if not os.access(ac_powa_exec, os.X_OK):
return False
ex_rc = os.system(ac_powa_exec)
if ex_rc:
return True
return False
def check_system_changes(self):
if self.is_system_on_batteries():
# running on batteries, then skip
return self.__alive
changed = self._is_system_changed()
if changed:
# trigger check and push
self.__trigger_oncall_updater = True
# keep alive
return self.__alive
def run_oncall_fetcher(self):
if not self.__trigger_oncall_updater:
return self.__alive
with self.__fetch_mutex:
self.__trigger_oncall_updater = False
task = ParallelTask(self.__run_fetcher)
task.start()
#self.__run_fetcher()
return self.__alive
def quit_service_watchdog(self):
if self.__quit_service_trigger:
self.__alive = False
with self.__is_working_mutex:
raise SystemExit(0)
return self.__alive
def run_fetcher(self):
if self.is_system_on_batteries():
# running on batteries, then skip
return self.__alive
with self.__fetch_mutex:
task = ParallelTask(self.__run_fetcher)
task.start()
#self.__run_fetcher()
return self.__alive
def __run_sync(self, repos, entropy):
try:
repo_conn = entropy.Repositories(
repos, fetchSecurity = False, noEquoCheck = True)
if DAEMON_DEBUG:
write_output("__run_fetcher: repository interface loaded")
except MissingParameter, err:
if DAEMON_DEBUG:
write_output(
"__run_fetcher: MissingParameter exception, error: %s" % (
err,))
except Exception, err:
if DAEMON_DEBUG:
write_output(
"__run_fetcher: Unhandled exception, error: %s" % (err,))
else:
# 128: sync error, something bad happened
# 2: repositories not available (all)
# 1: not able to update all the repositories
if DAEMON_DEBUG:
write_output("__run_fetcher: preparing to run sync")
rc_res = repo_conn.sync()
del repo_conn
if DAEMON_DEBUG:
write_output("__run_fetcher: sync done")
if rc_res == 1:
err = _("Not all the repositories have been fetched")
self.do_alert( _("Updates: repository issues"), err )
return 0 # fine anyway
elif rc_res == 2:
err = _("No repositories found online")
self.do_alert( _("Updates: repository issues"), err )
return 0 # try to calculate updates anyway
elif rc_res == 128:
err = _("Synchronization errors. Cannot update repositories.")
self.do_alert( _("Updates: sync issues"), err )
return rc_res
elif isinstance(rc_res, basestring):
self.do_alert( _("Updates: unhandled error"), rc_res )
return 1
return 0
def __run_fetcher(self):
if self.__updater == None:
return 0
with self.__is_working_mutex:
rc_fetch = 0
entropy = Entropy()
# check if other apps are using entropy
app_locked = entropy.application_lock_check()
if app_locked:
if DAEMON_DEBUG:
write_output("__run_fetcher: app. locked, skipping")
entropy.destroy()
return rc_fetch
acquired = const_setup_entropy_pid(force_handling = True)
if not acquired:
if DAEMON_DEBUG:
write_output("__run_fetcher: app. locked during acquire")
entropy.destroy()
return rc_fetch
# entropy resources locked?
locked = entropy.resources_check_lock()
if locked:
if DAEMON_DEBUG:
write_output("__run_fetcher: resources locked, skipping")
entropy.destroy()
return rc_fetch
# acquire resources lock
acquired = entropy.resources_create_lock()
if not acquired:
if DAEMON_DEBUG:
write_output("__run_fetcher: resources locked during acquire")
entropy.destroy()
return rc_fetch
try:
if DAEMON_DEBUG:
write_output("__run_fetcher: called %s" % (time.time(),))
repos_to_up = self.get_repo_status(entropy)
if repos_to_up:
self.do_alert(
_("Repositories to update"),
unicode(repos_to_up),
urgency = 'critical'
)
gobject.timeout_add(0, self.signal_updating)
repos = repos_to_up.keys()
rc_fetch = self.__run_sync(repos, entropy)
if rc_fetch != 0:
return rc_fetch
if DAEMON_DEBUG:
write_output("__run_fetcher: sync closed, rc: %s" % (
rc_fetch,))
try:
update, remove, fine, spm_fine = \
entropy.calculate_world_updates()
del fine, remove
except Exception, err:
entropyTools.print_traceback(f = DAEMON_LOG)
msg = "%s: %s" % (_("Updates: error"), err,)
self.do_alert(_("Updates: error"), msg)
return 1
if update:
self.do_alert(
_("Updates available"),
"%s %d %s" % (
_("There are"), len(update),
_("updates available."),),
urgency = 'critical'
)
self.__system_db_hash = entropy.clientDbconn.checksum(
do_order = True, strict = False)
self.__updates = update[:]
del self.__updates_atoms[:]
gobject.timeout_add(0, self.signal_updates)
else:
self.do_alert(
_("No updates"),
"%s" % (update,),
urgency = 'critical'
)
gobject.timeout_add(0, self.signal_updates)
return 0
finally:
# unlock resources
entropy.resources_remove_lock()
# remove application lock
const_remove_entropy_pid()
# say goodbye
entropy.destroy()
# compare repos status for updates
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'a{si}')
def get_repo_status(self, entropy = None):
destroy = False
if entropy is None:
destroy = True
entropy = Entropy()
try:
repos = {}
try:
repo_conn = entropy.Repositories(
noEquoCheck = True, fetchSecurity = False)
except MissingParameter:
return repos
except Exception, e:
return repos
# now get remote
for repoid in entropy.SystemSettings['repositories']['available']:
repo_rev = entropy.get_repository_revision(repoid)
online_rev = repo_conn.get_online_repository_revision(repoid)
if (online_rev == -1) or (repo_rev != online_rev):
if DAEMON_DEBUG:
write_output(
"get_repo_status: repo needs to be updated: %s" % (
repoid,))
entropy.repository_move_clear_cache(repoid)
repos[repoid] = {
'local': repo_rev,
'remote': online_rev,
}
del repo_conn
return repos
finally:
if destroy:
entropy.destroy()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def trigger_check(self):
self.__trigger_oncall_updater = True
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'av')
def get_updates(self):
entropy = Entropy()
try:
curr_hash = entropy.clientDbconn.checksum(
do_order = True, strict = False)
if curr_hash == self.__system_db_hash:
return self.__updates
try:
update, remove, fine, spm_fine = \
entropy.calculate_world_updates()
except Exception, err:
entropyTools.print_traceback(f = DAEMON_LOG)
msg = "get_updates: %s: %s" % (_("Updates: error"), err,)
self.do_alert(_("Updates: error"), msg)
return self.__updates
self.__updates = update[:]
self.__system_db_hash = curr_hash
return self.__updates
finally:
entropy.destroy()
def _is_system_changed(self):
with self.__is_working_mutex:
entropy = Entropy()
locked = entropy.resources_check_lock()
if locked:
if DAEMON_DEBUG:
write_output("_is_system_changed: resources locked!")
entropy.destroy()
return False # resources are locked, nothing changed yet :P
try:
last_mtime = self.__last_system_db_mtime
dbfile = entropy.clientDbconn.dbFile
try:
cur_mtime = os.path.getmtime(dbfile)
except OSError:
cur_mtime = 0.0
changed = last_mtime != cur_mtime
if DAEMON_DEBUG and changed:
write_output("_is_system_changed: system db mtime changed!")
self.__last_system_db_mtime = cur_mtime
return changed
finally:
# say goodbye
entropy.destroy()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'b')
def is_system_changed(self):
return self._is_system_changed()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'av')
def get_updates_atoms(self):
if self.__updates_atoms:
return self.__updates_atoms
with self.__is_working_mutex:
atoms = []
entropy = Entropy()
try:
for idpackage, repoid in self.__updates:
try:
dbc = entropy.open_repository(repoid)
atoms.append(dbc.retrieveAtom(idpackage))
except RepositoryError:
continue
self.__updates_atoms.extend(atoms)
return self.__updates_atoms
finally:
entropy.destroy()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def close_connection(self):
self.__quit_service_trigger = True
# signal sent when updates are available for retrive
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updates(self):
if DAEMON_DEBUG:
write_output("signal_updates: updates available!")
# signal sent when daemon is updating the repositories
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updating(self):
if DAEMON_DEBUG:
write_output("signal_updating: updating repos!")
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
try:
ud_i = UpdatesDaemon()
except dbus.exceptions.DBusException:
raise SystemExit(1)
ud_i.start()
main_loop = gobject.MainLoop()
main_loop.run()
ud_i.stop()
raise SystemExit(0)