[entropy.db] add direct access support to repository
In latency sensitive code paths, the performance penality caused by file lock contention and memory cache invalidation is too high. This problem happens in Rigo, which is extremely latency sensitive. Since we don't want to crap on the user, a way to solve this is letting API consumers skip the memory cache and read data directly from the database store. The trade off is that data may be stale, incomplete, or invalid, but as long as the consumer is aware of this, that's fine.
This commit is contained in:
@@ -16,6 +16,7 @@ import hashlib
|
||||
import codecs
|
||||
import collections
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
from entropy.i18n import _
|
||||
from entropy.exceptions import InvalidAtom
|
||||
@@ -405,6 +406,8 @@ class EntropyRepositoryBase(TextInterface, EntropyRepositoryPluginStore):
|
||||
@param name: repository identifier (or name)
|
||||
@type name: string
|
||||
"""
|
||||
self._tls = threading.local()
|
||||
|
||||
TextInterface.__init__(self)
|
||||
self._readonly = readonly
|
||||
self._caching = xcache
|
||||
@@ -426,6 +429,42 @@ class EntropyRepositoryBase(TextInterface, EntropyRepositoryPluginStore):
|
||||
etpConst['entropyrundir'],
|
||||
"repository", self.name + ".lock")
|
||||
|
||||
@contextlib.contextmanager
|
||||
def direct(self):
|
||||
"""
|
||||
Avoid acquiring any kind of lock, disable caches and access directly
|
||||
to the underlying repository data.
|
||||
|
||||
In latency sensitive code paths, acquiring locks (especially file locks)
|
||||
and blocking may be impractical. This context manager makes possible to
|
||||
avoid that, at the price of returning stale or null data.
|
||||
|
||||
This method uses Thread Local Storage. In order to determine if
|
||||
direct mode is enabled, just call directed().
|
||||
memory cache is not cleared by this method, but both shared() and
|
||||
exclusive() do that.
|
||||
Nested calls are reference counted, so it's possible to enter the
|
||||
direct() context more than once (in a nested way) without problems.
|
||||
|
||||
This method exists because some subclasses may have implemented their
|
||||
own in-memory caches and if the locks aren't acquired, they may contain
|
||||
stale data. However, keeping the cache clear may result in a big
|
||||
performance penalty due to the fact that cold caches kill latency.
|
||||
"""
|
||||
counter = getattr(self._tls, "_EntropyRepositoryCacheCounter", 0)
|
||||
self._tls._EntropyRepositoryCacheCounter = counter + 1
|
||||
|
||||
yield
|
||||
|
||||
self._tls._EntropyRepositoryCacheCounter -= 1
|
||||
|
||||
def directed(self):
|
||||
"""
|
||||
Return whether direct mode is enabled or not for the current thread.
|
||||
See direct() for more information.
|
||||
"""
|
||||
return getattr(self._tls, "_EntropyRepositoryCacheCounter", 0) != 0
|
||||
|
||||
@contextlib.contextmanager
|
||||
def shared(self):
|
||||
"""
|
||||
|
||||
@@ -1023,6 +1023,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).getVersioningData(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("getVersioningData")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1041,6 +1045,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).getStrictData(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("getStrictData")
|
||||
if cached is None:
|
||||
if self._isBaseinfoExtrainfo2010():
|
||||
@@ -1072,6 +1080,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).getStrictScopeData(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("getStrictScopeData")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1169,6 +1181,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveDigest(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveDigest")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1201,6 +1217,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
We must use the in-memory cache to do some memoization.
|
||||
We must handle _baseinfo_extrainfo_2010.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveKeySplit(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveKeySplit")
|
||||
if cached is None:
|
||||
if self._isBaseinfoExtrainfo2010():
|
||||
@@ -1229,6 +1249,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
We must use the in-memory cache to do some memoization.
|
||||
We must handle _baseinfo_extrainfo_2010.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveKeySlot(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveKeySlot")
|
||||
if cached is None:
|
||||
if self._isBaseinfoExtrainfo2010():
|
||||
@@ -1257,6 +1281,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
"""
|
||||
Reimplemented from EntropyRepositoryBase.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository,
|
||||
self).retrieveKeySlotAggregated(package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveKeySlotAggregated")
|
||||
if cached is None:
|
||||
if self._isBaseinfoExtrainfo2010():
|
||||
@@ -1304,6 +1332,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveVersion(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveVersion")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1322,6 +1354,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveRevision(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveRevision")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1340,6 +1376,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveUseflags(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveUseflags")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1365,6 +1405,12 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropyRepositoryBase.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveDependencies(
|
||||
package_id, exnteded = extended, deptype = deptype,
|
||||
exclude_deptypes = exclude_deptypes,
|
||||
resolve_conditional_deps = resolve_conditional_deps)
|
||||
|
||||
cached = self._getLiveCache("retrieveDependencies")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1489,6 +1535,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveSlot(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveSlot")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -1507,6 +1557,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveTag(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveTag")
|
||||
# gain 2% speed on atomMatch()
|
||||
if cached is None:
|
||||
@@ -1527,6 +1581,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
We must handle _baseinfo_extrainfo_2010.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).retrieveCategory(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("retrieveCategory")
|
||||
# this gives 14% speed boost in atomMatch()
|
||||
if cached is None:
|
||||
@@ -1600,6 +1658,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
We must handle _baseinfo_extrainfo_2010.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).searchKeySlot(
|
||||
key, slot)
|
||||
|
||||
cached = self._getLiveCache("searchKeySlot")
|
||||
if cached is None:
|
||||
if self._isBaseinfoExtrainfo2010():
|
||||
@@ -1632,6 +1694,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
We must handle _baseinfo_extrainfo_2010.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).searchKeySlotTag(
|
||||
key, slot, tag)
|
||||
|
||||
cached = self._getLiveCache("searchKeySlotTag")
|
||||
if cached is None:
|
||||
if self._isBaseinfoExtrainfo2010():
|
||||
@@ -1725,6 +1791,10 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
We must handle _baseinfo_extrainfo_2010.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository, self).searchNameCategory(
|
||||
name, category, just_id = just_id)
|
||||
|
||||
cached = self._getLiveCache("searchNameCategory")
|
||||
# this gives 30% speed boost on atomMatch()
|
||||
if cached is None:
|
||||
@@ -2178,6 +2248,11 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepository.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository,
|
||||
self).getInstalledPackageRepository(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("getInstalledPackageRepository")
|
||||
if cached is None:
|
||||
cur = self._cursor().execute("""
|
||||
@@ -2196,6 +2271,11 @@ class EntropySQLiteRepository(EntropySQLRepository):
|
||||
Reimplemented from EntropySQLRepositoryBase.
|
||||
We must use the in-memory cache to do some memoization.
|
||||
"""
|
||||
if self.directed():
|
||||
return super(EntropySQLiteRepository,
|
||||
self).getInstalledPackageSource(
|
||||
package_id)
|
||||
|
||||
cached = self._getLiveCache("getInstalledPackageSource")
|
||||
if cached is None:
|
||||
try:
|
||||
|
||||
@@ -6,6 +6,8 @@ import unittest
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
import threading
|
||||
|
||||
from entropy.client.interfaces import Client
|
||||
from entropy.const import etpConst, const_convert_to_unicode, \
|
||||
const_convert_to_rawstring
|
||||
@@ -1071,6 +1073,37 @@ class EntropyRepositoryTest(unittest.TestCase):
|
||||
self.assert_(self.test_db._is_memory())
|
||||
return self._test_repository_locking(self.test_db)
|
||||
|
||||
def test_direct_access(self):
|
||||
local = self.test_db._tls
|
||||
|
||||
self.assertEquals(self.test_db.directed(), False)
|
||||
|
||||
counter = getattr(local, "_EntropyRepositoryCacheCounter", "foo")
|
||||
self.assertEquals(counter, "foo")
|
||||
|
||||
with self.test_db.direct():
|
||||
self.assertEquals(self.test_db.directed(), True)
|
||||
|
||||
counter = local._EntropyRepositoryCacheCounter
|
||||
self.assertEquals(counter, 0)
|
||||
|
||||
self.assertEquals(self.test_db.directed(), False)
|
||||
|
||||
with self.test_db.direct():
|
||||
|
||||
counter = local._EntropyRepositoryCacheCounter
|
||||
self.assertEquals(counter, 1)
|
||||
|
||||
with self.test_db.direct():
|
||||
counter = local._EntropyRepositoryCacheCounter
|
||||
self.assertEquals(counter, 2)
|
||||
|
||||
counter = local._EntropyRepositoryCacheCounter
|
||||
self.assertEquals(counter, 1)
|
||||
|
||||
counter = local._EntropyRepositoryCacheCounter
|
||||
self.assertEquals(counter, 0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user