# -*- coding: utf-8 -*-
"""Unit tests for the helper functions exposed by ``entropy.tools``."""
import sys
import os
# Make the in-tree packages importable when the tests are run from the
# tests directory or from the project root.
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import unittest
from entropy.const import const_convert_to_rawstring, const_convert_to_unicode
import entropy.tools as et
from entropy.client.interfaces import Client
from entropy.output import print_generic
import tests._misc as _misc
import tempfile
import subprocess
import shutil
import stat


class ToolsTest(unittest.TestCase):
    """Test case exercising ``entropy.tools`` (imported here as ``et``)."""

    def setUp(self):
        """Announce the test and collect the sample package paths."""
        sys.stdout.write("%s called\n" % (self,))
        sys.stdout.flush()
        self.test_pkg = _misc.get_test_entropy_package()
        self.test_pkg2 = _misc.get_test_entropy_package2()
        self.test_pkg3 = _misc.get_test_entropy_package3()
        self.test_pkgs = [self.test_pkg, self.test_pkg2, self.test_pkg3]

    def tearDown(self):
        """
        tearDown is run after each test
        """
        sys.stdout.write("%s ran\n" % (self,))
        sys.stdout.flush()

    def test_dump_entropy_metadata(self):
        """Dump each sample package's metadata and open it as a repository."""
        client = Client(noclientdb=2, indexing=False, xcache=False,
            repo_validation=False)
        fd, tmp_path = tempfile.mkstemp()
        for test_pkg in self.test_pkgs:
            et.dump_entropy_metadata(test_pkg, tmp_path)
            self.assertNotEqual(tmp_path, None)
            dbconn = client.open_generic_repository(tmp_path)
            dbconn.validateDatabase()
            dbconn.listAllPackageIds()
            dbconn.closeDB()
        os.close(fd)
        os.remove(tmp_path)

    def test_remove_entropy_metadata(self):
        """Stripped output must no longer be detected as an entropy package."""
        fd, tmp_path = tempfile.mkstemp()
        os.close(fd)
        for test_pkg in self.test_pkgs:
            self.assertTrue(et.is_entropy_package_file(test_pkg))
            ext_rc = et.remove_entropy_metadata(test_pkg, tmp_path)
            self.assertNotEqual(ext_rc, False)
            self.assertTrue(os.path.isfile(tmp_path))
            self.assertFalse(et.is_entropy_package_file(tmp_path))
        os.remove(tmp_path)

    def test_tb(self):
        """get_traceback() must return something truthy inside a handler."""
        # traceback test
        tb = None
        try:
            raise ValueError()
        except ValueError:
            tb = et.get_traceback()
        self.assertTrue(tb)

    def test_get_remote_data(self):
        """Fetch a local file through a file:// URL and compare its content."""
        fd, tmp_path = tempfile.mkstemp()
        msg = const_convert_to_rawstring("helloàòè\n", 'utf-8')
        os.write(fd, msg)
        os.fsync(fd)
        recv_msg = et.get_remote_data("file://"+tmp_path)
        self.assertEqual([msg], recv_msg)
        os.close(fd)
        os.remove(tmp_path)

    def test_supported_img_file(self):
        """A GIF-headed temp file and the sample PNG must both be accepted."""
        fd, tmp_path = tempfile.mkstemp()
        # create gif
        os.write(fd, const_convert_to_rawstring("GIF89xxx"))
        os.fsync(fd)
        self.assertTrue(et.is_supported_image_file(tmp_path))
        self.assertTrue(et._is_gif_file(tmp_path))
        os.close(fd)
        os.remove(tmp_path)

        png_file = _misc.get_png()
        self.assertTrue(et.is_supported_image_file(png_file))
        self.assertTrue(et._is_png_file(png_file))

    def test_valid_ascii(self):
        """is_valid_ascii() accepts plain text and rejects accented text."""
        valid = "ciao"
        non_valid = "òèàò"
        self.assertTrue(et.is_valid_ascii(valid))
        self.assertFalse(et.is_valid_ascii(non_valid))

    def test_valid_package_tag(self):
        """is_valid_package_tag() rejects each of the listed invalid tags."""
        valid = "ciao"
        invalids = ["òpl", "hello,hello", "#hello"]
        self.assertTrue(et.is_valid_package_tag(valid))
        for invalid in invalids:
            self.assertFalse(et.is_valid_package_tag(invalid))

    def test_is_valid_unicode(self):
        """is_valid_unicode() accepts both a plain and a converted string."""
        valid = "ciao"
        valid2 = const_convert_to_unicode("òèàò", 'utf-8')
        self.assertTrue(et.is_valid_unicode(valid))
        self.assertTrue(et.is_valid_unicode(valid2))

    def test_is_valid_email(self):
        """is_valid_email() rejects an address without an '@'."""
        valid = "entropy@entropy.it"
        non_valid = "entropy.entropy.it"
        self.assertTrue(et.is_valid_email(valid))
        self.assertFalse(et.is_valid_email(non_valid))

    def test_islive(self):
        """islive() is expected to be falsy where the tests run."""
        self.assertFalse(et.islive())

    def test_get_file_size(self):
        """get_file_size() reports the four bytes just written."""
        fd, tmp_path = tempfile.mkstemp()
        os.write(fd, const_convert_to_rawstring("ciao"))
        os.fsync(fd)
        self.assertEqual(et.get_file_size(tmp_path), 4)
        os.close(fd)
        os.remove(tmp_path)

    def test_sum_file_sizes(self):
        """sum_file_sizes() adds up the sizes of two four-byte files."""
        fd, tmp_path = tempfile.mkstemp()
        fd2, tmp_path2 = tempfile.mkstemp()
        os.write(fd, const_convert_to_rawstring("ciao"))
        os.write(fd2, const_convert_to_rawstring("ciao"))
        self.assertEqual(et.sum_file_sizes([tmp_path, tmp_path2]), 8)
        os.close(fd)
        os.remove(tmp_path)
        os.close(fd2)
        os.remove(tmp_path2)

    def test_check_required_space(self):
        """check_required_space() must be truthy for a tiny requirement."""
        # The original passed True as a second positional argument, which
        # unittest treats as the failure *message*, not an expected value.
        self.assertTrue(et.check_required_space("/", 10))

    def test_getstatusoutput(self):
        """getstatusoutput() returns the (status, output) pair of a command."""
        cmd = "echo hello"
        out = et.getstatusoutput(cmd)
        self.assertEqual(out, (0, "hello",))

    def test_movefile(self):
        """movefile() must handle both a regular file and a symlink."""
        # move file
        fd, tmp_path = tempfile.mkstemp()
        os.close(fd)
        dest_path = tmp_path + "foo"
        self.assertTrue(et.movefile(tmp_path, dest_path))
        self.assertTrue(os.stat(dest_path))
        os.remove(dest_path)

        # move symlink
        fd, tmp_path = tempfile.mkstemp()
        dst_link = tmp_path + "foo2"
        dst_final_link = dst_link + "foo3"
        os.symlink(tmp_path, dst_link)
        self.assertTrue(et.movefile(dst_link, dst_final_link))
        self.assertTrue(os.stat(dst_final_link))
        os.close(fd)
        os.remove(dst_final_link)
        # Only the link was moved; the mkstemp() file it points to was
        # previously left behind in the temp directory.
        os.remove(tmp_path)

    def test_get_random_number(self):
        """get_random_number() returns ints that differ between two calls."""
        rand1 = et.get_random_number()
        rand2 = et.get_random_number()
        self.assertTrue(isinstance(rand1, int))
        self.assertTrue(rand1 != rand2)

    def test_split_indexable_into_chunks(self):
        """A string is split into three-character chunks, remainder last."""
        indexable = "asdXasdXasdXasdXasdXasdXasdXasdXasd"
        result = ['asd', 'Xas', 'dXa', 'sdX', 'asd', 'Xas', 'dXa', 'sdX',
            'asd', 'Xas', 'dXa', 'sd']
        out = et.split_indexable_into_chunks(indexable, 3)
        self.assertTrue(out)
        self.assertTrue(out == result)

    def test_shaXXX(self):
        """sha1/sha256/sha512 of a known file match the recorded digests."""
        fd, tmp_path = tempfile.mkstemp()
        os.write(fd, const_convert_to_rawstring("this is the life"))
        os.fsync(fd)
        sha1 = et.sha1(tmp_path)
        sha256 = et.sha256(tmp_path)
        sha512 = et.sha512(tmp_path)
        r_sha1 = "105de2055ac81db7b02a27623b7e73932788df95"
        r_sha256 = "ccb134af19d748c9c845b26a1e3a29e6ea356d1f1e0ad47d57b83f38c5492988"
        r_sha512 = "a1e88ef22f5884cd8b205c2e6e2b17ae0f7597df81fabc6a4e77454ac688102df8986353c9f97981b4274f61c6a0ca4c74ec2fd49997b4a35d4091d94dc8a64e"
        self.assertEqual(sha1, r_sha1)
        self.assertEqual(sha256, r_sha256)
        self.assertEqual(sha512, r_sha512)
        os.close(fd)
        os.remove(tmp_path)

    def test_md5sum_directory(self):
        """md5sum_directory() and md5obj_directory() agree on one digest."""
        tmp_dir = tempfile.mkdtemp()
        f = open(os.path.join(tmp_dir, "foo"), "w")
        f.write("hello world")
        f.flush()
        f.close()
        r_md5dir = "5eb63bbbe01eeed093cb22bb8f5acdc3"
        md5dir = et.md5sum_directory(tmp_dir)
        self.assertEqual(md5dir, r_md5dir)
        mobj = et.md5obj_directory(tmp_dir)
        self.assertEqual(mobj.hexdigest(), r_md5dir)
        shutil.rmtree(tmp_dir)

    def test_XXcompress_file(self):
        """Compress with bz2, check the digest, then round-trip back."""
        import bz2
        fd, tmp_path = tempfile.mkstemp()
        os.write(fd, const_convert_to_rawstring("this is the life"))
        os.fsync(fd)
        orig_md5 = et.md5sum(tmp_path)
        new_path = tmp_path + ".bz2"
        opener = bz2.BZ2File
        et.compress_file(tmp_path, new_path, opener)
        comp_md5 = "69b9fc26f7cb561a067a10b06d890242"
        md5 = et.md5sum(new_path)
        self.assertEqual(md5, comp_md5)
        os.close(fd)
        et.uncompress_file(new_path, tmp_path, opener)
        self.assertEqual(orig_md5, et.md5sum(tmp_path))
        os.remove(tmp_path)
        os.remove(new_path)

    def test_unpack_gzip(self):
        """unpack_gzip() yields a file whose md5 matches the original data."""
        import gzip
        fd, tmp_path = tempfile.mkstemp(suffix=".gz")
        gz_f = gzip.GzipFile(tmp_path, "wb")
        gz_f.write(const_convert_to_rawstring("ciao ciao ciao"))
        gz_f.close()
        new_path = et.unpack_gzip(tmp_path)
        self.assertTrue(os.stat(new_path))
        orig_md5 = "b40d18c97e6461678f264c4524f6cc7c"
        self.assertEqual(orig_md5, et.md5sum(new_path))
        os.close(fd)
        os.remove(tmp_path)
        os.remove(new_path)

    def test_unpack_bzip2(self):
        """unpack_bzip2() yields a file whose md5 matches the original data."""
        import bz2
        fd, tmp_path = tempfile.mkstemp(suffix=".bz2")
        gz_f = bz2.BZ2File(tmp_path, "wb")
        gz_f.write(const_convert_to_rawstring("ciao ciao ciao"))
        gz_f.close()
        new_path = et.unpack_bzip2(tmp_path)
        self.assertTrue(os.stat(new_path))
        orig_md5 = "b40d18c97e6461678f264c4524f6cc7c"
        self.assertEqual(orig_md5, et.md5sum(new_path))
        os.close(fd)
        os.remove(tmp_path)
        os.remove(new_path)

    def test_remove_entropy_metadata2(self):
        """Stripping sample package 4 produces a file with a known md5."""
        fd, tmp_path = tempfile.mkstemp()
        os.close(fd)
        ext_rc = et.remove_entropy_metadata(
            _misc.get_test_entropy_package4(), tmp_path)
        self.assertTrue(ext_rc)
        orig_md5 = "ab1f147202838c6ee9c6fc3511537279"
        self.assertEqual(orig_md5, et.md5sum(tmp_path))
        os.remove(tmp_path)

    def test_create_md5_file(self):
        """create_md5_file() writes "<digest> <basename>" for a unicode path."""
        fd, tmp_path = tempfile.mkstemp(
            suffix=const_convert_to_unicode("òèà", 'utf-8'))
        os.write(fd, const_convert_to_rawstring("hello"))
        os.fsync(fd)
        cksum = "5d41402abc4b2a76b9719d911017c592"
        md5_path = et.create_md5_file(tmp_path)
        md5_f = open(md5_path, "rb")
        orig_cont = const_convert_to_rawstring(
            cksum + " " + os.path.basename(tmp_path) + "\n", 'utf-8')
        self.assertEqual(orig_cont, md5_f.read())
        self.assertTrue(et.compare_md5(tmp_path, cksum))
        md5_f.close()
        os.close(fd)
        os.remove(tmp_path)
        os.remove(md5_path)

    def test_create_sha1_file(self):
        """create_sha1_file() writes "<digest> <basename>" for a unicode path."""
        fd, tmp_path = tempfile.mkstemp(
            suffix=const_convert_to_unicode("òèà", 'utf-8'))
        os.write(fd, const_convert_to_rawstring("hello"))
        os.fsync(fd)
        cksum = "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d"
        sha_path = et.create_sha1_file(tmp_path)
        sha_f = open(sha_path, "rb")
        orig_cont = const_convert_to_rawstring(
            cksum + " " + os.path.basename(tmp_path) + "\n", 'utf-8')
        self.assertEqual(orig_cont, sha_f.read())
        self.assertTrue(et.compare_sha1(tmp_path, cksum))
        sha_f.close()
        os.close(fd)
        os.remove(tmp_path)
        os.remove(sha_path)

    def test_create_sha256_file(self):
        """create_sha256_file() writes "<digest> <basename>" for a unicode path."""
        fd, tmp_path = tempfile.mkstemp(
            suffix=const_convert_to_unicode("òèà", 'utf-8'))
        os.write(fd, const_convert_to_rawstring("hello"))
        os.fsync(fd)
        cksum = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        sha_path = et.create_sha256_file(tmp_path)
        sha_f = open(sha_path, "rb")
        orig_cont = const_convert_to_rawstring(
            cksum + " " + os.path.basename(tmp_path) + "\n", 'utf-8')
        self.assertEqual(orig_cont, sha_f.read())
        self.assertTrue(et.compare_sha256(tmp_path, cksum))
        sha_f.close()
        os.close(fd)
        os.remove(tmp_path)
        os.remove(sha_path)

    def test_create_sha512_file(self):
        """create_sha512_file() writes "<digest> <basename>" for a unicode path."""
        fd, tmp_path = tempfile.mkstemp(
            suffix=const_convert_to_unicode("òèà", 'utf-8'))
        os.write(fd, const_convert_to_rawstring("hello"))
        os.fsync(fd)
        cksum = "9b71d224bd62f3785d96d46ad3ea3d73319bfbc2890caadae2dff72519673ca72323c3d99ba5c11d7c7acc6e14b8c5da0c4663475c2e5c3adef46f73bcdec043"
        sha_path = et.create_sha512_file(tmp_path)
        sha_f = open(sha_path, "rb")
        orig_cont = const_convert_to_rawstring(
            cksum+" " + os.path.basename(tmp_path) + "\n", 'utf-8')
        self.assertEqual(orig_cont, sha_f.read())
        self.assertTrue(et.compare_sha512(tmp_path, cksum))
        sha_f.close()
        os.close(fd)
        os.remove(tmp_path)
        os.remove(sha_path)

    def test_md5string(self):
        """md5string() returns the known hex digest of "ciao"."""
        mystring = "ciao"
        out_str = et.md5string(mystring)
        self.assertEqual(out_str, "6e6bc4e49dd477ebc98ef4046c067b5f")

    def test_isjustname(self):
        """isjustname() is true only for an atom without a version."""
        self.assertFalse(et.isjustname("app-foo/foo-1.2.3"))
        self.assertTrue(et.isjustname("app-foo/foo"))

    def test_catpkgsplit(self):
        """catpkgsplit() yields (category, name, version, revision)."""
        data = {
            'app-foo/foo-1.2.3': ("app-foo", "foo", "1.2.3", "r0"),
            'www-apps/389-foo-1.2.3': ("www-apps", "389-foo", "1.2.3", "r0"),
        }
        for atom, split_data in data.items():
            pkgsplit = et.catpkgsplit(atom)
            self.assertEqual(split_data, pkgsplit)

    def test_dep_getkey(self):
        """dep_getkey() strips the version from an atom."""
        pkg = "app-foo/foo-1.2.3"
        key = "app-foo/foo"
        self.assertEqual(key, et.dep_getkey(pkg))

    def test_dep_getcpv(self):
        """dep_getcpv() strips the leading operator from a dependency."""
        pkg = ">=app-foo/foo-1.2.3"
        cpv = "app-foo/foo-1.2.3"
        self.assertEqual(cpv, et.dep_getcpv(pkg))

    def test_dep_getslot(self):
        """dep_getslot() returns the part after ':'."""
        pkg = ">=app-foo/foo-1.2.3:2.3.4"
        slot = "2.3.4"
        self.assertEqual(slot, et.dep_getslot(pkg))

    def test_dep_getusedeps(self):
        """dep_getusedeps() returns the bracketed USE deps as a tuple."""
        pkg = ">=app-foo/foo-1.2.3:2.3.4[ciao,come,va]"
        usedeps = ("ciao", "come", "va")
        self.assertEqual(usedeps, et.dep_getusedeps(pkg))

    def test_remove_usedeps(self):
        """remove_usedeps() drops the bracketed USE deps."""
        pkg = ">=app-foo/foo-1.2.3:2.3.4[ciao,come,va]"
        result = ">=app-foo/foo-1.2.3:2.3.4"
        self.assertEqual(result, et.remove_usedeps(pkg))

    def test_remove_slot(self):
        """remove_slot() drops the ':slot' suffix."""
        pkg = ">=app-foo/foo-1.2.3:2.3.4"
        result = ">=app-foo/foo-1.2.3"
        self.assertEqual(result, et.remove_slot(pkg))

    def test_remove_revision(self):
        """remove_revision() drops the '-rN' suffix."""
        pkg = "app-foo/foo-1.2.3-r1"
        result = "app-foo/foo-1.2.3"
        self.assertEqual(result, et.remove_revision(pkg))

    def test_remove_tag(self):
        """remove_tag() drops the '#tag' suffix."""
        pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo"
        result = "app-foo/foo-1.2.3-r1"
        self.assertEqual(result, et.remove_tag(pkg))

    def test_remove_entropy_revision(self):
        """remove_entropy_revision() drops the '~N' suffix."""
        pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo~1"
        result = "app-foo/foo-1.2.3-r1#2.2.2-foo"
        self.assertEqual(result, et.remove_entropy_revision(pkg))

    def test_dep_get_entropy_revision(self):
        """dep_get_entropy_revision() returns the '~N' value as an int."""
        pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo~1"
        result = 1
        self.assertEqual(result, et.dep_get_entropy_revision(pkg))

    def test_dep_get_spm_revision(self):
        """dep_get_spm_revision() returns the 'rN' part of an atom."""
        pkg = "app-foo/foo-1.2.3-r1"
        result = "r1"
        self.assertEqual(result, et.dep_get_spm_revision(pkg))

    def test_dep_get_match_in_repos(self):
        """dep_get_match_in_repos() splits '@repo' off into a list."""
        pkg = "app-foo/foo-1.2.3-r1@foorepo"
        result = ("app-foo/foo-1.2.3-r1", ["foorepo"])
        self.assertEqual(result, et.dep_get_match_in_repos(pkg))

    def test_dep_gettag(self):
        """dep_gettag() returns the tag between '#' and '~'."""
        pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo~1"
        result = "2.2.2-foo"
        self.assertEqual(result, et.dep_gettag(pkg))

    def test_remove_package_operators(self):
        """remove_package_operators() strips the leading '>='."""
        pkg = ">=app-foo/foo-1.2.3:2.3.4~1"
        result = "app-foo/foo-1.2.3:2.3.4~1"
        self.assertEqual(result, et.remove_package_operators(pkg))

    def test_compare_versions(self):
        """compare_versions() is 0 when equal and signed otherwise."""
        ver_a = ("1.0.0", "1.0.0", 0,)
        ver_b = ("1.0.1", "1.0.0", 0.10000000000000001,)
        ver_c = ("1.0.0", "1.0.1", -0.10000000000000001,)
        self.assertEqual(et.compare_versions(ver_a[0], ver_a[1]), ver_a[2])
        self.assertEqual(et.compare_versions(ver_b[0], ver_b[1]), ver_b[2])
        self.assertEqual(et.compare_versions(ver_c[0], ver_c[1]), ver_c[2])

    def test_get_newer_version(self):
        """get_newer_version() sorts version strings newest first."""
        vers = ["1.0", "3.4", "0.5", "999", "9999", "10.0"]
        out_vers = ['9999', '999', '10.0', '3.4', '1.0', '0.5']
        self.assertEqual(et.get_newer_version(vers), out_vers)

    def test_get_entropy_newer_version(self):
        """get_entropy_newer_version() returns the triples in a known order."""
        vers = [("1.0", "2222", 1,), ("3.4", "2222", 0,), ("1.0", "2223", 1,),
            ("1.0", "2223", 3,)]
        out_vers = [('1.0', '2223', 3), ('1.0', '2223', 1),
            ('3.4', '2222', 0), ('1.0', '2222', 1)]
        self.assertEqual(et.get_entropy_newer_version(vers), out_vers)

    def test_istextfile(self):
        """istextfile() is truthy for a file containing plain text."""
        fd, tmp_path = tempfile.mkstemp()
        os.write(fd, const_convert_to_rawstring("this is new"))
        os.fsync(fd)
        self.assertTrue(et.istextfile(tmp_path))
        os.close(fd)
        os.remove(tmp_path)

    def test_spliturl(self):
        """spliturl() yields scheme, host, path and two empty fields."""
        begin = "http://www.sabayon.org/download"
        end = ['http', 'www.sabayon.org', '/download', '', '']
        self.assertEqual(list(et.spliturl(begin)), end)

    def test_spawn_function(self):
        """spawn_function() returns the callee's value and its write lands."""
        fd, tmp_path = tempfile.mkstemp()

        def myfunc(fd):
            os.write(fd, const_convert_to_rawstring("this is new"))
            os.fsync(fd)
            return "ok"

        rc = et.spawn_function(myfunc, fd)
        self.assertEqual(rc, "ok")
        os.close(fd)
        f = open(tmp_path, "r")
        self.assertEqual("this is new", f.read())
        f.close()
        os.remove(tmp_path)

    def test_bytes_into_human(self):
        """bytes_into_human() formats a byte count as megabytes."""
        begin = 102400020
        end = '97.7MB'
        self.assertEqual(et.bytes_into_human(begin), end)

    def test_get_random_temp_file(self):
        """get_random_temp_file() returns something truthy."""
        self.assertTrue(et.get_random_temp_file())

    def test_convert_unix_time_to_human_time(self):
        """Unix time 1 is rendered as a 'YYYY-MM-DD HH:MM:SS' string."""
        unixtime = 1
        # NOTE(review): the expected value is one hour past the epoch, so
        # this looks dependent on the local timezone -- confirm.
        self.assertEqual(et.convert_unix_time_to_human_time(unixtime),
            '1970-01-01 01:00:01')

    def test_convert_seconds_to_fancy_output(self):
        """2740 seconds are rendered as '45m:40s'."""
        seconds = 2740
        self.assertEqual(et.convert_seconds_to_fancy_output(seconds),
            '45m:40s')

    def test_is_valid_string(self):
        """is_valid_string() accepts plain text and rejects accented text."""
        valid = "ciasdoad"
        non_valid = "òèàòè"
        self.assertTrue(et.is_valid_string(valid))
        self.assertFalse(et.is_valid_string(non_valid))

    def test_is_valid_md5(self):
        """is_valid_md5() accepts a real digest and rejects a bogus string."""
        valid = "5d41402abc4b2a76b9719d911017c592"
        non_valid = "asdasdasdasdasdasdaszzzzzzza"
        self.assertTrue(et.is_valid_md5(valid))
        self.assertFalse(et.is_valid_md5(non_valid))

    def test_read_elf_class(self):
        """read_elf_class() returns 2 for the sample shared object."""
        elf_obj = _misc.get_dl_so_amd()
        elf_class = 2
        self.assertEqual(et.read_elf_class(elf_obj), elf_class)

    def test_is_elf_file(self):
        """is_elf_file() is truthy for the sample shared object."""
        self.assertTrue(et.is_elf_file(_misc.get_dl_so_amd()))

    def test_read_elf_dyn_libs(self):
        """read_elf_dynamic_libraries() returns the known library set."""
        elf_obj = _misc.get_dl_so_amd_2()
        known_meta = set(['libcom_err.so.2', 'libkrb5.so.3',
            'libkrb5support.so.0', 'libgssrpc.so.4', 'libk5crypto.so.3',
            'libc.so.6'])
        metadata = et.read_elf_dynamic_libraries(elf_obj)
        self.assertEqual(metadata, known_meta)

    def test_read_elf_linker_paths(self):
        """read_elf_linker_paths() returns the known linker path list."""
        elf_obj = _misc.get_dl_so_amd_2()
        known_meta = ['/usr/lib64', '/usr/lib64']
        metadata = et.read_elf_linker_paths(elf_obj)
        self.assertEqual(metadata, known_meta)

    def test_xml_from_dict_extended(self):
        """A mixed-type dict survives the extended XML round trip."""
        data = {
            "foo": 1,
            "foo2": None,
            "foo3": set([1, 2, 3]),
            "foo4": [1, 2, 3],
            "foo5": "ciao",
            # NOTE(review): "foo6" appears twice, so the tuple below is
            # overwritten by 1.25 and never reaches the round trip. Left
            # as-is; give it a unique key once tuple support is confirmed.
            "foo6": (1, 2, 3),
            "foo6": 1.25,
            "foo7": {"a": const_convert_to_unicode("ciaoèò", 'utf-8'),},
            "foo8": frozenset([1, 2, 3])
        }
        xml_data = et.xml_from_dict_extended(data)
        new_dict = et.dict_from_xml_extended(xml_data)
        self.assertEqual(data, new_dict)

    def test_xml_from_dict(self):
        """A dict of strings survives the plain XML round trip."""
        data = {
            "foo": "abc",
            "foo2": "def",
            "foo3": "asdas asdasd sdasad",
            "foo4": const_convert_to_unicode("òèà", 'utf-8'),
            "foo5": "ciao",
        }
        xml_data = et.xml_from_dict(data)
        new_dict = et.dict_from_xml(xml_data)
        self.assertEqual(data, new_dict)

    def test_create_package_filename(self):
        """create_package_filename() builds 'cat:name-ver#tag.tbz2'."""
        category = "app-foo"
        name = "foo"
        version = "1.2.3"
        package_tag = "abc"
        result = 'app-foo:foo-1.2.3#abc.tbz2'
        self.assertEqual(et.create_package_filename(category, name, version,
            package_tag), result)

    def test_create_package_atom_string(self):
        """create_package_atom_string() builds 'cat/name-ver#tag'."""
        category = "app-foo"
        name = "foo"
        version = "1.2.3"
        package_tag = "abc"
        result = 'app-foo/foo-1.2.3#abc'
        self.assertEqual(et.create_package_atom_string(category, name,
            version, package_tag), result)

    def test_uncompress_tarball(self):
        """Run the tar-vs-uncompress_tarball comparison on each package."""
        pkgs = [_misc.get_test_entropy_package4(),
            ] #_misc.get_footar_package() disabled for now
        for pkg in pkgs:
            self._do_uncompress_tarball(pkg)

    def _collect_path_perms(self, root_dir):
        """Map every file under root_dir to its (mode, uid, gid) via lstat."""
        path_perms = {}
        for currentdir, subdirs, files in os.walk(root_dir):
            for xfile in files:
                path = os.path.join(currentdir, xfile)
                fstat = os.lstat(path)
                mode = stat.S_IMODE(fstat.st_mode)
                uid, gid = fstat.st_uid, fstat.st_gid
                path_perms[path] = (mode, uid, gid,)
        return path_perms

    def _do_uncompress_tarball(self, pkg_path):
        """
        Extract pkg_path with the system tar and with et.uncompress_tarball()
        into the same directory path and check that both produce the same
        files with the same mode, uid and gid.
        """
        tmp_dir = tempfile.mkdtemp()
        fd, tmp_file = tempfile.mkstemp()

        # try with tar first
        args = ["tar", "xjfp", pkg_path, "-C", tmp_dir]
        proc = subprocess.Popen(args, stdout=fd, stderr=fd)
        rc = proc.wait()
        # Release the capture file before asserting, so that neither the
        # descriptor nor the temp file is leaked when tar fails.
        os.close(fd)
        os.remove(tmp_file)
        self.assertFalse(rc)

        path_perms = self._collect_path_perms(tmp_dir)
        self.assertTrue(path_perms)

        # Recreate the same path so the dictionary keys are comparable.
        shutil.rmtree(tmp_dir)
        os.makedirs(tmp_dir)

        # now try with our function
        rc = et.uncompress_tarball(pkg_path, extract_path=tmp_dir)
        self.assertFalse(rc)

        new_path_perms = self._collect_path_perms(tmp_dir)
        self.assertEqual(path_perms, new_path_perms)

        shutil.rmtree(tmp_dir)


if __name__ == '__main__':
    if "--debug" in sys.argv:
        sys.argv.remove("--debug")
        from entropy.const import etpUi
        etpUi['debug'] = True
    try:
        # unittest.main() terminates by raising SystemExit, so anything
        # placed after it used to be unreachable.
        unittest.main()
    finally:
        et.kill_threads()
    raise SystemExit(0)