# -*- coding: utf-8 -*- import sys sys.path.insert(0, 'client') sys.path.insert(0, '../../client') sys.path.insert(0, '.') sys.path.insert(0, '../') import unittest import os import shutil import signal import time import tempfile from entropy.client.interfaces import Client from entropy.const import etpConst, etpUi, const_setup_entropy_pid from entropy.core.settings.base import SystemSettings from entropy.db import EntropyRepository from entropy.exceptions import RepositoryError, EntropyPackageException import entropy.tools import tests._misc as _misc class EntropyRepositoryTest(unittest.TestCase): def setUp(self): sys.stdout.write("%s called\n" % (self,)) sys.stdout.flush() self.mem_repoid = "mem_repo" self.mem_repo_desc = "This is a testing repository" self.Client = Client(installed_repo = -1, indexing = False, xcache = False, repo_validation = False) # fake clientDbconn self.Client._installed_repository = self.Client.open_temp_repository( name = etpConst['clientdbid'], temp_file = ":memory:") # as per GenericRepository specifications, enable generic handlePackage self.Client._installed_repository.override_handlePackage = True self.Spm = self.Client.Spm() self._settings = SystemSettings() self.test_pkgs = [_misc.get_entrofoo_test_package()] def tearDown(self): """ tearDown is run after each test """ sys.stdout.write("%s ran\n" % (self,)) sys.stdout.flush() # calling destroy() and shutdown() # need to call destroy() directly to remove all the SystemSettings # plugins because shutdown() doesn't, since it's meant to be called # right before terminating the process self.Client.destroy() self.Client.shutdown() def test_another_instance(self): pid = os.fork() if pid == 0: # child locked = self.Client.another_entropy_running() if locked: os._exit(1) os._exit(0) else: rc = os.waitpid(pid, 0)[1] self.assertNotEqual(rc, 0) def test_singleton(self): myclient = Client(installed_repo = -1) self.assert_(myclient is self.Client) myclient.shutdown() self.assert_(myclient.is_destroyed()) self.assert_(self.Client.is_destroyed()) myclient2 = Client(installed_repo = -1, indexing = False, xcache = False, repo_validation = False) self.assert_(myclient is not myclient2) myclient2.shutdown() self.assert_(myclient2.is_destroyed()) def test_syssetting_backup(self): key1 = 'foo_foo_foo2' key2 = 'asdasdadsadas' val1 = set([1, 2, 3]) val2 = None foo_data = { key1: val1, key2: val2, } self._settings.update(foo_data) self._settings.set_persistent_setting(foo_data) self._settings.clear() self.assertEqual(True, key1 in self._settings) self.assertEqual(True, key2 in self._settings) self.assertEqual(val1, self._settings.get(key1)) self.assertEqual(val2, self._settings.get(key2)) # now remove self._settings.unset_persistent_setting(key1) self._settings.clear() self.assertEqual(False, key1 in self._settings) self.assertEqual(True, key2 in self._settings) self._settings.unset_persistent_setting(key2) self._settings.clear() self.assertEqual(False, key1 in self._settings) self.assertEqual(False, key2 in self._settings) def test_entropy_cacher(self): self.Client._cacher.start() self.assert_(self.Client._cacher.is_started()) self.Client._cacher.stop() self.assert_(not self.Client._cacher.is_started()) def test_cacher_lock_usage(self): cacher = self.Client._cacher tmp_dir = tempfile.mkdtemp() cacher.start() try: with cacher: self.assert_(cacher._EntropyCacher__enter_context_lock._is_owned()) cacher.discard() # even if cacher is paused, this must be saved cacher.save("foo", "bar", cache_dir = tmp_dir) self.assertEqual(cacher.pop("foo", cache_dir = tmp_dir), "bar") finally: cacher.stop() shutil.rmtree(tmp_dir, True) def test_cacher_general_usage(self): cacher = self.Client._cacher tmp_dir = tempfile.mkdtemp() cacher.start() try: with cacher: self.assert_(cacher._EntropyCacher__enter_context_lock._is_owned()) cacher.discard() cacher.push("bar", "foo", cache_dir = tmp_dir) self.assert_(cacher._EntropyCacher__cache_buffer) self.assert_(cacher._EntropyCacher__stashing_cache) cacher.sync() self.assertEqual(cacher.pop("bar", cache_dir = tmp_dir), "foo") finally: cacher.stop() shutil.rmtree(tmp_dir, True) def test_cacher_push_pop_sync(self): cacher = self.Client._cacher tmp_dir = tempfile.mkdtemp() cacher.stop() try: cacher.push("bar", "foo", async = False, cache_dir = tmp_dir) # must return None cacher.sync() self.assertEqual(cacher.pop("bar", cache_dir = tmp_dir), None) finally: shutil.rmtree(tmp_dir, True) def test_clear_cache(self): current_dir = self.Client._cacher.current_directory() test_file = os.path.join(current_dir, "asdasd") with open(test_file, "w") as f: f.flush() self.Client.clear_cache() self.assertEqual(os.listdir(current_dir), []) def test_contentsafety(self): dbconn = self.Client._init_generic_temp_repository( self.mem_repoid, self.mem_repo_desc, temp_file = ":memory:") test_pkg = _misc.get_test_entropy_package5() tmp_dir = tempfile.mkdtemp() rc = entropy.tools.uncompress_tarball(test_pkg, extract_path = tmp_dir) self.assertEqual(rc, 0) data = self.Spm.extract_package_metadata(test_pkg) idpackage = dbconn.addPackage(data) self.assertEqual(data, dbconn.getPackageData(idpackage)) cs_data = dbconn.retrieveContentSafety(idpackage) for path, cs_info in cs_data.items(): real_path = os.path.join(tmp_dir, path.lstrip("/")) self.assertEqual(os.path.getmtime(real_path), cs_info['mtime']) shutil.rmtree(tmp_dir) def test_memory_repository(self): dbconn = self.Client._init_generic_temp_repository( self.mem_repoid, self.mem_repo_desc, temp_file = ":memory:") test_pkg = _misc.get_test_package() data = self.Spm.extract_package_metadata(test_pkg) idpackage = dbconn.addPackage(data) self.assertEqual(data, dbconn.getPackageData(idpackage)) self.Client.remove_repository(self.mem_repoid) self.assertNotEqual( self.Client._memory_db_instances.get(self.mem_repoid), dbconn) def test_load(): etpUi['mute'] = True self.Client.open_repository(self.mem_repoid) etpUi['mute'] = False self.assertRaises(RepositoryError, test_load) def test_package_repository(self): test_pkg = _misc.get_test_entropy_package() # this might fail on 32bit arches atoms_contained = [] try: atoms_contained = self.Client.add_package_repository(test_pkg) except EntropyPackageException as err: if etpConst['currentarch'] == "amd64": raise self.assertEqual(str(err), "invalid architecture") else: self.assertNotEqual([], atoms_contained) for idpackage, repoid in atoms_contained: dbconn = self.Client.open_repository(repoid) self.assertNotEqual(None, dbconn.getPackageData(idpackage)) self.assertNotEqual(None, dbconn.retrieveAtom(idpackage)) def test_package_installation(self): for pkg_path, pkg_atom in self.test_pkgs: self._do_pkg_test(pkg_path, pkg_atom) def test_shell_trigger(self): dbconn = self.Client._init_generic_temp_repository( self.mem_repoid, self.mem_repo_desc, temp_file = ":memory:") test_pkg = _misc.get_test_package() data = self.Spm.extract_package_metadata(test_pkg) idpackage = dbconn.addPackage(data) pkgdata = dbconn.getTriggerData(idpackage) pkgdata['trigger'] = """\ #!%s echo $@ exit 42 """ % (etpConst['trigger_sh_interpreter'],) trigger = self.Client.Triggers('postinstall', pkgdata) trigger.prepare() exit_st = trigger._do_trigger_call_ext_generic() trigger.kill() self.assertEqual(exit_st, 42) def _do_pkg_test(self, pkg_path, pkg_atom): # this test might be considered controversial, for now, let's keep it # here, we use equo stuff to make sure it keeps working import text_smart # we need to tweak the default unpack dir to make pkg install available # for uids != 0 temp_unpack = tempfile.mkdtemp() old_unpackdir = etpConst['entropyunpackdir'] etpConst['entropyunpackdir'] = temp_unpack fake_root = tempfile.mkdtemp() pkg_dir = tempfile.mkdtemp() inst_dir = tempfile.mkdtemp() rc = text_smart.inflate_handler(self.Client, [pkg_path], pkg_dir) self.assert_(rc == 0) self.assert_(os.listdir(pkg_dir)) etp_pkg = os.path.join(pkg_dir, os.listdir(pkg_dir)[0]) self.assert_(os.path.isfile(etp_pkg)) matches = [] try: matches = self.Client.add_package_repository(etp_pkg) except EntropyPackageException as err: if etpConst['currentarch'] == "amd64": raise self.assertEqual(str(err), "invalid architecture") else: self.assertNotEqual(matches, []) for match in matches: my_p = self.Client.Package() my_p.prepare(match, "install", {}) # unit testing metadata setting, of course, undocumented my_p.pkgmeta['unittest_root'] = fake_root rc = my_p.run() self.assert_(rc == 0) # remove pkg idpackages = self.Client.installed_repository().listAllPackageIds() for idpackage in idpackages: my_p = self.Client.Package() my_p.prepare((idpackage,), "remove", {}) rc = my_p.run() self.assert_(rc == 0) # done installing shutil.rmtree(pkg_dir, True) shutil.rmtree(temp_unpack, True) shutil.rmtree(fake_root, True) # restore orig const value etpConst['entropyunpackdir'] = old_unpackdir if __name__ == '__main__': if "--debug" in sys.argv: sys.argv.remove("--debug") from entropy.const import etpUi etpUi['debug'] = True unittest.main() entropy.tools.kill_threads() raise SystemExit(0)