Files
entropy/services/client-updates-daemon

532 lines
18 KiB
Python
Executable File

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
"""
# DESCRIPTION:
# Entropy Client Updates daemon
Copyright (C) 2007-2009 Fabio Erculiani
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from __future__ import with_statement
import os
import sys
# Prevent the daemon from writing the Entropy pid file, so that it does
# not lock out other Entropy instances.
sys.argv.append('--no-pid-handling')
import time
import gobject
import dbus
import dbus.service
import dbus.mainloop.glib
import signal
from threading import Lock
# Debug mode is switched on by passing "--debug" on the command line.
DAEMON_DEBUG = "--debug" in sys.argv

# Entropy imports
# Prepend the Entropy library locations to the module search path. The
# entries are listed in their final lookup order: the relative checkout
# directories come first, then the system-wide installation directories.
sys.path[:0] = [
    '../client',
    '../server',
    '../libraries',
    '/usr/lib/entropy/client',
    '/usr/lib/entropy/server',
    '/usr/lib/entropy/libraries',
]
if DAEMON_DEBUG:
    # In debug mode this development tree takes precedence over all others.
    sys.path.insert(0,'/home/fabio/entropy/libraries')
from entropy.misc import LogFile
from entropy.i18n import _
from entropy.exceptions import PermissionDenied, RepositoryError, \
MissingParameter
import entropy.tools as entropyTools
from entropy.client.interfaces import Client
from entropy.transceivers import UrlFetcher
from entropy.const import etpConst
from entropy.core import SystemSettings as SysSet
from entropy.misc import ParallelTask
from entropy.output import TextInterface, nocolor
# NOTE(review): presumably disables coloured terminal output for everything
# printed through entropy.output -- confirm against entropy.output.nocolor.
nocolor()

# Shared, module-wide helper objects.
SYS_SETTINGS = SysSet()
TEXT = TextInterface()

# The daemon log file lives in the Entropy system log directory; the first
# LogFile argument is the configured system log level plus one.
DAEMON_LOGFILE = os.path.join(etpConst['syslogdir'],"client-updater.log")
DAEMON_LOG = LogFile(
    SYS_SETTINGS['system']['log_level']+1,
    DAEMON_LOGFILE,
    header = "[client-updater]"
)

# Last message handed to write_output(), used there to skip exact repeats.
PREVIOUS_PROGRESS = ''
# Default interval, in seconds, between two periodic update checks (4 hours).
CHECK_DELAY_SECS = 3600*4


def parse_check_delay(argv, default):
    """
    Extract the periodic check interval from a command line.

    Every "--secs=<value>" option found in argv is considered; the last one
    carrying a valid value wins. Values that are not integers, or that are
    not strictly positive, are ignored: the interval is later multiplied by
    1000 and used as a gobject timeout, so zero or a negative number is not
    a usable interval.

    @param argv: list of command line argument strings
    @param default: value returned when no valid option is present
    @return: the interval in seconds
    """
    delay = default
    for opt in argv:
        if not opt.startswith("--secs="):
            continue
        try:
            secs = int(opt.split("=")[1])
        except ValueError:
            continue
        if secs > 0:
            delay = secs
    return delay


CHECK_DELAY_SECS = parse_check_delay(sys.argv, CHECK_DELAY_SECS)
def write_output(*args, **kwargs):
    """
    Write a timestamped message to the daemon log file.

    The first positional argument is the message text. It is prefixed with
    the current local time and written to DAEMON_LOG, unless the resulting
    line is identical to the previously written one (tracked through the
    PREVIOUS_PROGRESS global). When DAEMON_DEBUG is set, all the arguments
    are also forwarded to TEXT.updateProgress().
    """
    global PREVIOUS_PROGRESS

    stamp = time.strftime('[%H:%M:%S %d/%m/%Y %Z]')
    message = stamp + " " + args[0]

    # The timestamp is part of the comparison, so only exact repeats of a
    # whole stamped line are dropped.
    if message != PREVIOUS_PROGRESS:
        PREVIOUS_PROGRESS = message
        DAEMON_LOG.write(message)
        DAEMON_LOG.flush()
        if DAEMON_DEBUG:
            TEXT.updateProgress(*args, **kwargs)
class DaemonUrlFetcher(UrlFetcher):
    """
    UrlFetcher subclass that stores the transfer statistics it is handed
    and reports download progress to the daemon log file.
    """

    # Last progress value seen by updateProgress().
    daemon_last_avg = 100
    # Most recent statistics stored by handle_statistics().
    __average = 0
    __downloadedsize = 0
    __remotesize = 0
    __datatransfer = 0

    def handle_statistics(self, th_id, downloaded_size, total_size,
            average, old_average, update_step, show_speed, data_transfer,
            time_remaining, time_remaining_secs):
        """
        Store the statistics this class keeps track of (average, downloaded
        size, total size and data transfer); the other arguments are
        ignored.
        """
        self.__datatransfer = data_transfer
        self.__remotesize = total_size
        self.__downloadedsize = downloaded_size
        self.__average = average

    def updateProgress(self):
        """
        Log the current progress value when it changed since the previous
        call and is a multiple of ten.
        """
        percent = abs(int(round(float(self.__average), 1)))
        if abs(percent - self.daemon_last_avg) >= 1:
            if not percent % 10:
                DAEMON_LOG.write("fetch @ %s%s" % (percent, "%",))
            self.daemon_last_avg = percent
class Entropy(Client):
    """
    Entropy Client flavour used by the daemon: it downloads through
    DaemonUrlFetcher and routes its progress output to write_output().
    """

    def init_singleton(self):
        """
        Initialize the underlying Client with UGC loading, automatic
        repository validation and xcache turned off, then validate the
        repositories quietly and log a startup message.
        """
        client_options = {
            'load_ugc': False,
            'url_fetcher': DaemonUrlFetcher,
            'repo_validation': False,
            'xcache': False,
        }
        Client.init_singleton(self, **client_options)

        # validate currently available repos
        # manually, to not taint logs
        self.validate_repositories(quiet = True)

        startup_msg = \
            "Loading Entropy Updates daemon: check every %ss, logfile: %s" % (
                CHECK_DELAY_SECS, DAEMON_LOGFILE,)
        self.updateProgress(startup_msg)

    def updateProgress(self, *args, **kwargs):
        """
        Forward every progress message to write_output().
        """
        return write_output(*args, **kwargs)
class UpdatesDaemon(dbus.service.Object):
def __init__(self):
gobject.threads_init()
if not entropyTools.is_user_in_entropy_group():
raise PermissionDenied('insufficient permissions')
self.__alive = False
self.__is_working_mutex = Lock()
self.__updater = None
self.__system_changes_checker = None
self.__oncall_updater = None
self.__quit_service_wd = None
self.__quit_service_trigger = False
self.__trigger_oncall_updater = False
self.__fetch_mutex = Lock()
self.__updates = []
self.__updates_atoms = []
self.__system_db_hash = None
self.__last_system_db_mtime = None
# start dbus service
object_path = "/notifier"
dbus_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
system_bus = dbus.SystemBus(mainloop = dbus_loop)
name = dbus.service.BusName("org.entropy.Client", bus = system_bus)
dbus.service.Object.__init__ (self, system_bus, object_path)
write_output("__init__: dbus service loaded")
# this seems to avoid race conditions
# with dbus service not being available
time.sleep(2)
def start(self):
self.stop()
self.__alive = True
self.__updater = gobject.timeout_add(
CHECK_DELAY_SECS*1000, self.run_fetcher)
self.__oncall_updater = gobject.timeout_add(
1000, self.run_oncall_fetcher)
self.__quit_service_wd = gobject.timeout_add(
2000, self.quit_service_watchdog)
self.__system_changes_checker = gobject.timeout_add(
60*1000, self.check_system_changes)
def stop(self):
if self.__alive:
self.__alive = False
if self.__updater != None:
gobject.source_remove(self.__updater)
if self.__system_changes_checker != None:
gobject.source_remove(self.__system_changes_checker)
if self.__oncall_updater != None:
gobject.source_remove(self.__oncall_updater)
if self.__quit_service_wd != None:
gobject.source_remove(self.__quit_service_wd)
def do_alert(self, string, msg, urgency = "critical"):
write_output('alert: %s, %s, urgency: %s' % (string, msg, urgency,))
def check_system_changes(self):
changed = self._is_system_changed()
if changed:
# trigger check and push
self.__trigger_oncall_updater = True
# keep alive
return self.__alive
def run_oncall_fetcher(self):
if not self.__trigger_oncall_updater:
return self.__alive
with self.__fetch_mutex:
self.__trigger_oncall_updater = False
task = ParallelTask(self.__run_fetcher)
task.start()
#self.__run_fetcher()
return self.__alive
def quit_service_watchdog(self):
if self.__quit_service_trigger:
self.__alive = False
with self.__is_working_mutex:
raise SystemExit(0)
return self.__alive
def run_fetcher(self):
with self.__fetch_mutex:
task = ParallelTask(self.__run_fetcher)
task.start()
#self.__run_fetcher()
return self.__alive
def __run_sync(self, repos, entropy):
try:
repo_conn = entropy.Repositories(
repos, fetchSecurity = False, noEquoCheck = True)
if DAEMON_DEBUG:
write_output("__run_fetcher: repository interface loaded")
except MissingParameter, err:
if DAEMON_DEBUG:
write_output(
"__run_fetcher: MissingParameter exception, error: %s" % (
err,))
except Exception, err:
if DAEMON_DEBUG:
write_output(
"__run_fetcher: Unhandled exception, error: %s" % (err,))
else:
# 128: sync error, something bad happened
# 2: repositories not available (all)
# 1: not able to update all the repositories
if DAEMON_DEBUG:
write_output("__run_fetcher: preparing to run sync")
rc_res = repo_conn.sync()
del repo_conn
if DAEMON_DEBUG:
write_output("__run_fetcher: sync done")
if rc_res == 1:
err = _("Not all the repositories have been fetched")
self.do_alert( _("Updates: repository issues"), err )
return 0 # fine anyway
elif rc_res == 2:
err = _("No repositories found online")
self.do_alert( _("Updates: repository issues"), err )
return 0 # try to calculate updates anyway
elif rc_res == 128:
err = _("Synchronization errors. Cannot update repositories.")
self.do_alert( _("Updates: sync issues"), err )
return rc_res
elif isinstance(rc_res, basestring):
self.do_alert( _("Updates: unhandled error"), rc_res )
return 1
return 0
def __run_fetcher(self):
if self.__updater == None:
return 0
with self.__is_working_mutex:
rc_fetch = 0
entropy = Entropy()
# entropy resources locked?
locked = entropy.resources_check_lock()
if locked:
if DAEMON_DEBUG:
write_output("__run_fetcher: resources locked, skipping")
entropy.destroy()
return rc_fetch
# lock
entropy.resources_create_lock()
try:
if DAEMON_DEBUG:
write_output("__run_fetcher: called %s" % (time.time(),))
repos_to_up = self.get_repo_status(entropy)
if repos_to_up:
self.do_alert(
_("Repositories to update"),
unicode(repos_to_up),
urgency = 'critical'
)
gobject.timeout_add(0, self.signal_updating)
repos = repos_to_up.keys()
rc_fetch = self.__run_sync(repos, entropy)
if rc_fetch != 0:
return rc_fetch
if DAEMON_DEBUG:
write_output("__run_fetcher: sync closed, rc: %s" % (
rc_fetch,))
try:
update, remove, fine, spm_fine = \
entropy.calculate_world_updates()
del fine, remove
except Exception, err:
entropyTools.print_traceback(f = DAEMON_LOG)
msg = "%s: %s" % (_("Updates: error"), err,)
self.do_alert(_("Updates: error"), msg)
return 1
if update:
self.do_alert(
_("Updates available"),
"%s %d %s" % (
_("There are"), len(update),
_("updates available."),),
urgency = 'critical'
)
self.__system_db_hash = entropy.clientDbconn.checksum(
do_order = True, strict = False)
self.__updates = update[:]
del self.__updates_atoms[:]
gobject.timeout_add(0, self.signal_updates)
else:
self.do_alert(
_("No updates"),
"%s" % (update,),
urgency = 'critical'
)
gobject.timeout_add(0, self.signal_updates)
return 0
finally:
# unlock resources
entropy.resources_remove_lock()
# say goodbye
entropy.destroy()
# compare repos status for updates
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'a{si}')
def get_repo_status(self, entropy = None):
destroy = False
if entropy is None:
destroy = True
entropy = Entropy()
try:
repos = {}
try:
repo_conn = entropy.Repositories(
noEquoCheck = True, fetchSecurity = False)
except MissingParameter:
return repos
except Exception, e:
return repos
# now get remote
for repoid in entropy.SystemSettings['repositories']['available']:
repo_rev = entropy.get_repository_revision(repoid)
online_rev = repo_conn.get_online_repository_revision(repoid)
if (online_rev == -1) or (repo_rev != online_rev):
if DAEMON_DEBUG:
write_output(
"get_repo_status: repo needs to be updated: %s" % (
repoid,))
entropy.repository_move_clear_cache(repoid)
repos[repoid] = {
'local': repo_rev,
'remote': online_rev,
}
del repo_conn
return repos
finally:
if destroy:
entropy.destroy()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def trigger_check(self):
self.__trigger_oncall_updater = True
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'av')
def get_updates(self):
entropy = Entropy()
try:
curr_hash = entropy.clientDbconn.checksum(
do_order = True, strict = False)
if curr_hash == self.__system_db_hash:
return self.__updates
try:
update, remove, fine, spm_fine = \
entropy.calculate_world_updates()
except Exception, err:
entropyTools.print_traceback(f = DAEMON_LOG)
msg = "get_updates: %s: %s" % (_("Updates: error"), err,)
self.do_alert(_("Updates: error"), msg)
return self.__updates
self.__updates = update[:]
self.__system_db_hash = curr_hash
return self.__updates
finally:
entropy.destroy()
def _is_system_changed(self):
with self.__is_working_mutex:
entropy = Entropy()
locked = entropy.resources_check_lock()
if locked:
if DAEMON_DEBUG:
write_output("_is_system_changed: resources locked!")
entropy.destroy()
return False # resources are locked, nothing changed yet :P
# lock
entropy.resources_create_lock()
try:
last_mtime = self.__last_system_db_mtime
dbfile = entropy.clientDbconn.dbFile
try:
cur_mtime = os.path.getmtime(dbfile)
except OSError:
cur_mtime = 0.0
changed = last_mtime != cur_mtime
if DAEMON_DEBUG and changed:
write_output("_is_system_changed: system db mtime changed!")
self.__last_system_db_mtime = cur_mtime
return changed
finally:
# unlock resources
entropy.resources_remove_lock()
# say goodbye
entropy.destroy()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'b')
def is_system_changed(self):
return self._is_system_changed()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'av')
def get_updates_atoms(self):
if self.__updates_atoms:
return self.__updates_atoms
with self.__is_working_mutex:
atoms = []
entropy = Entropy()
try:
for idpackage, repoid in self.__updates:
try:
dbc = entropy.open_repository(repoid)
atoms.append(dbc.retrieveAtom(idpackage))
except RepositoryError:
continue
self.__updates_atoms.extend(atoms)
return self.__updates_atoms
finally:
entropy.destroy()
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def close_connection(self):
self.__quit_service_trigger = True
# signal sent when updates are available for retrive
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updates(self):
if DAEMON_DEBUG:
write_output("signal_updates: updates available!")
# signal sent when daemon is updating the repositories
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updating(self):
if DAEMON_DEBUG:
write_output("signal_updating: updating repos!")
if __name__ == "__main__":
    # Restore the default SIGINT action instead of Python's handler.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # Exit with status 1 when the D-Bus service cannot be set up.
    try:
        ud_i = UpdatesDaemon()
    except dbus.exceptions.DBusException:
        sys.exit(1)

    ud_i.start()
    # Run the main loop; once it returns, stop the daemon and exit cleanly.
    main_loop = gobject.MainLoop()
    main_loop.run()
    ud_i.stop()
    sys.exit(0)