Files
entropy/libraries/tests/db.py
2010-01-09 01:06:59 +01:00

397 lines
15 KiB
Python

# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import unittest
import os
from entropy.client.interfaces import Client
from entropy.const import etpConst, etpUi, const_convert_to_unicode, \
const_convert_to_rawstring
from entropy.core.settings.base import SystemSettings
from entropy.db import EntropyRepository
import tests._misc as _misc
class EntropyRepositoryTest(unittest.TestCase):
def setUp(self):
sys.stdout.write("%s called\n" % (self,))
sys.stdout.flush()
self.Client = Client(noclientdb = 2, indexing = False, xcache = False,
repo_validation = False)
self.Spm = self.Client.Spm()
self.test_db_name = "%s_test_suite" % (etpConst['dbnamerepoprefix'],)
self.client_sysset_plugin_id = \
etpConst['system_settings_plugins_ids']['client_plugin']
self.test_db = self.__open_test_db()
self.test_db2 = self.__open_test_db()
self.SystemSettings = SystemSettings()
def tearDown(self):
"""
tearDown is run after each test
"""
sys.stdout.write("%s ran\n" % (self,))
sys.stdout.flush()
self.test_db.closeDB()
self.test_db2.closeDB()
self.Client.destroy()
def __open_test_db(self):
return self.Client.open_memory_database(dbname = self.test_db_name)
def test_db_clearcache(self):
self.test_db.clearCache()
def test_treeupdates_actions(self):
self.assertEqual(self.test_db.listAllTreeUpdatesActions(), [])
updates = [
('move media-libs/x264-svn media-libs/x264', '2020', '1210199116.46'),
('slotmove x11-libs/lesstif 2.1 0', '2020', '1210753863.16')
]
actions = sorted(['move media-libs/x264-svn media-libs/x264',
'slotmove x11-libs/lesstif 2.1 0'])
self.test_db.insertTreeUpdatesActions(updates, self.test_db_name)
db_actions = self.test_db.retrieveTreeUpdatesActions(self.test_db_name,
forbranch = '2020')
self.assertEqual(actions, db_actions)
self.test_db.removeTreeUpdatesActions(self.test_db_name)
db_actions = self.test_db.retrieveTreeUpdatesActions(self.test_db_name,
forbranch = '2020')
self.assertEqual([], db_actions)
def test_db_creation(self):
self.assert_(isinstance(self.test_db, EntropyRepository))
self.assertEqual(self.test_db_name, self.test_db.dbname)
self.assert_(self.test_db._doesTableExist('baseinfo'))
self.assert_(self.test_db._doesTableExist('extrainfo'))
def test_db_metadata_handling(self):
test_entry = {
const_convert_to_unicode("/path/to/foo", "utf-8"): \
const_convert_to_unicode("dir", "utf-8"),
const_convert_to_unicode("/path/to/foo/foo", "utf-8"): \
const_convert_to_unicode("obj", "utf-8"),
}
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
data['content'].update(test_entry.copy())
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
test_pkg2 = _misc.get_test_package2()
data2 = self.Spm.extract_package_metadata(test_pkg2)
data2['content'].update(test_entry.copy())
idpackage2, rev2, new_data2 = self.test_db2.handlePackage(data2)
db_data2 = self.test_db2.getPackageData(idpackage2)
cont_diff = self.test_db.contentDiff(idpackage, self.test_db2,
idpackage2)
for key in test_entry:
try:
self.assert_(key not in cont_diff)
except AssertionError:
print(key)
raise
py_diff = sorted([x for x in db_data['content'] if x not in \
db_data2['content']])
self.assertEqual(sorted(cont_diff), py_diff)
orig_diff = ['/lib64', '/lib64/libz.so', '/lib64/libz.so.1',
'/lib64/libz.so.1.2.3', '/usr/include', '/usr/include/zconf.h',
'/usr/include/zlib.h', '/usr/lib64/libz.a',
'/usr/lib64/libz.so', '/usr/share/doc/zlib-1.2.3-r1',
'/usr/share/doc/zlib-1.2.3-r1/ChangeLog.bz2',
'/usr/share/doc/zlib-1.2.3-r1/FAQ.bz2',
'/usr/share/doc/zlib-1.2.3-r1/README.bz2',
'/usr/share/doc/zlib-1.2.3-r1/algorithm.txt.bz2',
'/usr/share/man', '/usr/share/man/man3',
'/usr/share/man/man3/zlib.3.bz2'
]
orig_diff = [const_convert_to_unicode(x, 'utf-8') for x in orig_diff]
self.assertEqual(orig_diff, py_diff)
# test package content match
self.assertEqual(self.test_db.getIDPackagesFromFile(orig_diff[3]),
[1])
category = self.test_db.retrieveCategory(idpackage)
idcategory = self.test_db.getIDCategory(category)
self.assertEqual(self.test_db.getCategory(idcategory),
new_data['category'])
versioning_data = self.test_db.getVersioningData(idpackage)
dbverdata = (self.test_db.retrieveVersion(idpackage),
self.test_db.retrieveVersionTag(idpackage),
self.test_db.retrieveRevision(idpackage),)
self.assertEqual(versioning_data, dbverdata)
strict_scope = self.test_db.getStrictScopeData(idpackage)
dbverdata = (self.test_db.retrieveAtom(idpackage),
self.test_db.retrieveSlot(idpackage),
self.test_db.retrieveRevision(idpackage),)
self.assertEqual(strict_scope, dbverdata)
scope_data = self.test_db.getScopeData(idpackage)
dbverdata = (
self.test_db.retrieveAtom(idpackage),
self.test_db.retrieveCategory(idpackage),
self.test_db.retrieveName(idpackage),
self.test_db.retrieveVersion(idpackage),
self.test_db.retrieveSlot(idpackage),
self.test_db.retrieveVersionTag(idpackage),
self.test_db.retrieveRevision(idpackage),
self.test_db.retrieveBranch(idpackage),
self.test_db.retrieveApi(idpackage),
)
self.assertEqual(scope_data, dbverdata)
trigger_info = self.test_db.getTriggerInfo(idpackage)
trigger_keys = ['version', 'eclasses', 'etpapi', 'cxxflags', 'cflags',
'chost', 'atom', 'category', 'name', 'versiontag', 'content',
'trigger', 'branch', 'spm_phases', 'revision']
self.assertEqual(sorted(trigger_keys), sorted(trigger_info.keys()))
system_pkgs = self.test_db.retrieveSystemPackages()
self.assertEqual(system_pkgs, set([idpackage]))
def test_db_insert_compare_match_provide(self):
test_pkg = _misc.get_test_entropy_package_provide()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
def test_db_cache(self):
test_pkg = _misc.get_test_entropy_package_provide()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
# enable cache
self.test_db.xcache = True
key = new_data['category'] + "/" + new_data['name']
from entropy.cache import EntropyCacher
cacher = EntropyCacher()
cacher.start()
cached = self.test_db._EntropyRepository__atomMatchFetchCache(
key, True, False, False, None, None, False, False, True)
self.assert_(cached is None)
# now store
self.test_db._EntropyRepository__atomMatchStoreCache(
key, True, False, False, None, None, False, False, True,
result = (123, 0)
)
cacher.sync()
cached = self.test_db._EntropyRepository__atomMatchFetchCache(
key, True, False, False, None, None, False, False, True)
self.assertEqual(cached, (123, 0))
def test_db_insert_compare_match(self):
# insert/compare
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom()
pkg_name = _misc.get_test_package_name()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
# now test multimatch
idpackage, rev, new_data = self.test_db.addPackage(db_data)
results, rc = self.test_db.atomMatch(pkg_name, multiMatch = True)
self.assertEqual(2, len(results))
self.assert_(isinstance(results, set))
self.assert_(rc == 0)
results, rc = self.test_db.atomMatch(pkg_name+"foo", multiMatch = True)
self.assertEqual(0, len(results))
self.assert_(isinstance(results, set))
self.assert_(rc == 1)
def test_db_insert_compare_match_utf(self):
# insert/compare
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel")
data['license'] = const_convert_to_unicode('GPL-2')
data['licensedata'] = {
const_convert_to_unicode('GPL-2'): \
const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"),
}
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom2()
pkg_name = _misc.get_test_package_name2()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_utf2(self):
# insert/compare
test_pkg = _misc.get_test_package3()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom3()
pkg_name = _misc.get_test_package_name3()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_import_export(self):
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel")
data['license'] = const_convert_to_unicode('GPL-2')
data['licensedata'] = {
const_convert_to_unicode('GPL-2'): \
const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"),
}
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
etpUi['mute'] = True
# export
buf_file = "dbtst.txt"
buf = open(buf_file, "wb")
self.test_db.doDatabaseExport(buf)
buf.flush()
buf.close()
new_db_path = "test_db_import_export.db"
self.test_db.doDatabaseImport(buf_file, new_db_path)
new_db = self.Client.open_generic_database(new_db_path)
new_db_data = new_db.getPackageData(idpackage)
new_db.closeDB()
etpUi['mute'] = False
self.assertEqual(new_db_data, db_data)
os.remove(buf_file)
os.remove(new_db_path)
def test_db_package_sets(self):
set_name = 'my_test_set'
set_deps = ["app-foo/foo", "app-pling/plong", "media-foo/ajez"]
set_name2 = 'my_test_set2'
set_deps2 = ["app-foo/foo2", "app-pling/plong2", "media-foo/ajez2"]
pkgsets = {
set_name: set(set_deps),
set_name2: set(set_deps2),
}
self.test_db.insertPackageSets(pkgsets)
self.assertEqual(self.test_db.retrievePackageSets(), pkgsets)
set_search = self.test_db.searchSets(set_name2)
self.assertEqual(set([set_name2]), set_search)
def test_db_license_data_str_insert(self):
lic_txt = const_convert_to_rawstring('[3]\xab foo\n\n', 'utf-8')
lic_name = const_convert_to_unicode('CCPL-Attribution-2.0')
lic_data = {lic_name: lic_txt}
self.test_db.insertLicenses(lic_data)
db_lic_txt = self.test_db.retrieveLicenseText(lic_name)
self.assertEqual(db_lic_txt, lic_txt)
if __name__ == '__main__':
if "--debug" in sys.argv:
sys.argv.remove("--debug")
from entropy.const import etpUi
etpUi['debug'] = True
unittest.main()