[services] entropy-updates-services: avoid loading/unloading Entropy but rather keep an instance alive
This hopefully fixes GC related memleaks exposed here: https://forum.sabayon.org/viewtopic.php?f=24&t=26171
This commit is contained in:
@@ -21,6 +21,7 @@ import dbus
|
||||
import dbus.service
|
||||
import dbus.mainloop.glib
|
||||
import signal
|
||||
import gc
|
||||
from threading import Lock, RLock
|
||||
|
||||
DAEMON_DEBUG = False
|
||||
@@ -193,6 +194,10 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
def __init__(self):
|
||||
|
||||
gobject.threads_init()
|
||||
# do not load/unload every time, this avoids
|
||||
# memleaks:
|
||||
# https://forum.sabayon.org/viewtopic.php?f=24&t=26171
|
||||
self._entropy = Entropy()
|
||||
self._privileges = Privileges()
|
||||
self._privileges.drop()
|
||||
self.__alive = False
|
||||
@@ -304,23 +309,20 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
with self.__is_working_mutex:
|
||||
|
||||
with self._privileges:
|
||||
entropy = None
|
||||
acquired = False
|
||||
try:
|
||||
|
||||
entropy = Entropy()
|
||||
acquired = self.__acquire_entropy_locks(entropy,
|
||||
acquired = self.__acquire_entropy_locks(self._entropy,
|
||||
pid_lock = False)
|
||||
if not acquired:
|
||||
return True # respawn later
|
||||
|
||||
if hasattr(entropy, 'clean_downloaded_packages'):
|
||||
entropy.clean_downloaded_packages()
|
||||
if hasattr(self._entropy, 'clean_downloaded_packages'):
|
||||
self._entropy.clean_downloaded_packages()
|
||||
return False
|
||||
|
||||
finally:
|
||||
if entropy is not None:
|
||||
self.__release_entropy_locks(entropy, pid_lock = False)
|
||||
entropy.shutdown()
|
||||
if acquired:
|
||||
self.__release_entropy_locks(self._entropy, pid_lock = False)
|
||||
|
||||
def entropy_integrity_check(self):
|
||||
|
||||
@@ -331,16 +333,14 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
with self.__is_working_mutex:
|
||||
|
||||
with self._privileges:
|
||||
entropy = None
|
||||
acquired = False
|
||||
try:
|
||||
|
||||
entropy = Entropy()
|
||||
acquired = self.__acquire_entropy_locks(entropy,
|
||||
acquired = self.__acquire_entropy_locks(self._entropy,
|
||||
pid_lock = False)
|
||||
if not acquired:
|
||||
return self.__alive # respawn later
|
||||
|
||||
inst_repo = entropy.installed_repository()
|
||||
inst_repo = self._entropy.installed_repository()
|
||||
if inst_repo is not None:
|
||||
try:
|
||||
inst_repo.integrity_check()
|
||||
@@ -351,9 +351,8 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
return False
|
||||
|
||||
finally:
|
||||
if entropy is not None:
|
||||
self.__release_entropy_locks(entropy, pid_lock = False)
|
||||
entropy.shutdown()
|
||||
if acquired:
|
||||
self.__release_entropy_locks(self._entropy, pid_lock = False)
|
||||
|
||||
def check_system_changes(self):
|
||||
if self.__trigger_oncall_updater:
|
||||
@@ -370,6 +369,7 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
return self.__alive
|
||||
|
||||
def run_oncall_fetcher(self):
|
||||
|
||||
if not self.__trigger_oncall_updater:
|
||||
return self.__alive
|
||||
with self.__fetch_mutex:
|
||||
@@ -467,6 +467,13 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
if DAEMON_DEBUG:
|
||||
write_output(
|
||||
"__acquire_entropy_locks: resources locked, skipping")
|
||||
# reload installed packages repository
|
||||
entropy.close_repositories()
|
||||
entropy.reopen_installed_repository()
|
||||
# garbage collect
|
||||
count = gc.collect()
|
||||
if DAEMON_DEBUG:
|
||||
write_output("__acquire_entropy_locks: GC collected %s" % (count,))
|
||||
return acquired
|
||||
|
||||
def __release_entropy_locks(self, entropy, pid_lock = True):
|
||||
@@ -486,12 +493,11 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
with self._privileges:
|
||||
|
||||
rc_fetch = 0
|
||||
entropy = None
|
||||
acquired = False
|
||||
|
||||
try:
|
||||
|
||||
entropy = Entropy()
|
||||
acquired = self.__acquire_entropy_locks(entropy,
|
||||
acquired = self.__acquire_entropy_locks(self._entropy,
|
||||
pid_lock = update_repos)
|
||||
if not acquired:
|
||||
return rc_fetch
|
||||
@@ -513,7 +519,7 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
if not self.__trigger_startup_check:
|
||||
gobject.timeout_add(0, self.signal_updating)
|
||||
repos = repos_to_up.keys()
|
||||
rc_fetch = self.__run_sync(repos, entropy)
|
||||
rc_fetch = self.__run_sync(repos, self._entropy)
|
||||
if rc_fetch != 0:
|
||||
return rc_fetch
|
||||
if DAEMON_DEBUG:
|
||||
@@ -528,7 +534,7 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
|
||||
try:
|
||||
update, remove, fine, spm_fine = \
|
||||
entropy.calculate_updates(use_cache = False)
|
||||
self._entropy.calculate_updates(use_cache = False)
|
||||
del fine, remove
|
||||
except Exception, err:
|
||||
entropy_tools.print_traceback(f = DAEMON_LOG)
|
||||
@@ -539,7 +545,7 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
return 1
|
||||
|
||||
self.__system_db_hash = \
|
||||
entropy.installed_repository().checksum(
|
||||
self._entropy.installed_repository().checksum(
|
||||
do_order = True, strict = False)
|
||||
self.__updates = update[:]
|
||||
self.__updates_atoms = None
|
||||
@@ -564,11 +570,9 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
return 0
|
||||
|
||||
finally:
|
||||
if entropy is not None:
|
||||
self.__release_entropy_locks(entropy,
|
||||
if acquired:
|
||||
self.__release_entropy_locks(self._entropy,
|
||||
pid_lock = update_repos)
|
||||
# say goodbye
|
||||
entropy.shutdown()
|
||||
|
||||
# compare repos status for updates
|
||||
@dbus.service.method ( "org.entropy.Client", in_signature = '',
|
||||
@@ -623,67 +627,50 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
out_signature = 'av')
|
||||
def get_updates(self):
|
||||
|
||||
entropy = None
|
||||
try:
|
||||
|
||||
entropy = Entropy()
|
||||
curr_hash = entropy.installed_repository().checksum(
|
||||
do_order = True, strict = False)
|
||||
if curr_hash == self.__system_db_hash:
|
||||
return self.__updates
|
||||
|
||||
try:
|
||||
update, remove, fine, spm_fine = \
|
||||
entropy.calculate_updates(use_cache = False)
|
||||
except Exception, err:
|
||||
entropy_tools.print_traceback(f = DAEMON_LOG)
|
||||
msg = "get_updates: %s: %s" % (_("Updates: error"), err,)
|
||||
self.do_alert(_("Updates: error"), msg)
|
||||
return self.__updates
|
||||
|
||||
self.__updates = update[:]
|
||||
self.__system_db_hash = curr_hash
|
||||
curr_hash = self._entropy.installed_repository().checksum(
|
||||
do_order = True, strict = False)
|
||||
if curr_hash == self.__system_db_hash:
|
||||
return self.__updates
|
||||
|
||||
finally:
|
||||
if entropy is not None:
|
||||
entropy.shutdown()
|
||||
try:
|
||||
update, remove, fine, spm_fine = \
|
||||
self._entropy.calculate_updates(use_cache = False)
|
||||
except Exception, err:
|
||||
entropy_tools.print_traceback(f = DAEMON_LOG)
|
||||
msg = "get_updates: %s: %s" % (_("Updates: error"), err,)
|
||||
self.do_alert(_("Updates: error"), msg)
|
||||
return self.__updates
|
||||
|
||||
self.__updates = update[:]
|
||||
self.__system_db_hash = curr_hash
|
||||
return self.__updates
|
||||
|
||||
def _is_system_changed(self):
|
||||
with self.__is_working_mutex:
|
||||
|
||||
entropy = None
|
||||
acquired = False
|
||||
try:
|
||||
|
||||
entropy = Entropy()
|
||||
acquired = False
|
||||
try:
|
||||
acquired = entropy.lock_resources(blocking = False)
|
||||
if not acquired:
|
||||
if DAEMON_DEBUG:
|
||||
write_output(
|
||||
"_is_system_changed: resources locked!")
|
||||
# resources are locked, nothing changed yet :P
|
||||
return False
|
||||
|
||||
last_checksum = self.__last_system_repo_checksum
|
||||
cur_checksum = entropy.installed_repository().checksum(
|
||||
strict = False, strings = True)
|
||||
|
||||
changed = last_checksum != cur_checksum
|
||||
if DAEMON_DEBUG and changed:
|
||||
acquired = self._entropy.lock_resources(blocking = False)
|
||||
if not acquired:
|
||||
if DAEMON_DEBUG:
|
||||
write_output(
|
||||
"_is_system_changed: system db checksum changed!")
|
||||
self.__last_system_repo_checksum = cur_checksum
|
||||
return changed
|
||||
finally:
|
||||
if acquired:
|
||||
entropy.unlock_resources()
|
||||
"_is_system_changed: resources locked!")
|
||||
# resources are locked, nothing changed yet :P
|
||||
return False
|
||||
|
||||
last_checksum = self.__last_system_repo_checksum
|
||||
cur_checksum = self._entropy.installed_repository().checksum(
|
||||
strict = False, strings = True)
|
||||
|
||||
changed = last_checksum != cur_checksum
|
||||
if DAEMON_DEBUG and changed:
|
||||
write_output(
|
||||
"_is_system_changed: system db checksum changed!")
|
||||
self.__last_system_repo_checksum = cur_checksum
|
||||
return changed
|
||||
finally:
|
||||
if entropy is not None:
|
||||
# say goodbye
|
||||
entropy.shutdown()
|
||||
if acquired:
|
||||
self._entropy.unlock_resources()
|
||||
|
||||
@dbus.service.method("org.entropy.Client", in_signature = '',
|
||||
out_signature = 'b')
|
||||
@@ -699,23 +686,15 @@ class UpdatesDaemon(dbus.service.Object):
|
||||
|
||||
with self.__is_working_mutex:
|
||||
atoms = []
|
||||
entropy = None
|
||||
try:
|
||||
entropy = Entropy()
|
||||
self.__updates_atoms = []
|
||||
for idpackage, repoid in self.__updates:
|
||||
try:
|
||||
dbc = entropy.open_repository(repoid)
|
||||
atoms.append(dbc.retrieveAtom(idpackage))
|
||||
except RepositoryError:
|
||||
continue
|
||||
self.__updates_atoms.extend(atoms)
|
||||
return self.__updates_atoms
|
||||
|
||||
finally:
|
||||
if entropy is not None:
|
||||
entropy.shutdown()
|
||||
|
||||
self.__updates_atoms = []
|
||||
for idpackage, repoid in self.__updates:
|
||||
try:
|
||||
dbc = self._entropy.open_repository(repoid)
|
||||
atoms.append(dbc.retrieveAtom(idpackage))
|
||||
except RepositoryError:
|
||||
continue
|
||||
self.__updates_atoms.extend(atoms)
|
||||
return self.__updates_atoms
|
||||
|
||||
@dbus.service.method("org.entropy.Client", in_signature = '',
|
||||
out_signature = '')
|
||||
|
||||
Reference in New Issue
Block a user