From 1f2575204ea69edcbda6cdcc3d6a8f6b2fa79d90 Mon Sep 17 00:00:00 2001 From: Fabio Erculiani Date: Sun, 15 Dec 2013 11:45:23 +0100 Subject: [PATCH] [entropy.locks] make ResourceLock reentrant wrt to individual threads only --- lib/entropy/client/interfaces/repository.py | 5 - lib/entropy/db/sqlite.py | 5 - lib/entropy/locks.py | 117 +++++++++----------- lib/entropy/security.py | 5 - lib/tests/db.py | 11 +- lib/tests/locks.py | 76 ++++++++++++- 6 files changed, 135 insertions(+), 84 deletions(-) diff --git a/lib/entropy/client/interfaces/repository.py b/lib/entropy/client/interfaces/repository.py index 562dec52f..84e0fc5ac 100644 --- a/lib/entropy/client/interfaces/repository.py +++ b/lib/entropy/client/interfaces/repository.py @@ -37,9 +37,6 @@ class RepositoriesUpdateResourcesLock(ResourceLock): access to the repositories update process. """ - _FILE_LOCK_MUTEX = threading.Lock() - _FILE_LOCK_MAP = {} - def __init__(self, output=None): """ Object constructor. @@ -48,8 +45,6 @@ class RepositoriesUpdateResourcesLock(ResourceLock): @type output: entropy.output.TextInterface or None """ super(RepositoriesUpdateResourcesLock, self).__init__( - RepositoriesUpdateResourcesLock._FILE_LOCK_MAP, - RepositoriesUpdateResourcesLock._FILE_LOCK_MUTEX, output=output) def path(self): diff --git a/lib/entropy/db/sqlite.py b/lib/entropy/db/sqlite.py index 5234b615b..53ad2dca9 100644 --- a/lib/entropy/db/sqlite.py +++ b/lib/entropy/db/sqlite.py @@ -512,9 +512,6 @@ class EntropySQLiteRepository(EntropySQLRepository): self._discardLiveCache() return self._live_cacher.get(self._getLiveCacheKey() + key) - _FLOCK_LOCK_MAP = {} - _FLOCK_LOCK_MUTEX = threading.Lock() - def _get_reslock(self, mode): """ Get the lock object used for locking. @@ -524,8 +521,6 @@ class EntropySQLiteRepository(EntropySQLRepository): def __init__(self, repo, mode, path): super(RepositoryResourceLock, self).__init__( - EntropySQLiteRepository._FLOCK_LOCK_MAP, - EntropySQLiteRepository._FLOCK_LOCK_MUTEX, output = repo) self._path = path self._mode = mode diff --git a/lib/entropy/locks.py b/lib/entropy/locks.py index e78b8a60a..10adfc1f1 100644 --- a/lib/entropy/locks.py +++ b/lib/entropy/locks.py @@ -96,16 +96,13 @@ class ResourceLock(object): _TLS = threading.local() - def __init__(self, lock_map, lock_mutex, output=None): + def __init__(self, output=None): """ Object constructor. @keyword output: a TextInterface interface @type output: entropy.output.TextInterface or None """ - self._lock_map = lock_map - self._lock_mutex = lock_mutex - if output is not None: self._out = output else: @@ -117,19 +114,26 @@ class ResourceLock(object): """ raise NotImplementedError() - def _file_lock_setup(self, file_path): + def _file_lock_setup(self): """ - Setup _FILE_LOCK_MAP for file_path, allocating locking information. + Setup the locking status dict for self.path(). """ - mapped = self._lock_map.get(file_path) + lock_map = getattr(self._TLS, "lock_map", None) + if lock_map is None: + lock_map = {} + self._TLS.lock_map = lock_map + + file_path = self.path() + mapped = lock_map.get(file_path) if mapped is None: mapped = { 'count': 0, 'ref': None, 'path': file_path, 'shared': None, + 'recursed': False, } - self._lock_map[file_path] = mapped + lock_map[file_path] = mapped return mapped def _lock_resource(self, blocking, shared): @@ -137,46 +141,41 @@ class ResourceLock(object): Internal function that does the locking given a lock file path. """ - lock_path = self.path() + mapped = self._file_lock_setup() - with self._lock_mutex: - mapped = self._file_lock_setup(lock_path) + # I asked for an exclusive lock, but + # I am only holding a shared one, don't + # return True. + want_exclusive_when_shared = not shared and mapped['shared'] - # I asked for an exclusive lock, but - # I am only holding a shared one, don't - # return True. - want_exclusive_when_shared = not shared and mapped['shared'] + if mapped['ref'] is not None: + if not want_exclusive_when_shared: + # reentrant lock, already acquired + mapped['count'] += 1 + return True - if mapped['ref'] is not None: - if not want_exclusive_when_shared: - # reentrant lock, already acquired - mapped['count'] += 1 - return True + else: + mapped['shared'] = shared - else: - mapped['shared'] = shared + # watch for deadlocks using TLS + if mapped['recursed'] and want_exclusive_when_shared: + # deadlock, raise exception + raise RuntimeError( + "want exclusive lock when shared acquired") - # watch for deadlocks using TLS - recursed = getattr(self._TLS, "recursed", False) - if recursed and want_exclusive_when_shared: - # deadlock, raise exception - raise RuntimeError( - "want exclusive lock when shared acquired") + # not the same thread requested an exclusive lock when shared + mapped['recursed'] = True + # fall through, we won't deadlock - # not the same thread requested an exclusive lock when shared - self._TLS.recursed = True - # fall through, we won't deadlock - - path = mapped['path'] + path = mapped['path'] acquired, flock_f = self._file_lock_create( path, blocking=blocking, shared=shared) if acquired: - with self._lock_mutex: - mapped['count'] += 1 - if flock_f is not None: - mapped['ref'] = flock_f + mapped['count'] += 1 + if flock_f is not None: + mapped['ref'] = flock_f return acquired @@ -185,31 +184,28 @@ class ResourceLock(object): Internal function that does the unlocking of a given lock file. """ - lock_path = self.path() - with self._lock_mutex: + mapped = self._file_lock_setup() - mapped = self._file_lock_setup(lock_path) + if mapped['count'] == 0: + raise RuntimeError("releasing a non-acquired lock") - if mapped['count'] == 0: - raise RuntimeError("releasing a non-acquired lock") + # decrement lock counter + if mapped['count'] > 0: + mapped['count'] -= 1 + # if lock counter > 0, still locked + # waiting for other upper-level calls + if mapped['count'] > 0: + return - # decrement lock counter - if mapped['count'] > 0: - mapped['count'] -= 1 - # if lock counter > 0, still locked - # waiting for other upper-level calls - if mapped['count'] > 0: - return + ref_obj = mapped['ref'] + if ref_obj is not None: + # do not remove! + ref_obj.release() + ref_obj.close() + mapped['ref'] = None - ref_obj = mapped['ref'] - if ref_obj is not None: - # do not remove! - ref_obj.release() - ref_obj.close() - mapped['ref'] = None - - # allow the same thread to acquire the lock again. - self._TLS.recursed = False + # allow the same thread to acquire the lock again. + mapped['recursed'] = False def _file_lock_create(self, lock_path, blocking=False, shared=False): """ @@ -376,9 +372,6 @@ class EntropyResourcesLock(ResourceLock): only a handful of code paths require exclusive locking. """ - _FILE_LOCK_MUTEX = threading.Lock() - _FILE_LOCK_MAP = {} - # List of callables that will be triggered upon lock acquisition. # It can be used to execute cache cleanups. _POST_ACQUIRE_HOOK_LOCK = threading.Lock() @@ -421,8 +414,6 @@ class EntropyResourcesLock(ResourceLock): @type output: entropy.output.TextInterface or None """ super(EntropyResourcesLock, self).__init__( - EntropyResourcesLock._FILE_LOCK_MAP, - EntropyResourcesLock._FILE_LOCK_MUTEX, output=output) def path(self): diff --git a/lib/entropy/security.py b/lib/entropy/security.py index d227160ee..197fdbbc1 100644 --- a/lib/entropy/security.py +++ b/lib/entropy/security.py @@ -43,9 +43,6 @@ class SystemResourcesLock(ResourceLock): or shared access to the System data. """ - _FILE_LOCK_MUTEX = threading.Lock() - _FILE_LOCK_MAP = {} - def __init__(self, output=None): """ Object constructor. @@ -54,8 +51,6 @@ class SystemResourcesLock(ResourceLock): @type output: entropy.output.TextInterface or None """ super(SystemResourcesLock, self).__init__( - SystemResourcesLock._FILE_LOCK_MAP, - SystemResourcesLock._FILE_LOCK_MUTEX, output=output) def path(self): diff --git a/lib/tests/db.py b/lib/tests/db.py index 054c8697e..2a5f61785 100644 --- a/lib/tests/db.py +++ b/lib/tests/db.py @@ -1035,24 +1035,25 @@ class EntropyRepositoryTest(unittest.TestCase): opaque_exclusive2 = test_db.try_acquire_exclusive() self.assert_(opaque_exclusive2 is not None) - self.assert_(opaque_exclusive._lock_map is opaque_exclusive2._lock_map) + self.assert_(opaque_exclusive._file_lock_setup() is + opaque_exclusive2._file_lock_setup()) # test reference counting self.assertEquals( 2, - opaque_exclusive._lock_map[test_db.lock_path()]['count']) + opaque_exclusive._file_lock_setup()['count']) test_db.release_exclusive(opaque_exclusive) self.assertEquals( 1, - opaque_exclusive._lock_map[test_db.lock_path()]['count']) + opaque_exclusive._file_lock_setup()['count']) test_db.release_exclusive(opaque_exclusive2) self.assertEquals( 0, - opaque_exclusive._lock_map[test_db.lock_path()]['count']) + opaque_exclusive._file_lock_setup()['count']) # test that refcount doesn't go below zero self.assertRaises( @@ -1060,7 +1061,7 @@ class EntropyRepositoryTest(unittest.TestCase): self.assertEquals( 0, - opaque_exclusive._lock_map[test_db.lock_path()]['count']) + opaque_exclusive._file_lock_setup()['count']) opaque_exclusive = test_db.try_acquire_exclusive() self.assert_(opaque_exclusive is not None) diff --git a/lib/tests/locks.py b/lib/tests/locks.py index 75d6aa954..f331d3b2f 100644 --- a/lib/tests/locks.py +++ b/lib/tests/locks.py @@ -3,6 +3,8 @@ import sys sys.path.insert(0, '.') sys.path.insert(0, '../') +import threading +import time import os import unittest @@ -134,7 +136,7 @@ class EntropyLocksTest(unittest.TestCase): erl.path = lambda: tmp_path - get_count = lambda: erl._file_lock_setup(erl.path())['count'] + get_count = lambda: erl._file_lock_setup()['count'] self.assertEquals(True, erl.try_acquire_shared()) self.assertRaises(RuntimeError, erl.try_acquire_exclusive) @@ -173,6 +175,78 @@ class EntropyLocksTest(unittest.TestCase): except OSError: pass + def test_entropy_resources_lock_threads(self): + """ + ResourceLock multithreaded test. + + This test ensures that the resource is also contended + between threads, not just processes. + """ + erl = EntropyResourcesLock() + tmp_fd, tmp_path = None, None + try: + tmp_fd, tmp_path = const_mkstemp( + prefix="test_entropy_resources_lock") + + erl.path = lambda: tmp_path + + get_count = lambda: erl._file_lock_setup()['count'] + get_ref = lambda: erl._file_lock_setup()['ref'] + other_thread_count = [0] + other_thread_loop_count = [0] + cond = threading.Condition() + + self.assertEquals(True, erl.try_acquire_exclusive()) + self.assertEquals(1, get_count()) + self.assertNotEquals(None, get_ref()) + self.assertEquals(0, other_thread_count[0]) + + def try_acquire_thread(): + milliseconds = 10 * 1000 + acquired = False + loop_n = 0 + while milliseconds: + self.assertEquals(None, get_ref()) + + acquired = erl.try_acquire_exclusive() + if loop_n == 0: + self.assertFalse(acquired) + + if acquired: + self.assertNotEquals(None, get_ref()) + other_thread_count[0] += 1 + break + + time.sleep(0.100) + milliseconds -= 100 + loop_n += 1 + with cond: + other_thread_loop_count[0] += 1 + cond.notify() + + self.assertTrue(acquired) + + th = threading.Thread(target=try_acquire_thread, + name="TryAcquireThread") + th.start() + + with cond: + while other_thread_loop_count[0] < 1: + cond.wait() + + erl.release() + th.join() + self.assertEquals(1, other_thread_count[0]) + + finally: + if tmp_fd is not None: + os.close(tmp_fd) + if tmp_path is not None: + try: + os.remove(tmp_path) + except OSError: + pass + if __name__ == '__main__': unittest.main()