Files
entropy/lib/tests/db.py
Fabio Erculiani e717f7a1b3 [entropy.db.sqlite] drop support for locking based on rwsem for memory repositories
Firstly, rwsem is semantically different from flock (but this was known) and
this may confuse the API consumer. Secondly, the locking infrastructure is
purely meant for inter-process synchronization, threads synchronization is
not a current use case.
2013-12-07 20:25:55 +01:00

1069 lines
41 KiB
Python

# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import unittest
import os
import time
import tempfile
from entropy.client.interfaces import Client
from entropy.const import etpConst, const_convert_to_unicode, \
const_convert_to_rawstring
from entropy.output import set_mute
from entropy.core.settings.base import SystemSettings
from entropy.misc import ParallelTask
from entropy.db import EntropyRepository
import tests._misc as _misc
import entropy.dep
import entropy.tools
class EntropyRepositoryTest(unittest.TestCase):
def setUp(self):
self.Client = Client(installed_repo = -1, indexing = False,
xcache = False, repo_validation = False)
self.Spm = self.Client.Spm()
self.test_db_name = "test_suite"
self.client_sysset_plugin_id = \
etpConst['system_settings_plugins_ids']['client_plugin']
self.test_db = self.__open_test_db(":memory:")
# GenericRepository supports package masking if this property is set
self.test_db.enable_mask_filter = True
self.test_db2 = self.__open_test_db(":memory:")
# GenericRepository supports package masking if this property is set
self.test_db2.enable_mask_filter = True
self._settings = SystemSettings()
# since package files have been produced on amd64, add the same
# arch to etpConst['keywords'] to avoid atomMatch failures on x86
# and arm/other arches.
self._original_keywords = etpConst['keywords'].copy()
etpConst['keywords'].add("~amd64")
etpConst['keywords'].add("amd64")
def tearDown(self):
"""
tearDown is run after each test
"""
self.test_db.close()
self.test_db2.close()
# calling destroy() and shutdown()
# need to call destroy() directly to remove all the SystemSettings
# plugins because shutdown() doesn't, since it's meant to be called
# right before terminating the process
self.Client.destroy()
self.Client.shutdown()
etpConst['keywords'] = self._original_keywords.copy()
def __open_test_db(self, tmp_path):
return self.Client.open_temp_repository(name = self.test_db_name,
temp_file = tmp_path)
def test_db_clearcache(self):
self.test_db.clearCache()
def test_treeupdates_config_files_update(self):
files = _misc.get_config_files_updates_test_files()
actions = [
"move app-admin/anaconda app-admin/fuckaconda",
"slotmove app-admin/anaconda 0 2", # unsupported
"move media-sound/xmms2 media-sound/deadashell",
"move media-sound/pulseaudio media-sound/idiotaudio",
"move sys-auth/pambase sys-auth/fuckbase",
"move sys-devel/gcc sys-devel/fuckcc"
]
config_map = {
'._cfg0000_packages.db.critical': 'faa50df927223bb6de967e33179803b7',
'._cfg0000_packages.db.system_mask': 'b7f536785e315f7c104c7185b0bfe608',
'._cfg0000_packages.server.dep_blacklist.test': '8180f9e89d57f788e5b4bab05e30d447',
'._cfg0000_packages.server.dep_rewrite.test': 'c31d66b7f03c725e586a6e22941b8082',
}
for file_path in files:
updated_files = self.test_db._runConfigurationFilesUpdate(actions,
[file_path])
self.assertTrue(len(updated_files) == 1)
updated_file = list(updated_files)[0]
md5sum = entropy.tools.md5sum(updated_file)
os.remove(updated_file)
updated_name = os.path.basename(updated_file)
self.assertEqual(config_map[updated_name], md5sum)
def test_treeupdates_actions(self):
self.assertEqual(self.test_db.listAllTreeUpdatesActions(), tuple())
updates = (
('move media-libs/x264-svn media-libs/x264', '2020', '1210199116.46'),
('slotmove x11-libs/lesstif 2.1 0', '2020', '1210753863.16')
)
updates_out = (
(1, 'test_suite', 'move media-libs/x264-svn media-libs/x264', '2020', '1210199116.46'),
(2, 'test_suite', 'slotmove x11-libs/lesstif 2.1 0', '2020', '1210753863.16')
)
actions = tuple(sorted(['move media-libs/x264-svn media-libs/x264',
'slotmove x11-libs/lesstif 2.1 0']))
self.test_db.insertTreeUpdatesActions(updates, self.test_db_name)
db_actions = self.test_db.retrieveTreeUpdatesActions(self.test_db_name)
self.assertEqual(actions, db_actions)
self.assertEqual(updates_out, self.test_db.listAllTreeUpdatesActions())
self.test_db.removeTreeUpdatesActions(self.test_db_name)
db_actions = self.test_db.retrieveTreeUpdatesActions(self.test_db_name)
self.assertEqual(tuple(), db_actions)
def test_contentsafety(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
path = "/usr/include/zconf.h"
content_safety = self.test_db.searchContentSafety(path)
self.assertEqual(content_safety, (
{'package_id': 1,
'sha256': data['content_safety'][path]['sha256'],
'path': '/usr/include/zconf.h',
'mtime': data['content_safety'][path]['mtime']},)
)
def test_needed(self):
test_pkg1 = _misc.get_test_package()
test_pkg2 = _misc.get_test_package4()
for test_pkg in (test_pkg1, test_pkg2):
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
db_needed = self.test_db.retrieveNeeded(
idpackage, extended = True)
self.assertEqual(db_needed, data['needed'])
db_needed = self.test_db.retrieveNeeded(idpackage)
self.assertEqual(
db_needed,
tuple((lib for lib, elf_c in data['needed'])))
def test_dependencies(self):
test_pkg = _misc.get_test_package3()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
pkg_deps = sorted(
self.test_db.retrieveDependencies(
idpackage, extended = True))
orig_pkg_deps = sorted([
('=dev-libs/apr-1*', 0),
('dev-libs/openssl', 0),
('dev-libs/libpcre', 0),
('=dev-libs/apr-util-1*', 0),
('=dev-libs/apr-1*', 3),
('dev-libs/openssl', 3),
('dev-libs/libpcre', 3),
('=dev-libs/apr-util-1*', 3),
])
self.assertEqual(pkg_deps, orig_pkg_deps)
def test_use_dependencies(self):
test_pkg = _misc.get_test_entropy_package6()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
useflags = self.test_db.retrieveUseflags(idpackage)
self.assertTrue("gtk" not in useflags)
self.assertTrue("-gtk" in useflags)
self.assertTrue("-kde" in useflags)
self.assertTrue("-debug" in useflags)
self.assertTrue("-examples" in useflags)
def test_content(self):
test_pkg = _misc.get_test_package3()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
content = self.test_db.retrieveContent(
idpackage, extended = True, order_by="file")
orig_content = (('/usr/sbin/ab2-ssl', 'sym'),
('/usr/sbin/logresolve2', 'sym'),
('/usr/sbin/log_server_status', 'obj'),
('/usr/sbin/checkgid2', 'sym'),
('/usr/sbin/htdbm', 'obj'),
('/usr/sbin/rotatelogs2', 'sym'),
('/usr/share/man/man1/htpasswd.1.bz2', 'obj'),
('/usr/sbin/ab-ssl', 'sym'),
('/usr/sbin/htcacheclean2', 'sym'),
('/usr/sbin/split-logfile2', 'sym'),
('/usr/share/man/man8', 'dir'),
('/usr/sbin/htcacheclean', 'obj'),
('/usr/sbin', 'dir'), ('/usr/sbin/ab', 'obj'),
('/usr/share/doc/apache-tools-2.2.11/CHANGES.bz2', 'obj'),
('/usr/sbin/htpasswd', 'obj'), ('/usr', 'dir'),
('/usr/bin/htpasswd', 'sym'),
('/usr/share/man/man1/htdigest.1.bz2', 'obj'),
('/usr/sbin/dbmmanage', 'obj'), ('/usr/share', 'dir'),
('/usr/share/man/man1', 'dir'), ('/usr/sbin/htdbm2', 'sym'),
('/usr/sbin/log_server_status2', 'sym'),
('/usr/share/man/man1/dbmmanage.1.bz2', 'obj'),
('/usr/share/man', 'dir'), ('/usr/sbin/htpasswd2', 'sym'),
('/usr/sbin/htdigest2', 'sym'), ('/usr/sbin/httxt2dbm2', 'sym'),
('/usr/bin', 'dir'), ('/usr/sbin/logresolve', 'obj'),
('/usr/share/doc', 'dir'), ('/usr/share/man/man8/ab.8.bz2', 'obj'),
('/usr/share/man/man8/logresolve.8.bz2', 'obj'),
('/usr/share/man/man8/htcacheclean.8.bz2', 'obj'),
('/usr/sbin/rotatelogs', 'obj'), ('/usr/sbin/checkgid', 'obj'),
('/usr/share/man/man1/htdbm.1.bz2', 'obj'),
('/usr/sbin/dbmmanage2', 'sym'), ('/usr/sbin/httxt2dbm', 'obj'),
('/usr/sbin/split-logfile', 'obj'),
('/usr/sbin/htdigest', 'obj'),
('/usr/share/doc/apache-tools-2.2.11', 'dir'),
('/usr/sbin/ab2', 'sym'),
('/usr/share/man/man8/rotatelogs.8.bz2', 'obj')
)
self.assertEqual(
content,
tuple(sorted(orig_content, key = lambda x: x[0])))
def test_db_creation(self):
self.assertTrue(isinstance(self.test_db, EntropyRepository))
self.assertEqual(self.test_db_name, self.test_db.repository_id())
self.assertTrue(self.test_db._doesTableExist('baseinfo'))
self.assertTrue(self.test_db._doesTableExist('extrainfo'))
def test_db_metadata_handling(self):
test_entry = {
const_convert_to_unicode("/path/to/foo", "utf-8"): \
const_convert_to_unicode("dir", "utf-8"),
const_convert_to_unicode("/path/to/foo/foo", "utf-8"): \
const_convert_to_unicode("obj", "utf-8"),
}
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
data['content'].update(test_entry.copy())
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
test_pkg2 = _misc.get_test_package2()
data2 = self.Spm.extract_package_metadata(test_pkg2)
data2['content'].update(test_entry.copy())
idpackage2 = self.test_db2.addPackage(data2)
db_data2 = self.test_db2.getPackageData(idpackage2)
cont_diff = self.test_db.contentDiff(idpackage, self.test_db2,
idpackage2)
for key in test_entry:
try:
self.assertTrue(key not in cont_diff)
except AssertionError:
print(key)
raise
py_diff = sorted([x for x in db_data['content'] if x not in \
db_data2['content']])
self.assertEqual(sorted(cont_diff), py_diff)
orig_diff = ['/lib64', '/lib64/libz.so', '/lib64/libz.so.1',
'/lib64/libz.so.1.2.3', '/usr/include', '/usr/include/zconf.h',
'/usr/include/zlib.h', '/usr/lib64/libz.a',
'/usr/lib64/libz.so', '/usr/share/doc/zlib-1.2.3-r1',
'/usr/share/doc/zlib-1.2.3-r1/ChangeLog.bz2',
'/usr/share/doc/zlib-1.2.3-r1/FAQ.bz2',
'/usr/share/doc/zlib-1.2.3-r1/README.bz2',
'/usr/share/doc/zlib-1.2.3-r1/algorithm.txt.bz2',
'/usr/share/man', '/usr/share/man/man3',
'/usr/share/man/man3/zlib.3.bz2'
]
orig_diff = [const_convert_to_unicode(x, 'utf-8') for x in orig_diff]
self.assertEqual(orig_diff, py_diff)
versioning_data = self.test_db.getVersioningData(idpackage)
dbverdata = (self.test_db.retrieveVersion(idpackage),
self.test_db.retrieveTag(idpackage),
self.test_db.retrieveRevision(idpackage),)
self.assertEqual(versioning_data, dbverdata)
strict_scope = self.test_db.getStrictScopeData(idpackage)
dbverdata = (self.test_db.retrieveAtom(idpackage),
self.test_db.retrieveSlot(idpackage),
self.test_db.retrieveRevision(idpackage),)
self.assertEqual(strict_scope, dbverdata)
scope_data = self.test_db.getScopeData(idpackage)
dbverdata = (
self.test_db.retrieveAtom(idpackage),
self.test_db.retrieveCategory(idpackage),
self.test_db.retrieveName(idpackage),
self.test_db.retrieveVersion(idpackage),
self.test_db.retrieveSlot(idpackage),
self.test_db.retrieveTag(idpackage),
self.test_db.retrieveRevision(idpackage),
self.test_db.retrieveBranch(idpackage),
self.test_db.retrieveApi(idpackage),
)
self.assertEqual(scope_data, dbverdata)
trigger_info = self.test_db.getTriggerData(idpackage)
trigger_keys = ['version', 'etpapi', 'slot', 'cxxflags', 'cflags',
'chost', 'atom', 'category', 'name', 'versiontag', 'content',
'trigger', 'branch', 'spm_phases', 'revision']
self.assertEqual(sorted(trigger_keys), sorted(trigger_info.keys()))
def test_db_insert_compare_match_provide(self):
test_pkg = _misc.get_test_entropy_package_provide()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
def test_db_cache(self):
test_pkg = _misc.get_test_entropy_package_provide()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
# enable cache
self.test_db._caching = True
key = data['category'] + "/" + data['name']
from entropy.cache import EntropyCacher
cacher = EntropyCacher()
started = cacher.is_started()
cacher.start()
# avoid race conditions, unittest bug
time.sleep(2)
cached = self.test_db._EntropyRepositoryBase__atomMatchFetchCache(
key, True, False, False, None, None, False, False, True)
self.assertTrue(cached is None)
# now store
self.test_db._EntropyRepositoryBase__atomMatchStoreCache(
key, True, False, False, None, None, False, False, True,
result = (123, 0)
)
cacher.sync()
cached = self.test_db._EntropyRepositoryBase__atomMatchFetchCache(
key, True, False, False, None, None, False, False, True)
self.assertEqual(cached, (123, 0))
if not started:
cacher.stop()
def test_db_insert_compare_match(self):
# insert/compare
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom()
pkg_name = _misc.get_test_package_name()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self._settings[plug_id]['masking_validation']['cache']
f_match_mask = (1, self.test_db_name,)
self._settings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self._settings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
# now test multimatch
idpackage = self.test_db.addPackage(db_data)
results, rc = self.test_db.atomMatch(pkg_name, multiMatch = True)
self.assertEqual(2, len(results))
self.assertTrue(isinstance(results, set))
self.assertTrue(rc == 0)
results, rc = self.test_db.atomMatch(pkg_name+"foo", multiMatch = True)
self.assertEqual(0, len(results))
self.assertTrue(isinstance(results, set))
self.assertTrue(rc == 1)
def test_db_insert_compare_match_utf(self):
# insert/compare
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
unicode_msg = const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel")
data['changelog'] = unicode_msg
data['license'] = const_convert_to_unicode('GPL-2')
data['licensedata'] = {
const_convert_to_unicode('GPL-2'): unicode_msg,
}
data['content_safety'] = {
unicode_msg: {
'sha256': "abcdbbcdbcdbcdbcdbcdbcdbcbd",
'mtime': 1024.0,
}
}
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom2()
pkg_name = _misc.get_test_package_name2()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self._settings[plug_id]['masking_validation']['cache']
f_match_mask = (1, self.test_db_name,)
self._settings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self._settings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_utf2(self):
# insert/compare
test_pkg = _misc.get_test_package3()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom3()
pkg_name = _misc.get_test_package_name3()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self._settings[plug_id]['masking_validation']['cache']
f_match_mask = (1, self.test_db_name,)
self._settings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self._settings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_mime(self):
# insert/compare
test_pkg = _misc.get_test_package4()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
known_mime = set(['application/ogg', 'audio/x-oggflac', 'audio/x-mp3',
'audio/x-pn-realaudio', 'audio/mpeg', 'application/x-ogm-audio',
'audio/vorbis', 'video/x-ms-asf', 'audio/x-speex', 'audio/x-scpls',
'audio/x-vorbis', 'audio/mpegurl', 'audio/aac', 'audio/x-ms-wma',
'audio/ogg', 'audio/x-mpegurl', 'audio/mp4',
'audio/vnd.rn-realaudio', 'audio/x-vorbis+ogg', 'audio/x-musepack',
'audio/x-flac', 'audio/x-wav'])
self.assertEqual(db_data['provided_mime'], known_mime)
# match
nf_match = (-1, 1)
f_match = (1, 0)
pkg_atom = _misc.get_test_package_atom4()
pkg_name = _misc.get_test_package_name4()
self.assertEqual(nf_match, self.test_db.atomMatch("slib"))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_name))
self.assertEqual(f_match,
self.test_db.atomMatch(pkg_atom))
# test package masking
plug_id = self.client_sysset_plugin_id
masking_validation = \
self._settings[plug_id]['masking_validation']['cache']
f_match_mask = (1, self.test_db_name,)
self._settings['live_packagemasking']['mask_matches'].add(
f_match_mask)
masking_validation.clear()
self.assertEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
self._settings['live_packagemasking']['mask_matches'].discard(
f_match_mask)
masking_validation.clear()
self.assertNotEqual((-1, 1), self.test_db.atomMatch(pkg_atom))
def test_db_insert_compare_match_tag(self):
# insert/compare
test_pkg = _misc.get_test_entropy_package_tag()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
_misc.clean_pkg_metadata(data)
self.assertEqual(data, db_data)
# match
f_match = (1, 0)
for atom, pkg_id, branch in self.test_db.listAllPackages():
pkg_key = entropy.dep.dep_getkey(atom)
self.assertEqual(f_match, self.test_db.atomMatch(pkg_key))
self.assertEqual(f_match, self.test_db.atomMatch(atom))
self.assertEqual(f_match, self.test_db.atomMatch("~"+atom))
def test_db_multithread(self):
# insert/compare
test_pkg = _misc.get_test_entropy_package_tag()
data = self.Spm.extract_package_metadata(test_pkg)
_misc.clean_pkg_metadata(data)
def handle_pkg(xdata):
idpackage = self.test_db.addPackage(xdata)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(xdata, db_data)
t1 = ParallelTask(handle_pkg, data)
t2 = ParallelTask(handle_pkg, data)
t3 = ParallelTask(handle_pkg, data)
t4 = ParallelTask(handle_pkg, data)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
cur_cache = self.test_db._cursor_pool().keys()
self.assertTrue(len(cur_cache) > 0)
self.test_db._cleanup_all()
cur_cache = self.test_db._cursor_pool().keys()
self.assertEqual(len(cur_cache), 0)
def test_db_close_all(self):
"""
This tests if EntropyRepository.close() really closes
all the resources, including those allocated by the
main thread.
"""
test_pkg = _misc.get_test_entropy_package_tag()
data = self.Spm.extract_package_metadata(test_pkg)
_misc.clean_pkg_metadata(data)
_tmp_data = {
"path": None,
"db": None,
"T1": False,
"T2": False,
"T3": False,
}
def handle_pkg(_tmp_data, xdata):
idpackage = self.test_db.addPackage(xdata)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(xdata, db_data)
self.test_db.commit()
fd, buf_file = tempfile.mkstemp()
os.close(fd)
buf = open(buf_file, "wb")
set_mute(True)
self.test_db.exportRepository(buf)
set_mute(False)
buf.flush()
buf.close()
fd, buf_file_db = tempfile.mkstemp()
os.close(fd)
self.test_db.importRepository(buf_file, buf_file_db)
os.remove(buf_file)
db = self.Client.open_generic_repository(buf_file_db)
self.assertTrue(db is not None)
pkg_ids = db.listAllPackageIds()
self.assertTrue(1 in pkg_ids)
_tmp_data['path'] = buf_file_db
_tmp_data['db'] = db
_tmp_data['T1'] = True
def select_pkg(_tmp_data, t1):
t1.join() # wait for t1 to finish
pkg_ids = _tmp_data['db'].listAllPackageIds()
self.assertTrue(1 in pkg_ids)
self.assertTrue(len(pkg_ids) == 1)
_tmp_data['T2'] = True
def close_all(_tmp_data, t1, t2):
t1.join()
t2.join()
_tmp_data['db']._cleanup_all(_cleanup_main_thread=False)
with _tmp_data['db']._cursor_pool_mutex():
cur_cache = _tmp_data['db']._cursor_pool().keys()
self.assertEqual(len(cur_cache), 1) # just MainThread
_tmp_data['db'].close()
with _tmp_data['db']._cursor_pool_mutex():
cur_cache = _tmp_data['db']._cursor_pool().keys()
self.assertEqual(len(cur_cache), 0) # nothing left
_tmp_data['T3'] = True
t1 = ParallelTask(handle_pkg, _tmp_data, data)
t1.name = "T1"
t2 = ParallelTask(select_pkg, _tmp_data, t1)
t2.name = "T2"
t3 = ParallelTask(close_all, _tmp_data, t1, t2)
t3.name = "T3"
t1.start()
t2.start()
t1.join()
pkg_ids = _tmp_data['db'].listAllPackageIds()
t2.join()
t3.start()
t3.join()
self.assertTrue(_tmp_data['T1'] and _tmp_data['T2'] \
and _tmp_data['T3'])
self.assertTrue(1 in pkg_ids)
self.assertTrue(len(pkg_ids) == 1)
_tmp_data['db'].close()
with _tmp_data['db']._cursor_pool_mutex():
cur_cache = _tmp_data['db']._cursor_pool().keys()
self.assertEqual(len(cur_cache), 0) # nothing left
os.remove(_tmp_data['path'])
def test_db_reverse_deps(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
test_pkg2 = _misc.get_test_package2()
data2 = self.Spm.extract_package_metadata(test_pkg2)
data['pkg_dependencies'] += ((
_misc.get_test_package_atom2(),
etpConst['dependency_type_ids']['rdepend_id']),)
data2['pkg_dependencies'] += ((
_misc.get_test_package_atom(),
etpConst['dependency_type_ids']['rdepend_id']),)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(data)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(data, db_data)
idpackage2 = self.test_db.addPackage(data2)
db_data2 = self.test_db.getPackageData(idpackage2)
_misc.clean_pkg_metadata(data2)
_misc.clean_pkg_metadata(db_data2)
self.assertEqual(data2, db_data2)
rev_deps = self.test_db.retrieveReverseDependencies(idpackage)
rev_deps2 = self.test_db.retrieveReverseDependencies(idpackage2)
self.assertTrue(idpackage in rev_deps2)
self.assertTrue(idpackage2 in rev_deps)
rev_deps_t = self.test_db.retrieveReverseDependencies(idpackage,
key_slot = True)
self.assertEqual(rev_deps_t, (('app-dicts/aspell-es', '0'),))
pkg_data = self.test_db.retrieveUnusedPackageIds()
self.assertEqual(pkg_data, tuple())
def test_similar(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(data)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(data, db_data)
out = self.test_db.searchSimilarPackages(_misc.get_test_package_name())
self.assertEqual(out, (1,))
def test_search(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(data)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(data, db_data)
out = self.test_db.searchPackages(_misc.get_test_package_name())
self.assertEqual(out, (('sys-libs/zlib-1.2.3-r1', 1, '5'),))
out = self.test_db.searchPackages(_misc.get_test_package_name(),
slot = "0")
self.assertEqual(out, (('sys-libs/zlib-1.2.3-r1', 1, '5'),))
out = self.test_db.searchPackages(_misc.get_test_package_name(),
slot = "0", just_id = True)
self.assertEqual(out, (1,))
def test_list_packages(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
out = self.test_db.listAllPackages()
self.assertEqual(out, (('sys-libs/zlib-1.2.3-r1', 1, '5'),))
def test_spmuids(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
out = self.test_db.listAllSpmUids()
self.assertEqual(out, ((22331, 1),))
def test_list_pkg_ids(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
out = self.test_db.listAllPackageIds(order_by="atom")
self.assertEqual(out, (1,))
def test_list_files(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
out = sorted(self.test_db.listAllFiles())
self.assertEqual(out, sorted([
'/lib64/libz.so', '/usr/share/doc/zlib-1.2.3-r1',
'/usr/share/doc/zlib-1.2.3-r1/algorithm.txt.bz2',
'/usr/share/doc/zlib-1.2.3-r1/FAQ.bz2',
'/usr/share/doc/zlib-1.2.3-r1/ChangeLog.bz2',
'/usr', '/usr/include', '/usr/lib64',
'/usr/share/man/man3/zlib.3.bz2', '/usr/lib64/libz.a', '/lib64',
'/usr/share', '/usr/share/doc/zlib-1.2.3-r1/README.bz2',
'/usr/lib64/libz.so', '/usr/share/man', '/usr/include/zconf.h',
'/lib64/libz.so.1.2.3', '/usr/include/zlib.h', '/usr/share/doc',
'/usr/share/man/man3', '/lib64/libz.so.1'])
)
def test_list_categories(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
out = self.test_db.listAllCategories()
self.assertEqual(out, frozenset(('sys-libs',)))
def test_list_downloads(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
out = self.test_db.listAllDownloads()
self.assertEqual(out, ('sys-libs:zlib-1.2.3-r1.334f75538e6eddde753d9a247609dd8b1123a541.tbz2',))
def test_search_name(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(data)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(db_data, data)
out = self.test_db.searchName(_misc.get_test_package_name())
self.assertEqual(out, frozenset([('sys-libs/zlib-1.2.3-r1', 1)]))
def test_db_indexes(self):
self.test_db.createAllIndexes()
def test_db_import_export(self):
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
# Portage stores them this way
data['changelog'] = const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel")
data['license'] = const_convert_to_unicode('GPL-2')
data['licensedata'] = {
const_convert_to_unicode('GPL-2'): \
const_convert_to_unicode(
"#248083).\n\n 06 Feb 2009; Ra\xc3\xbal Porcel"),
}
idpackage = self.test_db.addPackage(data)
db_data = self.test_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(data)
_misc.clean_pkg_metadata(db_data)
self.assertEqual(data, db_data)
set_mute(True)
# export
fd, buf_file = tempfile.mkstemp()
os.close(fd)
buf = open(buf_file, "wb")
self.test_db.exportRepository(buf)
buf.flush()
buf.close()
fd, new_db_path = tempfile.mkstemp()
os.close(fd)
self.test_db.importRepository(buf_file, new_db_path)
new_db = self.Client.open_generic_repository(new_db_path)
new_db_data = new_db.getPackageData(idpackage)
_misc.clean_pkg_metadata(new_db_data)
new_db.close()
set_mute(True)
self.assertEqual(new_db_data, db_data)
os.remove(buf_file)
os.remove(new_db_path)
def test_use_defaults(self):
test_pkg = _misc.get_test_package()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
key, slot = self.test_db.retrieveKeySlot(idpackage)
valid_test_deps = [
"%s[%s(+)]" % (key, "doesntexistforsure"),
"%s[-%s(-)]" % (key, "doesntexistforsure"),
"%s[%s(+)]" % (key, "kernel_linux"),
"%s[%s(-)]" % (key, "kernel_linux"),
]
invalid_test_deps = [
"%s[%s(-)]" % (key, "doesntexistforsure"),
"%s[-%s(+)]" % (key, "kernel_linux"),
"%s[-%s(+)]" % (key, "doesntexistforsure"),
"%s[-%s(-)]" % (key, "kernel_linux"),
]
for dep in valid_test_deps:
self.assertEqual((1, 0), self.test_db.atomMatch(dep))
for dep in invalid_test_deps:
self.assertEqual((-1, 1), self.test_db.atomMatch(dep))
def test_db_package_sets(self):
set_name = 'my_test_set'
set_deps = ["app-foo/foo", "app-pling/plong", "media-foo/ajez"]
set_name2 = 'my_test_set2'
set_deps2 = ["app-foo/foo2", "app-pling/plong2", "media-foo/ajez2"]
pkgsets = {
set_name: set(set_deps),
set_name2: set(set_deps2),
}
self.test_db.insertPackageSets(pkgsets)
self.assertEqual(self.test_db.retrievePackageSets(), pkgsets)
set_search = self.test_db.searchSets(set_name2)
self.assertEqual(set([set_name2]), set_search)
def test_db_license_data_str_insert(self):
lic_txt = const_convert_to_rawstring('[3]\xab foo\n\n', 'utf-8')
lic_name = const_convert_to_unicode('CCPL-Attribution-2.0')
lic_data = {lic_name: lic_txt}
self.test_db._insertLicenses(lic_data)
db_lic_txt = self.test_db.retrieveLicenseText(lic_name)
self.assertEqual(db_lic_txt, lic_txt)
def test_settings(self):
self.assertRaises(KeyError, self.test_db.getSetting, "fuck")
self.test_db._setSetting("something_cool", "abcdef\nabcdef")
self.assertEqual(self.test_db.getSetting("something_cool"),
"abcdef\nabcdef")
def test_new_entropyrepository_schema(self):
test_pkg = _misc.get_test_package2()
data = self.Spm.extract_package_metadata(test_pkg)
idpackage = self.test_db.addPackage(data)
old_data = self.test_db.getPackageData(idpackage)
old_base_data = self.test_db.getBaseData(idpackage)
old_cats = self.test_db.listAllCategories()
test_db = self.__open_test_db(":memory:")
idpackage = test_db.addPackage(data)
new_data = test_db.getPackageData(idpackage)
new_base_data = test_db.getBaseData(idpackage)
new_cats = test_db.listAllCategories()
self.assertTrue(test_db._isBaseinfoExtrainfo2010())
self.assertEqual(old_data, new_data)
self.assertEqual(old_base_data, new_base_data)
self.assertEqual(old_cats, new_cats)
test_db.close()
def test_preserved_libs(self):
data = self.test_db.listAllPreservedLibraries()
self.assertEqual(data, tuple())
atom = "app-foo/bar-1.2.3"
fake_data = [
("libfoo.so.3", 2, "/usr/lib64/libfoo.so.3", atom),
("libfoo.so.3", 2, "/usr/lib64/libfoo.so.3.0.0", atom),
("libfoo.so.3", 2, "/usr/lib64/libfoo.so", atom),
("libbar.so.10", 2, "/usr/lib64/libbar.so.10", atom),
("libbar.so.10", 2, "/usr/lib64/libbar.so", atom),
("libbar.so.10", 2, "/usr/lib64/libbar.so.10.0.0", atom),
]
for entry in fake_data:
self.test_db.insertPreservedLibrary(*entry)
# test that insert order equals list
data = self.test_db.listAllPreservedLibraries()
self.assertEqual(tuple(data), tuple(fake_data))
grouped = {}
for lib, elfclass, path, _atom in data:
obj = grouped.setdefault((lib, elfclass), [])
obj.append(path)
# make sure that retrieve works as intended
for (lib, elfclass), paths in grouped.items():
out_paths = self.test_db.retrievePreservedLibraries(
lib, elfclass)
self.assertEqual(sorted(out_paths), sorted(paths))
# test removal
count = len(data)
for lib, elfclass, path, _atom in data:
self.test_db.removePreservedLibrary(lib, elfclass, path)
count -= 1
current_preserved_libs = self.test_db.listAllPreservedLibraries()
self.assertEqual(len(current_preserved_libs), count)
self.assertTrue((lib, elfclass, path) not in current_preserved_libs)
data = self.test_db.listAllPreservedLibraries()
self.assertEqual(data, tuple())
def test_locking_file(self):
fd, db_file = tempfile.mkstemp()
os.close(fd)
test_db = None
try:
test_db = self.Client.open_generic_repository(db_file)
test_db.initializeRepository()
with test_db.shared():
self.assert_(test_db._flock is not None)
test_db.close()
self.assert_(test_db._flock is None)
with test_db.shared():
self.assert_(test_db._flock is not None)
acquired = test_db.try_acquire_shared()
self.assertEqual(acquired, True)
acquired = test_db.try_acquire_exclusive()
self.assertEqual(acquired, True)
pid = os.fork()
if pid == 0:
test_db.close()
acquired = test_db.try_acquire_exclusive()
if acquired:
os._exit(1)
acquired = test_db.try_acquire_shared()
if acquired:
os._exit(1)
os._exit(0)
else:
child_pid, exit_st = os.waitpid(pid, 0)
self.assertEqual(exit_st, 0)
finally:
if test_db is not None:
test_db.close()
os.remove(db_file)
if __name__ == '__main__':
unittest.main()
raise SystemExit(0)