[entropy.locks] make ResourceLock reentrant wrt to individual threads only

This commit is contained in:
Fabio Erculiani
2013-12-15 11:45:23 +01:00
parent 3a448a3b19
commit 1f2575204e
6 changed files with 135 additions and 84 deletions

View File

@@ -37,9 +37,6 @@ class RepositoriesUpdateResourcesLock(ResourceLock):
access to the repositories update process.
"""
_FILE_LOCK_MUTEX = threading.Lock()
_FILE_LOCK_MAP = {}
def __init__(self, output=None):
"""
Object constructor.
@@ -48,8 +45,6 @@ class RepositoriesUpdateResourcesLock(ResourceLock):
@type output: entropy.output.TextInterface or None
"""
super(RepositoriesUpdateResourcesLock, self).__init__(
RepositoriesUpdateResourcesLock._FILE_LOCK_MAP,
RepositoriesUpdateResourcesLock._FILE_LOCK_MUTEX,
output=output)
def path(self):

View File

@@ -512,9 +512,6 @@ class EntropySQLiteRepository(EntropySQLRepository):
self._discardLiveCache()
return self._live_cacher.get(self._getLiveCacheKey() + key)
_FLOCK_LOCK_MAP = {}
_FLOCK_LOCK_MUTEX = threading.Lock()
def _get_reslock(self, mode):
"""
Get the lock object used for locking.
@@ -524,8 +521,6 @@ class EntropySQLiteRepository(EntropySQLRepository):
def __init__(self, repo, mode, path):
super(RepositoryResourceLock, self).__init__(
EntropySQLiteRepository._FLOCK_LOCK_MAP,
EntropySQLiteRepository._FLOCK_LOCK_MUTEX,
output = repo)
self._path = path
self._mode = mode

View File

@@ -96,16 +96,13 @@ class ResourceLock(object):
_TLS = threading.local()
def __init__(self, lock_map, lock_mutex, output=None):
def __init__(self, output=None):
"""
Object constructor.
@keyword output: a TextInterface interface
@type output: entropy.output.TextInterface or None
"""
self._lock_map = lock_map
self._lock_mutex = lock_mutex
if output is not None:
self._out = output
else:
@@ -117,19 +114,26 @@ class ResourceLock(object):
"""
raise NotImplementedError()
def _file_lock_setup(self, file_path):
def _file_lock_setup(self):
"""
Setup _FILE_LOCK_MAP for file_path, allocating locking information.
Setup the locking status dict for self.path().
"""
mapped = self._lock_map.get(file_path)
lock_map = getattr(self._TLS, "lock_map", None)
if lock_map is None:
lock_map = {}
self._TLS.lock_map = lock_map
file_path = self.path()
mapped = lock_map.get(file_path)
if mapped is None:
mapped = {
'count': 0,
'ref': None,
'path': file_path,
'shared': None,
'recursed': False,
}
self._lock_map[file_path] = mapped
lock_map[file_path] = mapped
return mapped
def _lock_resource(self, blocking, shared):
@@ -137,46 +141,41 @@ class ResourceLock(object):
Internal function that does the locking given a lock
file path.
"""
lock_path = self.path()
mapped = self._file_lock_setup()
with self._lock_mutex:
mapped = self._file_lock_setup(lock_path)
# I asked for an exclusive lock, but
# I am only holding a shared one, don't
# return True.
want_exclusive_when_shared = not shared and mapped['shared']
# I asked for an exclusive lock, but
# I am only holding a shared one, don't
# return True.
want_exclusive_when_shared = not shared and mapped['shared']
if mapped['ref'] is not None:
if not want_exclusive_when_shared:
# reentrant lock, already acquired
mapped['count'] += 1
return True
if mapped['ref'] is not None:
if not want_exclusive_when_shared:
# reentrant lock, already acquired
mapped['count'] += 1
return True
else:
mapped['shared'] = shared
else:
mapped['shared'] = shared
# watch for deadlocks using TLS
if mapped['recursed'] and want_exclusive_when_shared:
# deadlock, raise exception
raise RuntimeError(
"want exclusive lock when shared acquired")
# watch for deadlocks using TLS
recursed = getattr(self._TLS, "recursed", False)
if recursed and want_exclusive_when_shared:
# deadlock, raise exception
raise RuntimeError(
"want exclusive lock when shared acquired")
# not the same thread requested an exclusive lock when shared
mapped['recursed'] = True
# fall through, we won't deadlock
# not the same thread requested an exclusive lock when shared
self._TLS.recursed = True
# fall through, we won't deadlock
path = mapped['path']
path = mapped['path']
acquired, flock_f = self._file_lock_create(
path, blocking=blocking, shared=shared)
if acquired:
with self._lock_mutex:
mapped['count'] += 1
if flock_f is not None:
mapped['ref'] = flock_f
mapped['count'] += 1
if flock_f is not None:
mapped['ref'] = flock_f
return acquired
@@ -185,31 +184,28 @@ class ResourceLock(object):
Internal function that does the unlocking of a given
lock file.
"""
lock_path = self.path()
with self._lock_mutex:
mapped = self._file_lock_setup()
mapped = self._file_lock_setup(lock_path)
if mapped['count'] == 0:
raise RuntimeError("releasing a non-acquired lock")
if mapped['count'] == 0:
raise RuntimeError("releasing a non-acquired lock")
# decrement lock counter
if mapped['count'] > 0:
mapped['count'] -= 1
# if lock counter > 0, still locked
# waiting for other upper-level calls
if mapped['count'] > 0:
return
# decrement lock counter
if mapped['count'] > 0:
mapped['count'] -= 1
# if lock counter > 0, still locked
# waiting for other upper-level calls
if mapped['count'] > 0:
return
ref_obj = mapped['ref']
if ref_obj is not None:
# do not remove!
ref_obj.release()
ref_obj.close()
mapped['ref'] = None
ref_obj = mapped['ref']
if ref_obj is not None:
# do not remove!
ref_obj.release()
ref_obj.close()
mapped['ref'] = None
# allow the same thread to acquire the lock again.
self._TLS.recursed = False
# allow the same thread to acquire the lock again.
mapped['recursed'] = False
def _file_lock_create(self, lock_path, blocking=False, shared=False):
"""
@@ -376,9 +372,6 @@ class EntropyResourcesLock(ResourceLock):
only a handful of code paths require exclusive locking.
"""
_FILE_LOCK_MUTEX = threading.Lock()
_FILE_LOCK_MAP = {}
# List of callables that will be triggered upon lock acquisition.
# It can be used to execute cache cleanups.
_POST_ACQUIRE_HOOK_LOCK = threading.Lock()
@@ -421,8 +414,6 @@ class EntropyResourcesLock(ResourceLock):
@type output: entropy.output.TextInterface or None
"""
super(EntropyResourcesLock, self).__init__(
EntropyResourcesLock._FILE_LOCK_MAP,
EntropyResourcesLock._FILE_LOCK_MUTEX,
output=output)
def path(self):

