Files
entropy/libraries/tests/db.py
Fabio Erculiani f5b016a146 [entropy.db] make EntropyRepository process-safe and thread-safe at the same time
This is kinda experimental but it seems to work just fine and, at the moment
there are no multithreaded accesses to EntropyRepository.
This patch makes write mutex in EntropyRepository useless because EntropyRepository
will automatically allocate new connections and cursors for every new
thread/pid coming in, avoiding to leak open files around.
SQLite3 is thread-safe when using one connection per thread, so... here it is!
2010-01-29 23:39:42 +01:00

450 lines
17 KiB
Python

# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import unittest
import os
from entropy.client.interfaces import Client
from entropy.const import etpConst, etpUi, const_convert_to_unicode, \
const_convert_to_rawstring
from entropy.core.settings.base import SystemSettings
from entropy.misc import ParallelTask
from entropy.db import EntropyRepository
import tests._misc as _misc
import entropy.tools
class EntropyRepositoryTest(unittest.TestCase):
def setUp(self):
sys.stdout.write("%s called\n" % (self,))
sys.stdout.flush()
self.Client = Client(noclientdb = 2, indexing = False, xcache = False,
repo_validation = False)
self.Spm = self.Client.Spm()
self.test_db_name = "%s_test_suite" % (etpConst['dbnamerepoprefix'],)
self.client_sysset_plugin_id = \
etpConst['system_settings_plugins_ids']['client_plugin']
self.test_db = self.__open_test_db()
self.test_db2 = self.__open_test_db()
self.SystemSettings = SystemSettings()
def tearDown(self):
"""
tearDown is run after each test
"""
sys.stdout.write("%s ran\n" % (self,))
sys.stdout.flush()
self.test_db.closeDB()
self.test_db2.closeDB()
self.Client.destroy()
def __open_test_db(self):
return self.Client.open_memory_database(dbname = self.test_db_name)
def test_db_clearcache(self):
self.test_db.clearCache()
def test_treeupdates_actions(self):
self.assertEqual(self.test_db.listAllTreeUpdatesActions(), [])
updates = [
('move media-libs/x264-svn media-libs/x264', '2020', '1210199116.46'),
('slotmove x11-libs/lesstif 2.1 0', '2020', '1210753863.16')
]
actions = sorted(['move media-libs/x264-svn media-libs/x264',
'slotmove x11-libs/lesstif 2.1 0'])
self.test_db.insertTreeUpdatesActions(updates, self.test_db_name)
db_actions = self.test_db.retrieveTreeUpdatesActions(self.test_db_name,
forbranch = '2020')
self.assertEqual(actions, db_actions)
self.test_db.removeTreeUpdatesActions(self.test_db_name)
db_actions = self.test_db.retrieveTreeUpdatesActions(self.test_db_name,
forbranch = '2020')
self.assertEqual([], db_actions)
def test_db_creation(self):
self.assert_(isinstance(self.test_db, EntropyRepository))
self.assertEqual(self.test_db_name, self.test_db.dbname)
self.assert_(self.test_db._doesTableExist('baseinfo'))
self.assert_(self.test_db._doesTableExist('extrainfo'))
def test_db_metadata_handling(self):
test_entry = {
const_convert_to_unicode("/path/to/foo", "utf-8"): \
const_convert_to_unicode("dir", "utf-8"),
const_convert_to_unicode("/path/to/foo/foo", "utf-8"): \
const_convert_to_unicode("obj", "utf-8"),
}
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
data['content'].update(test_entry.copy())
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
test_pkg2 = _misc.get_test_package2()
data2 = self.Spm.extract_package_metadata(test_pkg2)
data2['content'].update(test_entry.copy())
idpackage2, rev2, new_data2 = self.test_db2.handlePackage(data2)
db_data2 = self.test_db2.getPackageData(idpackage2)
cont_diff = self.test_db.contentDiff(idpackage, self.test_db2,
idpackage2)
for key in test_entry:
try:
self.assert_(key not in cont_diff)
except AssertionError:
print(key)
raise
py_diff = sorted([x for x in db_data['content'] if x not in \
db_data2['content']])
self.assertEqual(sorted(cont_diff), py_diff)
orig_diff = ['/lib64', '/lib64/libz.so', '/lib64/libz.so.1',
'/lib64/libz.so.1.2.3', '/usr/include', '/usr/include/zconf.h',
'/usr/include/zlib.h', '/usr/lib64/libz.a',
'/usr/lib64/libz.so', '/usr/share/doc/zlib-1.2.3-r1',
'/usr/share/doc/zlib-1.2.3-r1/ChangeLog.bz2',
'/usr/share/doc/zlib-1.2.3-r1/FAQ.bz2',
'/usr/share/doc/zlib-1.2.3-r1/README.bz2',
'/usr/share/doc/zlib-1.2.3-r1/algorithm.txt.bz2',
'/usr/share/man', '/usr/share/man/man3',
'/usr/share/man/man3/zlib.3.bz2'
]
orig_diff = [const_convert_to_unicode(x, 'utf-8') for x in orig_diff]
self.assertEqual(orig_diff, py_diff)
# test package content match
self.assertEqual(self.test_db.getIDPackagesFromFile(orig_diff[3]),
[1])
category = self.test_db.retrieveCategory(idpackage)
idcategory = self.test_db.getIDCategory(category)
self.assertEqual(self.test_db.getCategory(idcategory),
new_data['category'])
versioning_data = self.test_db.getVersioningData(idpackage)
dbverdata = (self.test_db.retrieveVersion(idpackage),
self.test_db.retrieveVersionTag(idpackage),
self.test_db.retrieveRevision(idpackage),)
self.assertEqual(versioning_data, dbverdata)
strict_scope = self.test_db.getStrictScopeData(idpackage)
dbverdata = (self.test_db.retrieveAtom(idpackage),
self.test_db.retrieveSlot(idpackage),
self.test_db.retrieveRevision(idpackage),)
self.assertEqual(strict_scope, dbverdata)
scope_data = self.test_db.getScopeData(idpackage)
dbverdata = (
self.test_db.retrieveAtom(idpackage),
self.test_db.retrieveCategory(idpackage),
self.test_db.retrieveName(idpackage),
self.test_db.retrieveVersion(idpackage),
self.test_db.retrieveSlot(idpackage),
self.test_db.retrieveVersionTag(idpackage),
self.test_db.retrieveRevision(idpackage),
self.test_db.retrieveBranch(idpackage),
self.test_db.retrieveApi(idpackage),
)
self.assertEqual(scope_data, dbverdata)
trigger_info = self.test_db.getTriggerInfo(idpackage)
trigger_keys = ['version', 'eclasses', 'etpapi', 'cxxflags', 'cflags',
'chost', 'atom', 'category', 'name', 'versiontag', 'content',
'trigger', 'branch', 'spm_phases', 'revision']
self.assertEqual(sorted(trigger_keys), sorted(trigger_info.keys()))
system_pkgs = self.test_db.retrieveSystemPackages()
self.assertEqual(system_pkgs, set([idpackage]))
def test_db_insert_compare_match_provide(self):
test_pkg = _misc.get_test_entropy_package_provide()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
def test_db_cache(self):
test_pkg = _misc.get_test_entropy_package_provide()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
# enable cache
self.test_db.xcache = True
key = new_data['category'] + "/" + new_data['name']
from entropy.cache import EntropyCacher
cacher = EntropyCacher()
cacher.start()
cached = self.test_db._EntropyRepository__atomMatchFetchCache(
key, True, False, False, None, None, False, False, True)
self.assert_(cached is None)
# now store
self.test_db._EntropyRepository__atomMatchStoreCache(
key, True, False, False, None, None, False, False, True,
result = (123, 0)
)
cacher.sync()
cached = self.test_db._EntropyRepository__atomMatchFetchCache(
key, True, False, False, None, None, False, False, True)
self.assertEqual(cached, (123, 0))
def test_db_insert_compare_match(self):
# insert/compare
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom()
pkg_name = _misc.get_test_package_name()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
# now test multimatch
idpackage, rev, new_data = self.test_db.addPackage(db_data)
results, rc = self.test_db.atomMatch(pkg_name, multiMatch = True)
self.assertEqual(2, len(results))
self.assert_(isinstance(results, set))
self.assert_(rc == 0)
results, rc = self.test_db.atomMatch(pkg_name+"foo", multiMatch = True)
self.assertEqual(0, len(results))
self.assert_(isinstance(results, set))
self.assert_(rc == 1)
def test_db_insert_compare_match_utf(self):
# insert/compare
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel")
data['license'] = const_convert_to_unicode('GPL-2')
data['licensedata'] = {
const_convert_to_unicode('GPL-2'): \
const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"),
}
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom2()
pkg_name = _misc.get_test_package_name2()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_utf2(self):
# insert/compare
test_pkg = _misc.get_test_package3()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom3()
pkg_name = _misc.get_test_package_name3()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_tag(self):
# insert/compare
test_pkg = _misc.get_test_entropy_package_tag()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
f_match = (1, 0)
for atom, pkg_id, branch in self.test_db.listAllPackages():
pkg_key = entropy.tools.dep_getkey(atom)
self.assertEqual(f_match, self.test_db.atomMatch(pkg_key))
self.assertEqual(f_match, self.test_db.atomMatch(atom))
self.assertEqual(f_match, self.test_db.atomMatch("~"+atom))
def test_db_multithread(self):
# insert/compare
test_pkg = _misc.get_test_entropy_package_tag()
data = self.Spm.extract_package_metadata(test_pkg)
def handle_pkg(xdata):
idpackage, rev, new_data = self.test_db.handlePackage(xdata)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
t1 = ParallelTask(handle_pkg, data)
t2 = ParallelTask(handle_pkg, data)
t3 = ParallelTask(handle_pkg, data)
t4 = ParallelTask(handle_pkg, data)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
cur_cache = self.test_db._EntropyRepository__cursor_cache.keys()
self.assert_(len(cur_cache) > 0)
self.test_db._cleanup_stale_cur_conn(kill_all = True)
cur_cache = self.test_db._EntropyRepository__cursor_cache.keys()
self.assertEqual(len(cur_cache), 0)
def test_db_import_export(self):
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel")
data['license'] = const_convert_to_unicode('GPL-2')
data['licensedata'] = {
const_convert_to_unicode('GPL-2'): \
const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"),
}
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
etpUi['mute'] = True
# export
buf_file = "dbtst.txt"
buf = open(buf_file, "wb")
self.test_db.doDatabaseExport(buf)
buf.flush()
buf.close()
new_db_path = "test_db_import_export.db"
self.test_db.doDatabaseImport(buf_file, new_db_path)
new_db = self.Client.open_generic_database(new_db_path)
new_db_data = new_db.getPackageData(idpackage)
new_db.closeDB()
etpUi['mute'] = False
self.assertEqual(new_db_data, db_data)
os.remove(buf_file)
os.remove(new_db_path)
def test_db_package_sets(self):
set_name = 'my_test_set'
set_deps = ["app-foo/foo", "app-pling/plong", "media-foo/ajez"]
set_name2 = 'my_test_set2'
set_deps2 = ["app-foo/foo2", "app-pling/plong2", "media-foo/ajez2"]
pkgsets = {
set_name: set(set_deps),
set_name2: set(set_deps2),
}
self.test_db.insertPackageSets(pkgsets)
self.assertEqual(self.test_db.retrievePackageSets(), pkgsets)
set_search = self.test_db.searchSets(set_name2)
self.assertEqual(set([set_name2]), set_search)
def test_db_license_data_str_insert(self):
lic_txt = const_convert_to_rawstring('[3]\xab foo\n\n', 'utf-8')
lic_name = const_convert_to_unicode('CCPL-Attribution-2.0')
lic_data = {lic_name: lic_txt}
self.test_db.insertLicenses(lic_data)
db_lic_txt = self.test_db.retrieveLicenseText(lic_name)
self.assertEqual(db_lic_txt, lic_txt)
if __name__ == '__main__':
if "--debug" in sys.argv:
sys.argv.remove("--debug")
from entropy.const import etpUi
etpUi['debug'] = True
unittest.main()