Files
entropy/services/client-updates-daemon
Fabio Erculiani 824be37347 client updates daemon: entropy resources lock handling
When the daemon is loaded, it should not take over the pid file.
Also, before trying to update the repositories, it should check
if the same resources are locked by something else.
2009-04-30 16:42:10 +02:00

321 lines
11 KiB
Python
Executable File

#!/usr/bin/python2 -O
# -*- coding: utf-8 -*-
"""
# DESCRIPTION:
# Entropy Object Oriented Interface
Copyright (C) 2007-2009 Fabio Erculiani
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from __future__ import with_statement
import os
import sys
# this prevents the daemon from writing the entropy pid file,
# so that it does not lock out other instances
sys.argv.append('--no-pid-handling')
import time
import gobject
import dbus
import dbus.service
import dbus.mainloop.glib
import signal
from threading import Lock
# Entropy imports
sys.path.insert(0,'/usr/lib/entropy/libraries')
sys.path.insert(0,'/usr/lib/entropy/server')
sys.path.insert(0,'/usr/lib/entropy/client')
sys.path.insert(0,'../libraries')
sys.path.insert(0,'../server')
sys.path.insert(0,'../client')
from entropy.misc import TimeScheduled, ParallelTask
from entropy.i18n import _
from entropy.exceptions import *
import entropy.tools as entropyTools
from entropy.client.interfaces import Client
from entropy.client.interfaces import Repository as RepoInterface
from entropy.transceivers import urlFetcher
from entropy.const import etpConst, ETP_LOGPRI_INFO, ETP_LOGLEVEL_NORMAL
from entropy.misc import LogFile, TimeScheduled
from entropy.core import SystemSettings as SysSet
from entropy.output import TextInterface
# Module-wide singletons shared by every class below.
SYS_SETTINGS = SysSet()
TEXT = TextInterface()

# The daemon writes to its own log file, opened with the configured
# system log level plus one.
DAEMON_LOGFILE = os.path.join(etpConst['syslogdir'], "client-updater.log")
DAEMON_LOG = LogFile(
    SYS_SETTINGS['system']['log_level'] + 1,
    DAEMON_LOGFILE,
    header = "[client-updater]"
)

# "--debug" anywhere on the command line turns debug output on.
DAEMON_DEBUG = "--debug" in sys.argv

# Seconds between two scheduled checks. "--secs=N" overrides the default;
# when the option is repeated the last parsable value wins, and values
# that are not integers are silently ignored.
CHECK_DELAY_SECS = 3600
for xopt in sys.argv:
    if not xopt.startswith("--secs="):
        continue
    try:
        CHECK_DELAY_SECS = int(xopt.split("=")[1])
    except ValueError:
        pass
class DaemonUrlFetcher(urlFetcher):
    """
    urlFetcher subclass that writes download progress to the daemon
    log file.
    """

    # last value written to the log by updateProgress()
    daemon_last_avg = 100
    # most recent figures received through handle_statistics()
    __average = 0
    __downloadedsize = 0
    __remotesize = 0
    __datatransfer = 0

    def handle_statistics(self, th_id, downloaded_size, total_size,
        average, old_average, update_step, show_speed, data_transfer,
        time_remaining, time_remaining_secs):
        """
        Remember the average, downloaded size, total size and data
        transfer figures; every other argument is ignored.
        """
        self.__datatransfer = data_transfer
        self.__remotesize = total_size
        self.__downloadedsize = downloaded_size
        self.__average = average

    def updateProgress(self):
        """
        Write "fetch @ N%" to the daemon log when the stored average,
        truncated to an integer, is a multiple of ten and differs from
        the last value that was logged.
        """
        percent = abs(int(round(float(self.__average), 1)))
        changed = abs(percent - self.daemon_last_avg) >= 1
        if changed and percent % 10 == 0:
            DAEMON_LOG.write("fetch @ %s%s" % (percent, "%",))
            self.daemon_last_avg = percent
class Entropy(Client):
    """
    Entropy Client flavour used by the daemon: progress messages go to
    the daemon log file instead of the terminal.
    """

    # last message handled by updateProgress(), used to drop repeats
    __previous_progress = ''

    def init_singleton(self):
        """
        Initialise the Client with UGC loading and repository validation
        disabled and DaemonUrlFetcher as URL fetcher, disable colours,
        then log the start-up banner.
        """
        Client.init_singleton(
            self,
            load_ugc = False,
            url_fetcher = DaemonUrlFetcher,
            repo_validation = False
        )
        self.nocolor()
        banner = "Loading Entropy Updates daemon: check every %s secs, logfile: %s" % (
            CHECK_DELAY_SECS, DAEMON_LOGFILE,)
        self.updateProgress(banner)

    def updateProgress(self, *args, **kwargs):
        """
        Write the first positional argument to the daemon log (flushing
        it immediately) unless it equals the previously handled message.
        In debug mode the complete call is also forwarded to the
        TextInterface instance.
        """
        text = args[0]
        if text != self.__previous_progress:
            self.__previous_progress = text
            DAEMON_LOG.write(text)
            DAEMON_LOG.flush()
            if DAEMON_DEBUG:
                TEXT.updateProgress(*args, **kwargs)
class UpdatesDaemon(dbus.service.Object):
def __init__(self):
if not entropyTools.is_user_in_entropy_group():
raise PermissionDenied('insufficient permissions')
self.Entropy = Entropy()
self.__alive = False
self.__updater = None
self.__oncall_updater = None
self.__trigger_oncall_updater = False
self.__fetch_mutex = Lock()
gobject.threads_init()
# start dbus service
object_path = "/org/entropy/Client"
dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
system_bus = dbus.SystemBus()
name = dbus.service.BusName("org.entropy.Client", system_bus)
dbus.service.Object.__init__ (self, system_bus, object_path)
def start(self):
self.stop()
self.__alive = True
self.__updater = gobject.timeout_add(
CHECK_DELAY_SECS*1000, self.run_fetcher)
self.__oncall_updater = gobject.timeout_add(
1000, self.run_oncall_fetcher)
def stop(self):
if self.__alive:
self.__alive = False
if self.__updater != None:
gobject.source_remove(self.__updater)
if self.__oncall_updater != None:
gobject.source_remove(self.__oncall_updater)
def do_alert(self, string, msg, urgency = "critical"):
TEXT.updateProgress('alert: %s, %s, urgency: %s' % (
string, msg, urgency,))
def run_oncall_fetcher(self):
if not self.__trigger_oncall_updater:
return self.__alive
with self.__fetch_mutex:
self.__trigger_oncall_updater = False
self.__run_fetcher()
return self.__alive
def run_fetcher(self):
with self.__fetch_mutex:
self.__run_fetcher()
return self.__alive
def __run_fetcher(self):
if self.__updater == None:
return 0
# entropy resources locked?
locked = self.Entropy._resources_run_check_lock()
if locked:
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: resources locked, skipping")
return 0
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: called %s" % (time.time(),))
# this makes sure we are always using the very best
# client db
self.Entropy.reopen_client_repository()
self.Entropy.SystemSettings.clear()
repos_to_up = self.compare_repositories_status()
rc = 0
if repos_to_up:
repos = repos_to_up.keys()
try:
repoConn = self.Entropy.Repositories(
repos, fetchSecurity = False, noEquoCheck = True)
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: repository interface loaded")
except MissingParameter, e:
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: MissingParameter exception, error: %s" % (e,))
except Exception, e:
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: Unhandled exception, error: %s" % (e,))
else:
# 128: sync error, something bad happened
# 2: repositories not available (all)
# 1: not able to update all the repositories
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: preparing to run sync")
rc = repoConn.sync()
del repoConn
if DAEMON_DEBUG:
self.Entropy.updateProgress("__run_fetcher: sync done")
if rc == 1:
err = _("Not all the repositories have been fetched for checking")
self.do_alert( _("Updates: repository issues"), err )
return rc
elif rc == 2:
err = _("No repositories found online")
self.do_alert( _("Updates: repository issues"), err )
return rc
elif rc == 128:
err = _("Synchronization errors. Cannot update repositories. Check logs.")
self.do_alert( _("Updates: sync issues"), err )
return rc
elif isinstance(rc, basestring):
self.do_alert( _("Updates: unhandled error"), rc )
return rc
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"__run_fetcher: sync closed, rc: %s" % (rc,))
try:
update, remove, fine = self.Entropy.calculate_world_updates()
del fine, remove
except Exception, e:
entropyTools.print_traceback(f = DAEMON_LOG)
msg = "%s: %s" % (_("Updates: error"),e,)
self.do_alert(_("Updates: error"), msg)
return 1
if update:
self.do_alert(
_("Updates available"),
"%s %d %s" % (
_("There are"),len(update),_("updates available."),),
urgency = 'critical'
)
self.signal_updates()
return 0
# compare repos status for updates
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = 'a{si}')
def compare_repositories_status(self):
repos = {}
try:
repoConn = self.Entropy.Repositories(
noEquoCheck = True, fetchSecurity = False)
except MissingParameter:
return repos
except Exception, e:
return repos
# now get remote
for repoid in self.Entropy.SystemSettings['repositories']['available']:
#self.Entropy.update_repository_revision(repoid)
if repoConn.is_repository_updatable(repoid):
if DAEMON_DEBUG:
self.Entropy.updateProgress(
"compare_repositories_status: repo needs to be updated: %s" % (repoid,))
self.Entropy.repository_move_clear_cache(repoid)
repo_rev = self.Entropy.get_repository_revision(repoid)
online_rev = repoConn.get_online_repository_revision(repoid)
repos[repoid] = {
'local': repo_rev,
'remote': online_rev,
}
del repoConn
return repos
@dbus.service.method ( "org.entropy.Client", in_signature = '',
out_signature = '')
def trigger_check(self):
self.__trigger_oncall_updater = True
# signal sent when updates are available for retrive
@dbus.service.signal(dbus_interface = 'org.entropy.Client',
signature = '')
def signal_updates(self):
if DAEMON_DEBUG:
self.Entropy.updateProgress("signal_updates: updates available!")
if __name__ == "__main__":
    # put SIGINT back to the default action instead of Python's handler
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    daemon = UpdatesDaemon()
    daemon.start()
    # blocks until the main loop is quit
    gobject.MainLoop().run()
    daemon.stop()
    sys.exit(0)