diff --git a/lib/entropy/cache.py b/lib/entropy/cache.py index 34f12d468..8698bda48 100644 --- a/lib/entropy/cache.py +++ b/lib/entropy/cache.py @@ -19,12 +19,11 @@ import os import errno import sys import tempfile -import atexit from entropy.const import etpConst, etpUi, const_debug_write, \ const_pid_exists, const_setup_perms from entropy.core import Singleton -from entropy.misc import TimeScheduled, Lifo +from entropy.misc import TimeScheduled, ParallelTask, Lifo import time import threading import copy @@ -50,8 +49,6 @@ class EntropyCacher(Singleton): 'mask_filter': 'match/mask_filter', } - # Max amount of processes to spawn - _PROC_LIMIT = 10 # Max number of cache objects written at once _OBJS_WRITTEN_AT_ONCE = 250 @@ -116,15 +113,12 @@ class EntropyCacher(Singleton): This is the place where all the properties initialization takes place. """ - self.__exit_registered = False self.__copy = copy self.__alive = False self.__cache_writer = None self.__cache_buffer = Lifo() self.__stashing_cache = {} self.__inside_with_stmt = 0 - self.__proc_pids = set() - self.__proc_pids_lock = threading.Lock() self.__dump_data_lock = threading.Lock() self.__worker_sem = threading.Semaphore(0) # this lock ensures that all the writes are hold while it's acquired @@ -156,34 +150,6 @@ class EntropyCacher(Singleton): """ return self.__copy.deepcopy(obj) - def __clean_pids(self): - with self.__proc_pids_lock: - dead_pids = set() - for pid in self.__proc_pids: - - try: - dead = os.waitpid(pid, os.WNOHANG)[0] - except OSError as err: - if err.errno != errno.ECHILD: - raise - dead = True - if dead: - dead_pids.add(pid) - elif not const_pid_exists(pid): - dead_pids.add(pid) - - if dead_pids: - self.__proc_pids.difference_update(dead_pids) - - def __wait_cacher_semaphore(self): - self.__clean_pids() - while len(self.__proc_pids) > EntropyCacher._PROC_LIMIT: - if etpUi['debug']: - const_debug_write(__name__, - "EntropyCacher.__wait_cacher_semaphore: too many pids") - time.sleep(0.1) - self.__clean_pids() - def __cacher(self, run_until_empty = False, sync = False, _loop=False): """ This is where the actual asynchronous copy takes @@ -218,6 +184,12 @@ class EntropyCacher(Singleton): except AttributeError: pass + def _commit_data(_massive_data): + for (key, cache_dir), data in _massive_data: + d_o = entropy.dump.dumpobj + if d_o is not None: + d_o(key, data, dump_dir = cache_dir) + while self.__alive or run_until_empty: if etpUi['debug']: @@ -248,43 +220,30 @@ class EntropyCacher(Singleton): break # stack empty massive_data.append(data) - # this must stay before massive_data to make sure to clean - # every defunct process - self.__wait_cacher_semaphore() - if not massive_data: break - pid = os.fork() - if pid == 0: - # make sure there's nothing weird bound to exception hook - sys.excepthook = sys.__excepthook__ + task = ParallelTask(_commit_data, massive_data) + task.name = "EntropyCacherCommitter" + task.daemon = not sync + task.start() + if sync: + task.join() + + if etpUi['debug']: + const_debug_write( + __name__, + "EntropyCacher.__cacher [%s], writing %s objs" % ( + task, len(massive_data),)) + + if EntropyCacher.STASHING_CACHE: for (key, cache_dir), data in massive_data: - d_o = entropy.dump.dumpobj - if d_o is not None: - d_o(key, data, dump_dir = cache_dir) - os._exit(0) - else: - if etpUi['debug']: - const_debug_write(__name__, - "EntropyCacher.__cacher [%s], writing %s objs" % ( - pid, len(massive_data),)) - with self.__proc_pids_lock: - self.__proc_pids.add(pid) - if sync: try: - os.waitpid(pid, 0) - except OSError as err: - if err.errno != errno.ECHILD: - raise - if EntropyCacher.STASHING_CACHE: - for (key, cache_dir), data in massive_data: - try: - del self.__stashing_cache[(key, cache_dir)] - except (AttributeError, KeyError,): - continue - del massive_data[:] - del massive_data + del self.__stashing_cache[(key, cache_dir)] + except (AttributeError, KeyError,): + continue + del massive_data[:] + del massive_data @staticmethod def current_directory(): @@ -302,14 +261,6 @@ class EntropyCacher(Singleton): @return: None """ - # If EntropyCacher is started, its thread could hang a the process - # termination phase. So, register an exit handler against Python - if not self.__exit_registered: - def _stop_cacher(): - EntropyCacher().stop() - atexit.register(_stop_cacher) - self.__exit_registered = True - self.__cache_buffer.clear() self.__cache_writer = EntropyCacher.SemaphoreTimeScheduled( self.__worker_sem, EntropyCacher.WRITEBACK_TIMEOUT,