View File

@@ -43,9 +43,6 @@ class SystemResourcesLock(ResourceLock):
or shared access to the System data.
"""
_FILE_LOCK_MUTEX = threading.Lock()
_FILE_LOCK_MAP = {}
def __init__(self, output=None):
"""
Object constructor.
@@ -54,8 +51,6 @@ class SystemResourcesLock(ResourceLock):
@type output: entropy.output.TextInterface or None
"""
super(SystemResourcesLock, self).__init__(
SystemResourcesLock._FILE_LOCK_MAP,
SystemResourcesLock._FILE_LOCK_MUTEX,
output=output)
def path(self):

View File

@@ -1035,24 +1035,25 @@ class EntropyRepositoryTest(unittest.TestCase):
opaque_exclusive2 = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive2 is not None)
self.assert_(opaque_exclusive._lock_map is opaque_exclusive2._lock_map)
self.assert_(opaque_exclusive._file_lock_setup() is
opaque_exclusive2._file_lock_setup())
# test reference counting
self.assertEquals(
2,
opaque_exclusive._lock_map[test_db.lock_path()]['count'])
opaque_exclusive._file_lock_setup()['count'])
test_db.release_exclusive(opaque_exclusive)
self.assertEquals(
1,
opaque_exclusive._lock_map[test_db.lock_path()]['count'])
opaque_exclusive._file_lock_setup()['count'])
test_db.release_exclusive(opaque_exclusive2)
self.assertEquals(
0,
opaque_exclusive._lock_map[test_db.lock_path()]['count'])
opaque_exclusive._file_lock_setup()['count'])
# test that refcount doesn't go below zero
self.assertRaises(
@@ -1060,7 +1061,7 @@ class EntropyRepositoryTest(unittest.TestCase):
self.assertEquals(
0,
opaque_exclusive._lock_map[test_db.lock_path()]['count'])
opaque_exclusive._file_lock_setup()['count'])
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is not None)

View File

@@ -3,6 +3,8 @@ import sys
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import threading
import time
import os
import unittest
@@ -134,7 +136,7 @@ class EntropyLocksTest(unittest.TestCase):
erl.path = lambda: tmp_path
get_count = lambda: erl._file_lock_setup(erl.path())['count']
get_count = lambda: erl._file_lock_setup()['count']
self.assertEquals(True, erl.try_acquire_shared())
self.assertRaises(RuntimeError, erl.try_acquire_exclusive)
@@ -173,6 +175,78 @@ class EntropyLocksTest(unittest.TestCase):
except OSError:
pass
def test_entropy_resources_lock_threads(self):
"""
ResourceLock multithreaded test.
This test ensures that the resource is also contended
between threads, not just processes.
"""
erl = EntropyResourcesLock()
tmp_fd, tmp_path = None, None
try:
tmp_fd, tmp_path = const_mkstemp(
prefix="test_entropy_resources_lock")
erl.path = lambda: tmp_path
get_count = lambda: erl._file_lock_setup()['count']
get_ref = lambda: erl._file_lock_setup()['ref']
other_thread_count = [0]
other_thread_loop_count = [0]
cond = threading.Condition()
self.assertEquals(True, erl.try_acquire_exclusive())
self.assertEquals(1, get_count())
self.assertNotEquals(None, get_ref())
self.assertEquals(0, other_thread_count[0])
def try_acquire_thread():
milliseconds = 10 * 1000
acquired = False
loop_n = 0
while milliseconds:
self.assertEquals(None, get_ref())
acquired = erl.try_acquire_exclusive()
if loop_n == 0:
self.assertFalse(acquired)
if acquired:
self.assertNotEquals(None, get_ref())
other_thread_count[0] += 1
break
time.sleep(0.100)
milliseconds -= 100
loop_n += 1
with cond:
other_thread_loop_count[0] += 1
cond.notify()
self.assertTrue(acquired)
th = threading.Thread(target=try_acquire_thread,
name="TryAcquireThread")
th.start()
with cond:
while other_thread_loop_count[0] < 1:
cond.wait()
erl.release()
th.join()
self.assertEquals(1, other_thread_count[0])
finally:
if tmp_fd is not None:
os.close(tmp_fd)
if tmp_path is not None:
try:
os.remove(tmp_path)
except OSError:
pass
if __name__ == '__main__':
unittest.main()