e406a4edaa
The old "dependencies" metadata is deprecated. It was found that the generated metadata might get corrupted by colliding atom strings. The new implementation avoids collisions completely and is more efficient.
458 lines
17 KiB
Python
458 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
import sys
|
|
sys.path.insert(0, 'client')
|
|
sys.path.insert(0, '../../client')
|
|
sys.path.insert(0, '.')
|
|
sys.path.insert(0, '../')
|
|
import unittest
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import time
|
|
import tempfile
|
|
|
|
from entropy.client.interfaces import Client
|
|
from entropy.client.interfaces.db import InstalledPackagesRepository
|
|
from entropy.cache import EntropyCacher
|
|
from entropy.const import etpConst
|
|
from entropy.output import set_mute
|
|
from entropy.core.settings.base import SystemSettings
|
|
from entropy.db import EntropyRepository
|
|
from entropy.exceptions import RepositoryError, EntropyPackageException
|
|
import entropy.tools
|
|
import tests._misc as _misc
|
|
|
|
class EntropyClientTest(unittest.TestCase):
    """
    Tests for the Entropy ``Client`` interface: singleton handling,
    persistent settings, the cacher, temporary repositories, package
    triggers and package installation/removal.

    NOTE(review): the indentation of this file had been lost, so block
    nesting (in particular where each ``with cacher:`` body ends) was
    reconstructed from the code's logic -- confirm against the upstream
    copy.
    """

    def setUp(self):
        """
        Build a Client whose installed packages repository is a temporary
        ``:memory:`` one, plus the helpers the tests rely on.
        """
        sys.stdout.write("%s called\n" % (self,))
        sys.stdout.flush()
        self.mem_repoid = "mem_repo"
        self.mem_repo_desc = "This is a testing repository"
        self.Client = Client(installed_repo=-1, indexing=False,
            xcache=False, repo_validation=False)
        self.Client._installed_repository = self.Client.open_temp_repository(
            name=InstalledPackagesRepository.NAME, temp_file=":memory:")
        # as per GenericRepository specifications, enable generic handlePackage
        self.Client._installed_repository.override_handlePackage = True
        self.Spm = self.Client.Spm()
        self._settings = SystemSettings()
        self.test_pkgs = [_misc.get_entrofoo_test_package()]

    def tearDown(self):
        """
        tearDown is run after each test
        """
        sys.stdout.write("%s ran\n" % (self,))
        sys.stdout.flush()
        # calling destroy() and shutdown()
        # need to call destroy() directly to remove all the SystemSettings
        # plugins because shutdown() doesn't, since it's meant to be called
        # right before terminating the process
        self.Client.destroy()
        self.Client.shutdown()

    # -- private helpers ---------------------------------------------------

    def _add_test_package_repository(self, pkg_file):
        """
        Call ``Client.add_package_repository(pkg_file)`` and return its
        result.

        An EntropyPackageException is tolerated (and an empty list is
        returned) only when the current architecture is not "amd64" and
        the error message is "invalid architecture"; on "amd64" the
        exception is re-raised.
        """
        matches = []
        try:
            matches = self.Client.add_package_repository(pkg_file)
        except EntropyPackageException as err:
            if etpConst['currentarch'] == "amd64":
                raise
            self.assertEqual(str(err), "invalid architecture")
        else:
            self.assertNotEqual(matches, [])
        return matches

    def _run_postinstall_trigger(self, trigger_code):
        """
        Add the generic test package to a temporary repository, attach
        ``trigger_code`` to its trigger metadata, run it as an
        "install"/"postinstall" trigger and return the exit status.

        The trigger object is killed exactly once, whether or not the
        call succeeds.
        """
        dbconn = self.Client._init_generic_temp_repository(
            self.mem_repoid, self.mem_repo_desc, temp_file=":memory:")
        test_pkg = _misc.get_test_package()
        data = self.Spm.extract_package_metadata(test_pkg)
        idpackage = dbconn.addPackage(data)
        pkgdata = dbconn.getTriggerData(idpackage)
        pkgdata['affected_directories'] = set()
        pkgdata['affected_infofiles'] = set()
        pkgdata['trigger'] = trigger_code
        trigger = self.Client.Triggers(
            "install", 'postinstall', pkgdata, pkgdata)
        try:
            trigger.prepare()
            return trigger._do_trigger_call_ext_generic()
        finally:
            trigger.kill()

    def _inflate_test_package(self, pkg_path, pkg_dir):
        """
        Inflate ``pkg_path`` into ``pkg_dir`` through the SoloPkg command
        and return the path of the first file found in ``pkg_dir``.
        """
        # this test might be considered controversial, for now, let's keep it
        # here, we use equo stuff to make sure it keeps working
        from solo.commands.pkg import SoloPkg

        s_pkg = SoloPkg(["inflate", pkg_path, "--savedir", pkg_dir])
        # The return value is not needed, but the call is kept:
        # presumably parse() populates the state _inflate() reads --
        # TODO confirm.
        s_pkg.parse()
        # do not call func directly because the real method is
        # wrapper around a lock call
        rc = s_pkg._inflate(self.Client)
        self.assertEqual(rc, 0)

        produced = os.listdir(pkg_dir)
        self.assertTrue(produced)
        etp_pkg = os.path.join(pkg_dir, produced[0])
        self.assertTrue(os.path.isfile(etp_pkg))
        return etp_pkg

    # -- tests -------------------------------------------------------------

    def test_singleton(self):
        """
        Client() hands back the live instance; after shutdown() a new
        call produces a different object.
        """
        myclient = Client(installed_repo=-1)
        self.assertTrue(myclient is self.Client)
        myclient.shutdown()
        self.assertTrue(myclient.is_destroyed())
        self.assertTrue(self.Client.is_destroyed())
        myclient2 = Client(installed_repo=-1, indexing=False,
            xcache=False, repo_validation=False)
        self.assertTrue(myclient is not myclient2)
        myclient2.shutdown()
        self.assertTrue(myclient2.is_destroyed())

    def test_syssetting_backup(self):
        """
        Persistent settings survive SystemSettings.clear() until they are
        unset again.
        """
        key1 = 'foo_foo_foo2'
        key2 = 'asdasdadsadas'
        val1 = set([1, 2, 3])
        val2 = None
        foo_data = {
            key1: val1,
            key2: val2,
        }
        self._settings.update(foo_data)
        self._settings.set_persistent_setting(foo_data)
        self._settings.clear()
        self.assertTrue(key1 in self._settings)
        self.assertTrue(key2 in self._settings)
        self.assertEqual(val1, self._settings.get(key1))
        self.assertEqual(val2, self._settings.get(key2))

        # now remove
        self._settings.unset_persistent_setting(key1)
        self._settings.clear()
        self.assertFalse(key1 in self._settings)
        self.assertTrue(key2 in self._settings)

        self._settings.unset_persistent_setting(key2)
        self._settings.clear()
        self.assertFalse(key1 in self._settings)
        self.assertFalse(key2 in self._settings)

    def test_entropy_cacher(self):
        """The cacher reports its started/stopped state correctly."""
        self.Client._cacher.start()
        self.assertTrue(self.Client._cacher.is_started())
        self.Client._cacher.stop()
        self.assertFalse(self.Client._cacher.is_started())

    def test_cacher_lock_usage(self):
        """
        Entering the cacher context owns its lock, and save() still
        stores data that pop() can read back.
        """
        cacher = self.Client._cacher
        tmp_dir = tempfile.mkdtemp()
        cacher.start()
        try:
            with cacher:
                self.assertTrue(
                    cacher._EntropyCacher__enter_context_lock._is_owned())
                cacher.discard()
                # even if cacher is paused, this must be saved
                cacher.save("foo", "bar", cache_dir=tmp_dir)
            self.assertEqual(cacher.pop("foo", cache_dir=tmp_dir), "bar")
        finally:
            cacher.stop()
            shutil.rmtree(tmp_dir, True)

    def test_cacher_general_usage(self):
        """
        With STASHING_CACHE enabled, push() inside the cacher context
        fills the buffer and the stash; after sync() the value can be
        popped.
        """
        cacher = self.Client._cacher
        tmp_dir = tempfile.mkdtemp()
        cacher.start()
        st_val = EntropyCacher.STASHING_CACHE
        try:
            EntropyCacher.STASHING_CACHE = True
            with cacher:
                self.assertTrue(
                    cacher._EntropyCacher__enter_context_lock._is_owned())
                cacher.discard()
                cacher.push("bar", "foo", cache_dir=tmp_dir)
                self.assertTrue(cacher._EntropyCacher__cache_buffer)
                self.assertTrue(cacher._EntropyCacher__stashing_cache)
            cacher.sync()
            self.assertEqual(cacher.pop("bar", cache_dir=tmp_dir), "foo")
        finally:
            # restore the class-level flag for the other tests
            EntropyCacher.STASHING_CACHE = st_val
            cacher.stop()
            shutil.rmtree(tmp_dir, True)

    def test_cacher_push_pop_sync(self):
        """With the cacher stopped, a pushed value is not readable."""
        cacher = self.Client._cacher
        tmp_dir = tempfile.mkdtemp()
        cacher.stop()
        try:
            # "async" is a reserved word since Python 3.7, so writing it
            # as a literal keyword argument is a SyntaxError there;
            # passing it through ** makes the very same call while
            # keeping this file parseable on every interpreter.
            cacher.push("bar", "foo", cache_dir=tmp_dir,
                **{"async": False})
            cacher.sync()
            # must return None
            self.assertEqual(cacher.pop("bar", cache_dir=tmp_dir), None)
        finally:
            shutil.rmtree(tmp_dir, True)

    def test_clear_cache(self):
        """Client.clear_cache() empties the cacher's current directory."""
        current_dir = self.Client._cacher.current_directory()
        test_file = os.path.join(current_dir, "asdasd")
        with open(test_file, "w") as f:
            f.flush()
        self.Client.clear_cache()
        self.assertEqual(os.listdir(current_dir), [])

    def test_contentsafety(self):
        """
        Package metadata round-trips through a temporary repository and
        the stored content-safety mtimes match the extracted files.
        """
        dbconn = self.Client._init_generic_temp_repository(
            self.mem_repoid, self.mem_repo_desc, temp_file=":memory:")
        test_pkg = _misc.get_test_entropy_package5()
        tmp_dir = tempfile.mkdtemp()
        try:
            rc = entropy.tools.uncompress_tarball(
                test_pkg, extract_path=tmp_dir)
            self.assertEqual(rc, 0)

            data = self.Spm.extract_package_metadata(test_pkg)
            idpackage = dbconn.addPackage(data)
            db_data = dbconn.getPackageData(idpackage)

            _misc.clean_pkg_metadata(data)
            _misc.clean_pkg_metadata(db_data)
            self.assertEqual(data, db_data)

            cs_data = dbconn.retrieveContentSafety(idpackage)
            for path, cs_info in cs_data.items():
                real_path = os.path.join(tmp_dir, path.lstrip("/"))
                self.assertEqual(
                    os.path.getmtime(real_path), cs_info['mtime'])
        finally:
            # remove the extraction directory even when an assertion fails
            shutil.rmtree(tmp_dir, True)

    def test_memory_repository(self):
        """
        Package metadata round-trips through a temporary repository, and
        the repository can no longer be opened once removed.
        """
        dbconn = self.Client._init_generic_temp_repository(
            self.mem_repoid, self.mem_repo_desc, temp_file=":memory:")
        test_pkg = _misc.get_test_package()
        data = self.Spm.extract_package_metadata(test_pkg)
        idpackage = dbconn.addPackage(data)
        db_data = dbconn.getPackageData(idpackage)

        _misc.clean_pkg_metadata(data)
        _misc.clean_pkg_metadata(db_data)
        self.assertEqual(data, db_data)

        self.Client.remove_repository(self.mem_repoid)
        self.assertNotEqual(
            self.Client._memory_db_instances.get(self.mem_repoid), dbconn)

        def test_load():
            set_mute(True)
            try:
                self.Client.open_repository(self.mem_repoid)
            finally:
                # open_repository() is expected to raise here, so the
                # unmute has to sit in a finally clause or the output
                # would stay muted for every following test
                set_mute(False)

        self.assertRaises(RepositoryError, test_load)

    def test_package_repository(self):
        """
        Every match returned for an added package file resolves to
        package data and an atom in its repository.
        """
        test_pkg = _misc.get_test_entropy_package()
        # this might fail on 32bit arches
        atoms_contained = self._add_test_package_repository(test_pkg)
        for idpackage, repoid in atoms_contained:
            dbconn = self.Client.open_repository(repoid)
            self.assertNotEqual(None, dbconn.getPackageData(idpackage))
            self.assertNotEqual(None, dbconn.retrieveAtom(idpackage))

    def test_package_installation(self):
        """Install and remove every test package through Package()."""
        for pkg_path, pkg_atom in self.test_pkgs:
            self._do_pkg_test(pkg_path, pkg_atom)

    def test_package_installation_new_api(self):
        """Install and remove every test package through the factory."""
        for pkg_path, pkg_atom in self.test_pkgs:
            self._do_pkg_test_new_api(pkg_path, pkg_atom)

    def test_shell_trigger(self):
        """A shell trigger's exit status is reported by the trigger call."""
        trigger_code = """\
#!%s
echo $@
exit 42
""" % (etpConst['trigger_sh_interpreter'],)
        self.assertEqual(self._run_postinstall_trigger(trigger_code), 42)

    def test_python_trigger(self):
        """A Python trigger reports its status through my_ext_status."""
        trigger_code = """\
import os
os.system("echo hello")
my_ext_status = 42
"""
        self.assertEqual(self._run_postinstall_trigger(trigger_code), 42)

    def test_python_trigger2(self):
        """
        A Python trigger that defines a function and branches on the
        ``stage`` name reports the function's return value.
        """
        trigger_code = """\
import os
import subprocess
from entropy.const import etpConst

def configure_correct_gcc():
    gcc_target = "4.5"
    uname_arch = os.uname()[4]
    gcc_dir = etpConst['systemroot'] + "/etc/env.d/gcc"
    gcc_profile_file_pfx = uname_arch + "-pc-linux-gnu-" + gcc_target
    gcc_profile_file = None
    for curdir, subs, files in os.walk(gcc_dir):
        for fname in files:
            if fname.startswith(gcc_profile_file_pfx):
                gcc_profile_file = fname
                break
        break
    if gcc_profile_file is not None:
        subprocess.call(("echo", gcc_profile_file))
    return 42

if stage == "postinstall":
    my_ext_status = configure_correct_gcc()
else:
    my_ext_status = 0
"""
        self.assertEqual(self._run_postinstall_trigger(trigger_code), 42)

    def _do_pkg_test(self, pkg_path, pkg_atom):
        """
        Inflate ``pkg_path``, install it into a throw-away root with
        ``Client.Package()`` and then remove every installed package.

        ``pkg_atom`` is accepted for symmetry with the items of
        ``self.test_pkgs`` and is not used. The temporary directories
        and the overridden ``etpConst['entropyunpackdir']`` are restored
        even when an assertion fails.
        """
        # we need to tweak the default unpack dir to make pkg install available
        # for uids != 0
        temp_unpack = tempfile.mkdtemp()
        fake_root = tempfile.mkdtemp()
        pkg_dir = tempfile.mkdtemp()
        old_unpackdir = etpConst['entropyunpackdir']
        etpConst['entropyunpackdir'] = temp_unpack
        try:
            etp_pkg = self._inflate_test_package(pkg_path, pkg_dir)
            matches = self._add_test_package_repository(etp_pkg)
            for match in matches:
                my_p = self.Client.Package()
                my_p.prepare(match, "install", {})
                # unit testing metadata setting, of course, undocumented
                my_p.pkgmeta['unittest_root'] = fake_root
                rc = my_p.run()
                self.assertEqual(rc, 0)

            # remove pkg
            idpackages = \
                self.Client.installed_repository().listAllPackageIds()
            for idpackage in idpackages:
                my_p = self.Client.Package()
                my_p.prepare((idpackage,), "remove", {})
                rc = my_p.run()
                self.assertEqual(rc, 0)
        finally:
            # restore orig const value
            etpConst['entropyunpackdir'] = old_unpackdir
            # done installing
            for path in (pkg_dir, temp_unpack, fake_root):
                shutil.rmtree(path, True)

    def _do_pkg_test_new_api(self, pkg_path, pkg_atom):
        """
        Same scenario as ``_do_pkg_test`` but driven through
        ``Client.PackageActionFactory()`` install and remove actions.

        ``pkg_atom`` is accepted for symmetry with the items of
        ``self.test_pkgs`` and is not used. The temporary directories
        and the overridden ``etpConst['entropyunpackdir']`` are restored
        even when an assertion fails.
        """
        # we need to tweak the default unpack dir to make pkg install available
        # for uids != 0
        temp_unpack = tempfile.mkdtemp()
        fake_root = tempfile.mkdtemp()
        pkg_dir = tempfile.mkdtemp()
        old_unpackdir = etpConst['entropyunpackdir']
        etpConst['entropyunpackdir'] = temp_unpack
        try:
            etp_pkg = self._inflate_test_package(pkg_path, pkg_dir)
            matches = self._add_test_package_repository(etp_pkg)

            action_factory = self.Client.PackageActionFactory()
            for match in matches:
                my_p = action_factory.get(
                    action_factory.INSTALL_ACTION,
                    match)
                my_p.setup()

                # unit testing metadata setting, of course, undocumented
                my_p.metadata()['unittest_root'] = fake_root

                rc = my_p.start()
                self.assertEqual(rc, 0)
                my_p.finalize()

            # remove pkg
            idpackages = \
                self.Client.installed_repository().listAllPackageIds()
            for idpackage in idpackages:
                my_p = action_factory.get(
                    action_factory.REMOVE_ACTION,
                    (idpackage, self.Client.installed_repository().name))
                rc = my_p.start()
                self.assertEqual(rc, 0)
                my_p.finalize()
        finally:
            # restore orig const value
            etpConst['entropyunpackdir'] = old_unpackdir
            # done installing
            for path in (pkg_dir, temp_unpack, fake_root):
                shutil.rmtree(path, True)
|
|
|
|
|
|
if __name__ == '__main__':
    # unittest.main() leaves through SystemExit by default, so anything
    # placed after it never runs. Doing the thread cleanup in a finally
    # clause makes it execute, and the exit status raised by
    # unittest.main() then propagates unchanged.
    try:
        unittest.main()
    finally:
        entropy.tools.kill_threads()
    # only reached if unittest.main() ever returns normally
    raise SystemExit(0)
|