[entropy.cache] replace fork() usage with multithreading.
Mixing multiprocessing with multithreading is bad and we all know that. However, in this specific case there was nothing wrong in running tiny functions in another process. It seems that entropy.dump.dumpobj() is efficient enough nowadays to have it running in the same process anyway.
This commit is contained in:
@@ -19,12 +19,11 @@ import os
|
||||
import errno
|
||||
import sys
|
||||
import tempfile
|
||||
import atexit
|
||||
|
||||
from entropy.const import etpConst, etpUi, const_debug_write, \
|
||||
const_pid_exists, const_setup_perms
|
||||
from entropy.core import Singleton
|
||||
from entropy.misc import TimeScheduled, Lifo
|
||||
from entropy.misc import TimeScheduled, ParallelTask, Lifo
|
||||
import time
|
||||
import threading
|
||||
import copy
|
||||
@@ -50,8 +49,6 @@ class EntropyCacher(Singleton):
|
||||
'mask_filter': 'match/mask_filter',
|
||||
}
|
||||
|
||||
# Max amount of processes to spawn
|
||||
_PROC_LIMIT = 10
|
||||
# Max number of cache objects written at once
|
||||
_OBJS_WRITTEN_AT_ONCE = 250
|
||||
|
||||
@@ -116,15 +113,12 @@ class EntropyCacher(Singleton):
|
||||
This is the place where all the properties initialization
|
||||
takes place.
|
||||
"""
|
||||
self.__exit_registered = False
|
||||
self.__copy = copy
|
||||
self.__alive = False
|
||||
self.__cache_writer = None
|
||||
self.__cache_buffer = Lifo()
|
||||
self.__stashing_cache = {}
|
||||
self.__inside_with_stmt = 0
|
||||
self.__proc_pids = set()
|
||||
self.__proc_pids_lock = threading.Lock()
|
||||
self.__dump_data_lock = threading.Lock()
|
||||
self.__worker_sem = threading.Semaphore(0)
|
||||
# this lock ensures that all the writes are hold while it's acquired
|
||||
@@ -156,34 +150,6 @@ class EntropyCacher(Singleton):
|
||||
"""
|
||||
return self.__copy.deepcopy(obj)
|
||||
|
||||
def __clean_pids(self):
|
||||
with self.__proc_pids_lock:
|
||||
dead_pids = set()
|
||||
for pid in self.__proc_pids:
|
||||
|
||||
try:
|
||||
dead = os.waitpid(pid, os.WNOHANG)[0]
|
||||
except OSError as err:
|
||||
if err.errno != errno.ECHILD:
|
||||
raise
|
||||
dead = True
|
||||
if dead:
|
||||
dead_pids.add(pid)
|
||||
elif not const_pid_exists(pid):
|
||||
dead_pids.add(pid)
|
||||
|
||||
if dead_pids:
|
||||
self.__proc_pids.difference_update(dead_pids)
|
||||
|
||||
def __wait_cacher_semaphore(self):
|
||||
self.__clean_pids()
|
||||
while len(self.__proc_pids) > EntropyCacher._PROC_LIMIT:
|
||||
if etpUi['debug']:
|
||||
const_debug_write(__name__,
|
||||
"EntropyCacher.__wait_cacher_semaphore: too many pids")
|
||||
time.sleep(0.1)
|
||||
self.__clean_pids()
|
||||
|
||||
def __cacher(self, run_until_empty = False, sync = False, _loop=False):
|
||||
"""
|
||||
This is where the actual asynchronous copy takes
|
||||
@@ -218,6 +184,12 @@ class EntropyCacher(Singleton):
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def _commit_data(_massive_data):
|
||||
for (key, cache_dir), data in _massive_data:
|
||||
d_o = entropy.dump.dumpobj
|
||||
if d_o is not None:
|
||||
d_o(key, data, dump_dir = cache_dir)
|
||||
|
||||
while self.__alive or run_until_empty:
|
||||
|
||||
if etpUi['debug']:
|
||||
@@ -248,43 +220,30 @@ class EntropyCacher(Singleton):
|
||||
break # stack empty
|
||||
massive_data.append(data)
|
||||
|
||||
# this must stay before massive_data to make sure to clean
|
||||
# every defunct process
|
||||
self.__wait_cacher_semaphore()
|
||||
|
||||
if not massive_data:
|
||||
break
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# make sure there's nothing weird bound to exception hook
|
||||
sys.excepthook = sys.__excepthook__
|
||||
task = ParallelTask(_commit_data, massive_data)
|
||||
task.name = "EntropyCacherCommitter"
|
||||
task.daemon = not sync
|
||||
task.start()
|
||||
if sync:
|
||||
task.join()
|
||||
|
||||
if etpUi['debug']:
|
||||
const_debug_write(
|
||||
__name__,
|
||||
"EntropyCacher.__cacher [%s], writing %s objs" % (
|
||||
task, len(massive_data),))
|
||||
|
||||
if EntropyCacher.STASHING_CACHE:
|
||||
for (key, cache_dir), data in massive_data:
|
||||
d_o = entropy.dump.dumpobj
|
||||
if d_o is not None:
|
||||
d_o(key, data, dump_dir = cache_dir)
|
||||
os._exit(0)
|
||||
else:
|
||||
if etpUi['debug']:
|
||||
const_debug_write(__name__,
|
||||
"EntropyCacher.__cacher [%s], writing %s objs" % (
|
||||
pid, len(massive_data),))
|
||||
with self.__proc_pids_lock:
|
||||
self.__proc_pids.add(pid)
|
||||
if sync:
|
||||
try:
|
||||
os.waitpid(pid, 0)
|
||||
except OSError as err:
|
||||
if err.errno != errno.ECHILD:
|
||||
raise
|
||||
if EntropyCacher.STASHING_CACHE:
|
||||
for (key, cache_dir), data in massive_data:
|
||||
try:
|
||||
del self.__stashing_cache[(key, cache_dir)]
|
||||
except (AttributeError, KeyError,):
|
||||
continue
|
||||
del massive_data[:]
|
||||
del massive_data
|
||||
del self.__stashing_cache[(key, cache_dir)]
|
||||
except (AttributeError, KeyError,):
|
||||
continue
|
||||
del massive_data[:]
|
||||
del massive_data
|
||||
|
||||
@staticmethod
|
||||
def current_directory():
|
||||
@@ -302,14 +261,6 @@ class EntropyCacher(Singleton):
|
||||
|
||||
@return: None
|
||||
"""
|
||||
# If EntropyCacher is started, its thread could hang a the process
|
||||
# termination phase. So, register an exit handler against Python
|
||||
if not self.__exit_registered:
|
||||
def _stop_cacher():
|
||||
EntropyCacher().stop()
|
||||
atexit.register(_stop_cacher)
|
||||
self.__exit_registered = True
|
||||
|
||||
self.__cache_buffer.clear()
|
||||
self.__cache_writer = EntropyCacher.SemaphoreTimeScheduled(
|
||||
self.__worker_sem, EntropyCacher.WRITEBACK_TIMEOUT,
|
||||
|
||||
Reference in New Issue
Block a user