From b25e2b8f03f9fec101a01f72b48b7a1ee804e4e5 Mon Sep 17 00:00:00 2001 From: Fabio Erculiani Date: Sun, 9 Sep 2012 17:31:47 +0200 Subject: [PATCH] [entropy.cache] replace fork() usage with multithreading. Mixing multiprocessing with multithreading is bad and we all know that. However, in this specific case there was nothing wrong in running tiny functions in another process. It seems that entropy.dump.dumpobj() is efficient enough nowadays to have it running in the same process anyway. --- lib/entropy/cache.py | 101 +++++++++++-------------------------------- 1 file changed, 26 insertions(+), 75 deletions(-) diff --git a/lib/entropy/cache.py b/lib/entropy/cache.py index 34f12d468..8698bda48 100644 --- a/lib/entropy/cache.py +++ b/lib/entropy/cache.py @@ -19,12 +19,11 @@ import os import errno import sys import tempfile -import atexit from entropy.const import etpConst, etpUi, const_debug_write, \ const_pid_exists, const_setup_perms from entropy.core import Singleton -from entropy.misc import TimeScheduled, Lifo +from entropy.misc import TimeScheduled, ParallelTask, Lifo import time import threading import copy @@ -50,8 +49,6 @@ class EntropyCacher(Singleton): 'mask_filter': 'match/mask_filter', } - # Max amount of processes to spawn - _PROC_LIMIT = 10 # Max number of cache objects written at once _OBJS_WRITTEN_AT_ONCE = 250 @@ -116,15 +113,12 @@ class EntropyCacher(Singleton): This is the place where all the properties initialization takes place. """ - self.__exit_registered = False self.__copy = copy self.__alive = False self.__cache_writer = None self.__cache_buffer = Lifo() self.__stashing_cache = {} self.__inside_with_stmt = 0 - self.__proc_pids = set() - self.__proc_pids_lock = threading.Lock() self.__dump_data_lock = threading.Lock() self.__worker_sem = threading.Semaphore(0) # this lock ensures that all the writes are hold while it's acquired @@ -156,34 +150,6 @@ class EntropyCacher(Singleton): """ return self.__copy.deepcopy(obj) - def __clean_pids(self): - with self.__proc_pids_lock: - dead_pids = set() - for pid in self.__proc_pids: - - try: - dead = os.waitpid(pid, os.WNOHANG)[0] - except OSError as err: - if err.errno != errno.ECHILD: - raise - dead = True - if dead: - dead_pids.add(pid) - elif not const_pid_exists(pid): - dead_pids.add(pid) - - if dead_pids: - self.__proc_pids.difference_update(dead_pids) - - def __wait_cacher_semaphore(self): - self.__clean_pids() - while len(self.__proc_pids) > EntropyCacher._PROC_LIMIT: - if etpUi['debug']: - const_debug_write(__name__, - "EntropyCacher.__wait_cacher_semaphore: too many pids") - time.sleep(0.1) - self.__clean_pids() - def __cacher(self, run_until_empty = False, sync = False, _loop=False): """ This is where the actual asynchronous copy takes @@ -218,6 +184,12 @@ class EntropyCacher(Singleton): except AttributeError: pass + def _commit_data(_massive_data): + for (key, cache_dir), data in _massive_data: + d_o = entropy.dump.dumpobj + if d_o is not None: + d_o(key, data, dump_dir = cache_dir) + while self.__alive or run_until_empty: if etpUi['debug']: @@ -248,43 +220,30 @@ class EntropyCacher(Singleton): break # stack empty massive_data.append(data) - # this must stay before massive_data to make sure to clean - # every defunct process - self.__wait_cacher_semaphore() - if not massive_data: break - pid = os.fork() - if pid == 0: - # make sure there's nothing weird bound to exception hook - sys.excepthook = sys.__excepthook__ + task = ParallelTask(_commit_data, massive_data) + task.name = "EntropyCacherCommitter" + task.daemon = not sync + task.start() + if sync: + task.join() + + if etpUi['debug']: + const_debug_write( + __name__, + "EntropyCacher.__cacher [%s], writing %s objs" % ( + task, len(massive_data),)) + + if EntropyCacher.STASHING_CACHE: for (key, cache_dir), data in massive_data: - d_o = entropy.dump.dumpobj - if d_o is not None: - d_o(key, data, dump_dir = cache_dir) - os._exit(0) - else: - if etpUi['debug']: - const_debug_write(__name__, - "EntropyCacher.__cacher [%s], writing %s objs" % ( - pid, len(massive_data),)) - with self.__proc_pids_lock: - self.__proc_pids.add(pid) - if sync: try: - os.waitpid(pid, 0) - except OSError as err: - if err.errno != errno.ECHILD: - raise - if EntropyCacher.STASHING_CACHE: - for (key, cache_dir), data in massive_data: - try: - del self.__stashing_cache[(key, cache_dir)] - except (AttributeError, KeyError,): - continue - del massive_data[:] - del massive_data + del self.__stashing_cache[(key, cache_dir)] + except (AttributeError, KeyError,): + continue + del massive_data[:] + del massive_data @staticmethod def current_directory(): @@ -302,14 +261,6 @@ class EntropyCacher(Singleton): @return: None """ - # If EntropyCacher is started, its thread could hang a the process - # termination phase. So, register an exit handler against Python - if not self.__exit_registered: - def _stop_cacher(): - EntropyCacher().stop() - atexit.register(_stop_cacher) - self.__exit_registered = True - self.__cache_buffer.clear() self.__cache_writer = EntropyCacher.SemaphoreTimeScheduled( self.__worker_sem, EntropyCacher.WRITEBACK_TIMEOUT,