Entropy/DistributionUGCInterface:
- add caching infrastructure to avoid running long queries multiple times returning the same result - set cache expiration to 1 day by default git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@2724 cd1c1023-2f26-0410-ae45-c471fc1f0318
This commit is contained in:
@@ -120,6 +120,13 @@ def loadobj(name, completePath = False):
|
||||
return x
|
||||
break
|
||||
|
||||
def getobjmtime(name):
|
||||
mtime = 0
|
||||
dump_path = os.path.join(etpConst['dumpstoragedir'],name)
|
||||
if os.path.isfile(dump_path) and os.access(dump_path,os.R_OK):
|
||||
mtime = os.path.getmtime(dump_path)
|
||||
return int(mtime)
|
||||
|
||||
def removeobj(name):
|
||||
if os.path.isfile(etpConst['dumpstoragedir']+"/"+name+".dmp"):
|
||||
try:
|
||||
|
||||
@@ -17517,8 +17517,8 @@ class DistributionUGCInterface(RemoteDbSkelInterface):
|
||||
dev-python/gdata
|
||||
'''
|
||||
def __init__(self, connection_data, store_path, store_url = ''):
|
||||
import entropyTools
|
||||
self.entropyTools = entropyTools
|
||||
import entropyTools, dumpTools
|
||||
self.entropyTools, self.dumpTools = entropyTools, dumpTools
|
||||
self.store_url = store_url
|
||||
self.FLOOD_INTERVAL = 30
|
||||
self.DOC_TYPES = etpConst['ugc_doctypes'].copy()
|
||||
@@ -17543,6 +17543,44 @@ class DistributionUGCInterface(RemoteDbSkelInterface):
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
self.cached_results = {
|
||||
#'get_ugc_allvotes': (self.get_ugc_allvotes, [], {}),
|
||||
'get_ugc_alldownloads': (self.get_ugc_alldownloads, [], {}),
|
||||
#'get_users_scored_count': (self.get_users_scored_count, [], {}),
|
||||
'get_total_downloads_count': (self.get_total_downloads_count, [], {}),
|
||||
}
|
||||
self.cache_expire_timer = 86400 # 1 day
|
||||
|
||||
def get_current_time(self):
|
||||
return int(time.time())
|
||||
|
||||
def cache_results(self):
|
||||
for cache_item in self.cached_results:
|
||||
fdata = self.cached_results.get(cache_item)
|
||||
if fdata == None: return
|
||||
func, args, kwargs = fdata
|
||||
key = self.get_cache_item_key(cache_item)
|
||||
r = func(*args,**kwargs)
|
||||
self.dumpTools.dumpobj(key, r)
|
||||
|
||||
def get_cache_item_key(self, cache_item):
|
||||
return os.path.join(etpCache['ugc_srv_cache'],cache_item)
|
||||
|
||||
def cache_result(self, cache_item, r):
|
||||
if not self.cached_results.get(cache_item): return None
|
||||
key = self.get_cache_item_key(cache_item)
|
||||
self.dumpTools.dumpobj(key, r)
|
||||
|
||||
def get_cached_result(self, cache_item):
|
||||
if not self.cached_results.get(cache_item): return None
|
||||
key = self.get_cache_item_key(cache_item)
|
||||
cur_time = self.get_current_time()
|
||||
cache_time = self.dumpTools.getobjmtime(key)
|
||||
if (cache_time + self.cache_expire_timer) < cur_time:
|
||||
# expired
|
||||
return None
|
||||
return self.dumpTools.loadobj(key)
|
||||
|
||||
def setup_store_path(self, path):
|
||||
path = os.path.realpath(path)
|
||||
if not os.path.isabs(path):
|
||||
@@ -17763,12 +17801,21 @@ class DistributionUGCInterface(RemoteDbSkelInterface):
|
||||
return vote
|
||||
|
||||
def get_ugc_allvotes(self):
|
||||
|
||||
# cached?
|
||||
cache_item = 'get_ugc_allvotes'
|
||||
cached = self.get_cached_result(cache_item)
|
||||
if cached != None: return cached
|
||||
|
||||
self.check_connection()
|
||||
vote_data = {}
|
||||
self.execute_query('SELECT entropy_base.`key` as `vkey`,avg(entropy_votes.vote) as `avg_vote` FROM entropy_votes,entropy_base WHERE entropy_votes.`idkey` = entropy_base.`idkey` GROUP BY entropy_base.`key`')
|
||||
data = self.fetchall()
|
||||
for d_dict in data:
|
||||
vote_data[d_dict['vkey']] = d_dict['avg_vote']
|
||||
|
||||
# do cache
|
||||
self.cache_result(cache_item, vote_data)
|
||||
return vote_data
|
||||
|
||||
def get_ugc_downloads(self, pkgkey):
|
||||
@@ -17781,12 +17828,21 @@ class DistributionUGCInterface(RemoteDbSkelInterface):
|
||||
return downloads
|
||||
|
||||
def get_ugc_alldownloads(self):
|
||||
|
||||
# cached?
|
||||
cache_item = 'get_ugc_alldownloads'
|
||||
cached = self.get_cached_result(cache_item)
|
||||
if cached != None: return cached
|
||||
|
||||
self.check_connection()
|
||||
down_data = {}
|
||||
self.execute_query('SELECT SQL_CACHE entropy_base.`key` as `vkey`,sum(entropy_downloads.`count`) as `tot_downloads` FROM entropy_downloads,entropy_base WHERE entropy_downloads.`idkey` = entropy_base.`idkey` GROUP BY entropy_base.`key`')
|
||||
self.execute_query('SELECT SQL_CACHE entropy_base.`key` as `vkey`,sum(entropy_downloads.`count`) as `tot_downloads` FROM entropy_downloads,entropy_base WHERE entropy_downloads.`idkey` = entropy_base.`idkey` GROUP BY entropy_base.`idkey`')
|
||||
data = self.fetchall()
|
||||
for d_dict in data:
|
||||
down_data[d_dict['vkey']] = d_dict['tot_downloads']
|
||||
|
||||
# do cache
|
||||
self.cache_result(cache_item, down_data)
|
||||
return down_data
|
||||
|
||||
def get_iddoc_userid(self, iddoc):
|
||||
@@ -17824,12 +17880,22 @@ class DistributionUGCInterface(RemoteDbSkelInterface):
|
||||
return 0
|
||||
|
||||
def get_total_downloads_count(self):
|
||||
|
||||
# cached?
|
||||
cache_item = 'get_total_downloads_count'
|
||||
cached = self.get_cached_result(cache_item)
|
||||
if cached != None: return cached
|
||||
|
||||
self.check_connection()
|
||||
self.execute_query('SELECT SQL_CACHE sum(entropy_downloads.`count`) as downloads FROM entropy_downloads')
|
||||
data = self.fetchone()
|
||||
r = 0
|
||||
if isinstance(data,dict):
|
||||
if data['downloads']: return int(data['downloads'])
|
||||
return 0
|
||||
if data['downloads']: r = int(data['downloads'])
|
||||
|
||||
# do cache
|
||||
self.cache_result(cache_item, r)
|
||||
return r
|
||||
|
||||
def get_user_score_ranking(self, userid):
|
||||
self.check_connection()
|
||||
@@ -17841,12 +17907,22 @@ class DistributionUGCInterface(RemoteDbSkelInterface):
|
||||
return 0
|
||||
|
||||
def get_users_scored_count(self):
|
||||
|
||||
# cached?
|
||||
cache_item = 'get_users_scored_count'
|
||||
cached = self.get_cached_result(cache_item)
|
||||
if cached != None: return cached
|
||||
|
||||
self.check_connection()
|
||||
self.execute_query('SELECT SQL_CACHE count(`userid`) as mycount FROM entropy_user_scores')
|
||||
data = self.fetchone()
|
||||
r = 0
|
||||
if isinstance(data,dict):
|
||||
if data.get('mycount'): return data['mycount']
|
||||
return 0
|
||||
if data.get('mycount'): r = data['mycount']
|
||||
|
||||
# do cache
|
||||
self.cache_result(cache_item, r)
|
||||
return r
|
||||
|
||||
def is_user_score_available(self, userid):
|
||||
self.check_connection()
|
||||
@@ -28329,7 +28405,7 @@ class ServerMirrorsInterface:
|
||||
)
|
||||
errors, m_fine_uris, m_broken_uris = uploader.go()
|
||||
if errors:
|
||||
my_fine_uris = sorted([self.entropyTools.extractFTPHostFromUri(x) for x in m_fine_uris])
|
||||
#my_fine_uris = sorted([self.entropyTools.extractFTPHostFromUri(x) for x in m_fine_uris])
|
||||
my_broken_uris = sorted([(self.entropyTools.extractFTPHostFromUri(x[0]),x[1]) for x in m_broken_uris])
|
||||
self.Entropy.updateProgress(
|
||||
"[repo:%s|%s|%s] %s" % (
|
||||
@@ -28444,7 +28520,7 @@ class ServerMirrorsInterface:
|
||||
)
|
||||
errors, m_fine_uris, m_broken_uris = downloader.go()
|
||||
if errors:
|
||||
my_fine_uris = sorted([self.entropyTools.extractFTPHostFromUri(x) for x in m_fine_uris])
|
||||
#my_fine_uris = sorted([self.entropyTools.extractFTPHostFromUri(x) for x in m_fine_uris])
|
||||
my_broken_uris = sorted([(self.entropyTools.extractFTPHostFromUri(x[0]),x[1]) for x in m_broken_uris])
|
||||
self.Entropy.updateProgress(
|
||||
"[repo:%s|%s|%s] %s" % (
|
||||
|
||||
@@ -403,6 +403,7 @@ etpCache = {
|
||||
'ugc_votes': 'ugc/ugc_votes',
|
||||
'ugc_downloads': 'ugc/ugc_downloads',
|
||||
'ugc_docs': 'ugc/ugc_docs',
|
||||
'ugc_srv_cache': 'ugc/ugc_srv_cache'
|
||||
}
|
||||
|
||||
# ahahaha
|
||||
|
||||
Reference in New Issue
Block a user