[entropy.db] implement locking infrastructure (with the same semantics) for in-memory repositories

This commit is contained in:
Fabio Erculiani
2013-12-03 14:49:54 +01:00
parent d7f22534b2
commit c54c893e55
2 changed files with 103 additions and 48 deletions
+64 -17
View File
@@ -27,7 +27,7 @@ from entropy.const import etpConst, const_convert_to_unicode, \
const_setup_directory
from entropy.exceptions import SystemDatabaseError
from entropy.output import bold, red, blue, purple
from entropy.misc import FlockFile
from entropy.misc import FlockFile, ReadersWritersSemaphore
from entropy.db.exceptions import Warning, Error, InterfaceError, \
DatabaseError, DataError, OperationalError, IntegrityError, \
@@ -199,6 +199,9 @@ class EntropySQLiteRepository(EntropySQLRepository):
on close()
@type temporary: bool
"""
self._rwsem_lock = threading.RLock()
self._rwsem = None
self._sqlite = self.ModuleProxy.get()
EntropySQLRepository.__init__(
@@ -496,12 +499,35 @@ class EntropySQLiteRepository(EntropySQLRepository):
return RepositoryFlockFile(lock_path, mode)
def _get_rwsem(self, mode):
"""
Get the lock object used for locking in-memory repositories.
"""
with self._rwsem_lock:
if self._rwsem is None:
self._rwsem = ReadersWritersSemaphore()
class RepositoryRwSemWrapper(object):
def __init__(self, mode, rwsem):
self._sem = rwsem
self._mode = mode
def get(self):
return self._sem
return RepositoryRwSemWrapper(mode, self._rwsem)
def acquire_shared(self):
"""
Reimplemented from EntropyBaseRepository.
"""
if self._is_memory():
return True
rwsem = self._get_rwsem(False)
rwsem.get().reader_acquire()
return rwsem
else:
flock = None
acquired = False
@@ -524,7 +550,13 @@ class EntropySQLiteRepository(EntropySQLRepository):
Reimplemented from EntropyBaseRepository.
"""
if self._is_memory():
return True
rwsem = self._get_rwsem(False)
acquired = rwsem.get().try_reader_acquire()
if acquired:
return rwsem
return None
else:
acquired = False
flock = None
@@ -549,7 +581,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
Reimplemented from EntropyBaseRepository.
"""
if self._is_memory():
return True
rwsem = self._get_rwsem(True)
rwsem.get().writer_acquire()
return rwsem
else:
flock = None
acquired = False
@@ -571,7 +606,12 @@ class EntropySQLiteRepository(EntropySQLRepository):
Reimplemented from EntropyBaseRepository.
"""
if self._is_memory():
return True
rwsem = self._get_rwsem(True)
acquired = rwsem.get().try_writer_acquire()
if acquired:
return rwsem
return None
else:
acquired = False
flock = None
@@ -595,33 +635,40 @@ class EntropySQLiteRepository(EntropySQLRepository):
"""
Release the resource associated with the FlockFile object.
"""
if self._is_memory():
return
else:
if flock._mode != mode:
raise RuntimeError(
"Programming error: acquired lock in a different mode")
flock.release()
flock.close()
if flock._mode != mode:
raise RuntimeError(
"Programming error: acquired lock in a different mode")
flock.release()
flock.close()
def release_shared(self, opaque):
"""
Reimplemented from EntropyBaseRepository.
"""
self.commit()
if self._is_memory():
return
if opaque._mode != False:
raise RuntimeError(
"Programming error: acquired lock in a different mode")
opaque.get().reader_release()
else:
self.commit()
self._release_flock(opaque, False)
def release_exclusive(self, opaque):
"""
Reimplemented from EntropyBaseRepository.
"""
self.commit()
if self._is_memory():
return
if opaque._mode != True:
raise RuntimeError(
"Programming error: acquired lock in a different mode")
opaque.get().writer_release()
else:
self.commit()
self._release_flock(opaque, True)
def close(self, safe=False):
+39 -31
View File
@@ -1016,6 +1016,40 @@ class EntropyRepositoryTest(unittest.TestCase):
data = self.test_db.listAllPreservedLibraries()
self.assertEqual(data, tuple())
def _test_repository_locking(self, test_db):
with test_db.shared():
self.assertEqual(test_db.try_acquire_exclusive(),
None)
with test_db.exclusive():
self.assertEqual(test_db.try_acquire_shared(),
None)
opaque_shared = test_db.try_acquire_shared()
self.assert_(opaque_shared is not None)
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is None)
test_db.release_shared(opaque_shared)
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is not None)
opaque_exclusive2 = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive2 is None)
test_db.release_exclusive(opaque_exclusive)
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is not None)
self.assertRaises(RuntimeError, test_db.release_shared,
opaque_exclusive)
test_db.release_exclusive(opaque_exclusive)
def test_locking_file(self):
fd, db_file = tempfile.mkstemp()
@@ -1026,43 +1060,17 @@ class EntropyRepositoryTest(unittest.TestCase):
test_db = self.Client.open_generic_repository(db_file)
test_db.initializeRepository()
with test_db.shared():
self.assertEqual(test_db.try_acquire_exclusive(),
None)
with test_db.exclusive():
self.assertEqual(test_db.try_acquire_shared(),
None)
opaque_shared = test_db.try_acquire_shared()
self.assert_(opaque_shared is not None)
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is None)
test_db.release_shared(opaque_shared)
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is not None)
opaque_exclusive2 = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive2 is None)
test_db.release_exclusive(opaque_exclusive)
opaque_exclusive = test_db.try_acquire_exclusive()
self.assert_(opaque_exclusive is not None)
self.assertRaises(RuntimeError, test_db.release_shared,
opaque_exclusive)
test_db.release_exclusive(opaque_exclusive)
return self._test_repository_locking(test_db)
finally:
if test_db is not None:
test_db.close()
os.remove(db_file)
def test_locking_memory(self):
self.assert_(self.test_db._is_memory())
return self._test_repository_locking(self.test_db)
if __name__ == '__main__':
unittest.main()