Files
entropy/libraries/tests/db.py
2009-10-04 18:01:35 +02:00

282 lines
10 KiB
Python

# -*- coding: utf-8 -*-
import sys
sys.path.insert(0,'.')
sys.path.insert(0,'../')
import unittest
import os
from entropy.client.interfaces import Client
from entropy.const import etpConst, etpUi
from entropy.core.settings.base import SystemSettings
from entropy.db import EntropyRepository
import _misc
class EntropyRepositoryTest(unittest.TestCase):
def setUp(self):
self.Client = Client(noclientdb = 2, indexing = False, xcache = False,
repo_validation = False)
self.Spm = self.Client.Spm()
self.test_db_name = "%s_test_suite" % (etpConst['dbnamerepoprefix'],)
self.client_sysset_plugin_id = \
etpConst['system_settings_plugins_ids']['client_plugin']
self.test_db = self.__open_test_db()
self.test_db2 = self.__open_test_db()
self.SystemSettings = SystemSettings()
def tearDown(self):
"""
tearDown is run after each test
"""
sys.stdout.write("%s ran\n" % (self,))
sys.stdout.flush()
self.test_db.closeDB()
self.test_db2.closeDB()
self.Client.destroy()
def __open_test_db(self):
return self.Client.open_memory_database(dbname = self.test_db_name)
def test_db_creation(self):
self.assert_(isinstance(self.test_db, EntropyRepository))
self.assertEqual(self.test_db_name,self.test_db.dbname)
self.assert_(self.test_db._doesTableExist('baseinfo'))
self.assert_(self.test_db._doesTableExist('extrainfo'))
def test_db_contentdiff(self):
test_entry = {
u'/path/to/foo': u"dir",
u'/path/to/foo/foo': u"obj",
}
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
data['content'].update(test_entry.copy())
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
test_pkg2 = _misc.get_test_package2()
data2 = self.Spm.extract_package_metadata(test_pkg2)
data2['content'].update(test_entry.copy())
idpackage2, rev2, new_data2 = self.test_db2.handlePackage(data2)
db_data2 = self.test_db2.getPackageData(idpackage2)
cont_diff = self.test_db.contentDiff(idpackage, self.test_db2,
idpackage2)
for key in test_entry:
try:
self.assert_(key not in cont_diff)
except AssertionError:
print(key)
raise
py_diff = sorted([x for x in db_data['content'] if x not in \
db_data2['content']])
self.assertEqual(sorted(cont_diff), py_diff)
orig_diff = [u'/lib64', u'/lib64/libz.so', u'/lib64/libz.so.1',
u'/lib64/libz.so.1.2.3', u'/usr/include', u'/usr/include/zconf.h',
u'/usr/include/zlib.h', u'/usr/lib64/libz.a',
u'/usr/lib64/libz.so', u'/usr/share/doc/zlib-1.2.3-r1',
u'/usr/share/doc/zlib-1.2.3-r1/ChangeLog.bz2',
u'/usr/share/doc/zlib-1.2.3-r1/FAQ.bz2',
u'/usr/share/doc/zlib-1.2.3-r1/README.bz2',
u'/usr/share/doc/zlib-1.2.3-r1/algorithm.txt.bz2',
u'/usr/share/man', u'/usr/share/man/man3',
u'/usr/share/man/man3/zlib.3.bz2'
]
self.assertEqual(orig_diff, py_diff)
def test_db_insert_compare_match(self):
# insert/compare
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom()
pkg_name = _misc.get_test_package_name()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1),self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1),self.test_db.atomMatch(pkg_atom))
# now test multimatch
idpackage, rev, new_data = self.test_db.addPackage(db_data)
results, rc = self.test_db.atomMatch(pkg_name, multiMatch = True)
self.assertEqual(2, len(results))
self.assert_(type(results) is set)
self.assert_(rc == 0)
results, rc = self.test_db.atomMatch(pkg_name+"foo", multiMatch = True)
self.assertEqual(0, len(results))
self.assert_(type(results) is set)
self.assert_(rc == 1)
def test_db_insert_compare_match_utf(self):
# insert/compare
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = u"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"
data['license'] = u'GPL-2'
data['licensedata'] = {
u'GPL-2': u"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel",
}
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom2()
pkg_name = _misc.get_test_package_name2()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1),self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1),self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_utf2(self):
# insert/compare
test_pkg = _misc.get_test_package3()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom3()
pkg_name = _misc.get_test_package_name3()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self.SystemSettings[plug_id]['masking_validation']['cache']
f_match_mask = (1,
self.test_db_name[len(etpConst['dbnamerepoprefix']):],)
self.SystemSettings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1),self.test_db.atomMatch(pkg_atom))
self.SystemSettings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1),self.test_db.atomMatch(pkg_atom))
def test_db_import_export(self):
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = u"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"
data['license'] = u'GPL-2'
data['licensedata'] = {
u'GPL-2': u"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel",
}
idpackage, rev, new_data = self.test_db.handlePackage(data)
db_data = self.test_db.getPackageData(idpackage)
self.assertEqual(new_data, db_data)
etpUi['mute'] = True
# export
buf_file = "dbtst.txt"
buf = open(buf_file,"w")
self.test_db.doDatabaseExport(buf)
buf.flush()
buf.close()
new_db_path = "test_db_import_export.db"
self.test_db.doDatabaseImport(buf_file, new_db_path)
new_db = self.Client.open_generic_database(new_db_path)
new_db_data = new_db.getPackageData(idpackage)
new_db.closeDB()
etpUi['mute'] = False
self.assertEqual(new_db_data, db_data)
os.remove(buf_file)
os.remove(new_db_path)
def test_db_package_sets(self):
set_name = 'my_test_set'
set_deps = ["app-foo/foo","app-pling/plong","media-foo/ajez"]
set_name2 = 'my_test_set2'
set_deps2 = ["app-foo/foo2","app-pling/plong2","media-foo/ajez2"]
pkgsets = {
set_name: set(set_deps),
set_name2: set(set_deps2),
}
self.test_db.insertPackageSets(pkgsets)
self.assertEqual(self.test_db.retrievePackageSets(), pkgsets)
set_search = self.test_db.searchSets(set_name2)
self.assertEqual(set([set_name2]),set_search)
def test_db_license_data_str_insert(self):
lic_txt = '[3]\xab foo\n\n'
lic_data = {u'CCPL-Attribution-2.0': lic_txt}
self.test_db.insertLicenses(lic_data)
db_lic_txt = self.test_db.retrieveLicenseText('CCPL-Attribution-2.0')
self.assertEqual(db_lic_txt, lic_txt)
# XXX complete
if __name__ == '__main__':
unittest.main()