717 lines
23 KiB
Python
717 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, '.')
|
|
sys.path.insert(0, '../')
|
|
import unittest
|
|
from entropy.const import const_convert_to_rawstring, const_convert_to_unicode
|
|
import entropy.tools as et
|
|
from entropy.client.interfaces import Client
|
|
from entropy.output import print_generic
|
|
import tests._misc as _misc
|
|
import tempfile
|
|
import subprocess
|
|
import shutil
|
|
import stat
|
|
|
|
class ToolsTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
sys.stdout.write("%s called\n" % (self,))
|
|
sys.stdout.flush()
|
|
self.test_pkg = _misc.get_test_entropy_package()
|
|
self.test_pkg2 = _misc.get_test_entropy_package2()
|
|
self.test_pkg3 = _misc.get_test_entropy_package3()
|
|
self.test_pkgs = [self.test_pkg, self.test_pkg2, self.test_pkg3]
|
|
|
|
def tearDown(self):
|
|
"""
|
|
tearDown is run after each test
|
|
"""
|
|
sys.stdout.write("%s ran\n" % (self,))
|
|
sys.stdout.flush()
|
|
|
|
|
|
def test_dump_entropy_metadata(self):
|
|
|
|
client = Client(noclientdb = 2, indexing = False, xcache = False,
|
|
repo_validation = False)
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
for test_pkg in self.test_pkgs:
|
|
et.dump_entropy_metadata(test_pkg, tmp_path)
|
|
self.assertNotEqual(tmp_path, None)
|
|
dbconn = client.open_generic_database(tmp_path)
|
|
dbconn.validateDatabase()
|
|
dbconn.listAllIdpackages()
|
|
dbconn.closeDB()
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
|
|
def test_remove_entropy_metadata(self):
|
|
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.close(fd)
|
|
|
|
for test_pkg in self.test_pkgs:
|
|
self.assert_(et.is_entropy_package_file(test_pkg))
|
|
ext_rc = et.remove_entropy_metadata(test_pkg, tmp_path)
|
|
self.assertNotEqual(ext_rc, False)
|
|
self.assert_(os.path.isfile(tmp_path))
|
|
self.assert_(not et.is_entropy_package_file(tmp_path))
|
|
|
|
os.remove(tmp_path)
|
|
|
|
|
|
def test_tb(self):
|
|
# traceback test
|
|
tb = None
|
|
try:
|
|
raise ValueError()
|
|
except ValueError:
|
|
tb = et.get_traceback()
|
|
self.assert_(tb)
|
|
|
|
|
|
def test_get_remote_data(self):
|
|
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
msg = const_convert_to_rawstring("helloàòè\n", 'utf-8')
|
|
os.write(fd, msg)
|
|
os.fsync(fd)
|
|
|
|
recv_msg = et.get_remote_data("file://"+tmp_path)
|
|
self.assertEqual([msg], recv_msg)
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
|
|
def test_supported_img_file(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
# create gif
|
|
os.write(fd, const_convert_to_rawstring("GIF89xxx"))
|
|
os.fsync(fd)
|
|
self.assert_(et.is_supported_image_file(tmp_path))
|
|
self.assert_(et._is_gif_file(tmp_path))
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
png_file = _misc.get_png()
|
|
self.assert_(et.is_supported_image_file(png_file))
|
|
self.assert_(et._is_png_file(png_file))
|
|
|
|
|
|
def test_valid_ascii(self):
|
|
valid = "ciao"
|
|
non_valid = "òèàò"
|
|
self.assert_(et.is_valid_ascii(valid))
|
|
self.assert_(not et.is_valid_ascii(non_valid))
|
|
|
|
|
|
def test_is_valid_unicode(self):
|
|
valid = "ciao"
|
|
valid2 = const_convert_to_unicode("òèàò", 'utf-8')
|
|
self.assert_(et.is_valid_unicode(valid))
|
|
self.assert_(et.is_valid_unicode(valid2))
|
|
|
|
|
|
def test_is_valid_email(self):
|
|
valid = "entropy@entropy.it"
|
|
non_valid = "entropy.entropy.it"
|
|
self.assert_(et.is_valid_email(valid))
|
|
self.assert_(not et.is_valid_email(non_valid))
|
|
|
|
|
|
def test_islive(self):
|
|
self.assert_(not et.islive())
|
|
|
|
|
|
def test_get_file_size(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.write(fd, const_convert_to_rawstring("ciao"))
|
|
os.fsync(fd)
|
|
self.assertEqual(et.get_file_size(tmp_path), 4)
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
|
|
def test_sum_file_sizes(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
fd2, tmp_path2 = tempfile.mkstemp()
|
|
os.write(fd, const_convert_to_rawstring("ciao"))
|
|
os.write(fd2, const_convert_to_rawstring("ciao"))
|
|
|
|
self.assertEqual(et.sum_file_sizes([tmp_path, tmp_path2]), 8)
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.close(fd2)
|
|
os.remove(tmp_path2)
|
|
|
|
def test_check_required_space(self):
|
|
self.assert_(et.check_required_space("/", 10), True)
|
|
|
|
def test_getstatusoutput(self):
|
|
cmd = "echo hello"
|
|
out = et.getstatusoutput(cmd)
|
|
self.assertEqual(out, (0, "hello",))
|
|
|
|
def test_movefile(self):
|
|
|
|
# move file
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.close(fd)
|
|
|
|
dest_path = tmp_path + "foo"
|
|
self.assert_(et.movefile(tmp_path, dest_path))
|
|
self.assert_(os.stat(dest_path))
|
|
|
|
os.remove(dest_path)
|
|
|
|
# move symlink
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
dst_link = tmp_path + "foo2"
|
|
dst_final_link = dst_link + "foo3"
|
|
os.symlink(tmp_path, dst_link)
|
|
self.assert_(et.movefile(dst_link, dst_final_link))
|
|
self.assert_(os.stat(dst_final_link))
|
|
|
|
os.close(fd)
|
|
os.remove(dst_final_link)
|
|
|
|
def test_get_random_number(self):
|
|
rand1 = et.get_random_number()
|
|
rand2 = et.get_random_number()
|
|
self.assert_(isinstance(rand1, int))
|
|
self.assert_(rand1 != rand2)
|
|
|
|
def test_split_indexable_into_chunks(self):
|
|
|
|
indexable = "asdXasdXasdXasdXasdXasdXasdXasdXasd"
|
|
result = ['asd', 'Xas', 'dXa', 'sdX', 'asd', 'Xas', 'dXa', 'sdX',
|
|
'asd', 'Xas', 'dXa', 'sd']
|
|
out = et.split_indexable_into_chunks(indexable, 3)
|
|
self.assert_(out)
|
|
self.assert_(out == result)
|
|
|
|
def test_shaXXX(self):
|
|
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
os.write(fd, const_convert_to_rawstring("this is the life"))
|
|
os.fsync(fd)
|
|
|
|
sha1 = et.sha1(tmp_path)
|
|
sha256 = et.sha256(tmp_path)
|
|
sha512 = et.sha512(tmp_path)
|
|
r_sha1 = "105de2055ac81db7b02a27623b7e73932788df95"
|
|
r_sha256 = "ccb134af19d748c9c845b26a1e3a29e6ea356d1f1e0ad47d57b83f38c5492988"
|
|
r_sha512 = "a1e88ef22f5884cd8b205c2e6e2b17ae0f7597df81fabc6a4e77454ac688102df8986353c9f97981b4274f61c6a0ca4c74ec2fd49997b4a35d4091d94dc8a64e"
|
|
|
|
self.assertEqual(sha1, r_sha1)
|
|
self.assertEqual(sha256, r_sha256)
|
|
self.assertEqual(sha512, r_sha512)
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
def test_md5sum_directory(self):
|
|
tmp_dir = tempfile.mkdtemp()
|
|
f = open(os.path.join(tmp_dir, "foo"), "w")
|
|
f.write("hello world")
|
|
f.flush()
|
|
f.close()
|
|
|
|
r_md5dir = "5eb63bbbe01eeed093cb22bb8f5acdc3"
|
|
md5dir = et.md5sum_directory(tmp_dir)
|
|
self.assertEqual(md5dir, r_md5dir)
|
|
mobj = et.md5obj_directory(tmp_dir)
|
|
self.assertEqual(mobj.hexdigest(), r_md5dir)
|
|
|
|
shutil.rmtree(tmp_dir)
|
|
|
|
def test_XXcompress_file(self):
|
|
|
|
import bz2
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.write(fd, const_convert_to_rawstring("this is the life"))
|
|
os.fsync(fd)
|
|
orig_md5 = et.md5sum(tmp_path)
|
|
|
|
new_path = tmp_path + ".bz2"
|
|
opener = bz2.BZ2File
|
|
et.compress_file(tmp_path, new_path, opener)
|
|
|
|
comp_md5 = "69b9fc26f7cb561a067a10b06d890242"
|
|
md5 = et.md5sum(new_path)
|
|
self.assertEqual(md5, comp_md5)
|
|
os.close(fd)
|
|
|
|
et.uncompress_file(new_path, tmp_path, opener)
|
|
self.assertEqual(orig_md5, et.md5sum(tmp_path))
|
|
|
|
os.remove(tmp_path)
|
|
os.remove(new_path)
|
|
|
|
def test_unpack_gzip(self):
|
|
import gzip
|
|
fd, tmp_path = tempfile.mkstemp(suffix = ".gz")
|
|
|
|
gz_f = gzip.GzipFile(tmp_path, "wb")
|
|
gz_f.write(const_convert_to_rawstring("ciao ciao ciao"))
|
|
gz_f.close()
|
|
|
|
new_path = et.unpack_gzip(tmp_path)
|
|
self.assert_(os.stat(new_path))
|
|
orig_md5 = "b40d18c97e6461678f264c4524f6cc7c"
|
|
self.assertEqual(orig_md5, et.md5sum(new_path))
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(new_path)
|
|
|
|
def test_unpack_bzip2(self):
|
|
import bz2
|
|
fd, tmp_path = tempfile.mkstemp(suffix = ".bz2")
|
|
|
|
gz_f = bz2.BZ2File(tmp_path, "wb")
|
|
gz_f.write(const_convert_to_rawstring("ciao ciao ciao"))
|
|
gz_f.close()
|
|
|
|
new_path = et.unpack_bzip2(tmp_path)
|
|
self.assert_(os.stat(new_path))
|
|
orig_md5 = "b40d18c97e6461678f264c4524f6cc7c"
|
|
self.assertEqual(orig_md5, et.md5sum(new_path))
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(new_path)
|
|
|
|
def test_remove_entropy_metadata2(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.close(fd)
|
|
|
|
ext_rc = et.remove_entropy_metadata(
|
|
_misc.get_test_entropy_package4(), tmp_path)
|
|
self.assert_(ext_rc)
|
|
orig_md5 = "ab1f147202838c6ee9c6fc3511537279"
|
|
self.assertEqual(orig_md5, et.md5sum(tmp_path))
|
|
|
|
os.remove(tmp_path)
|
|
|
|
def test_create_md5_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "5d41402abc4b2a76b9719d911017c592"
|
|
md5_path = et.create_md5_file(tmp_path)
|
|
md5_f = open(md5_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum + " " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, md5_f.read())
|
|
self.assert_(et.compare_md5(tmp_path, cksum))
|
|
|
|
md5_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(md5_path)
|
|
|
|
def test_create_sha1_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d"
|
|
sha_path = et.create_sha1_file(tmp_path)
|
|
sha_f = open(sha_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum + " " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, sha_f.read())
|
|
self.assert_(et.compare_sha1(tmp_path, cksum))
|
|
|
|
sha_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(sha_path)
|
|
|
|
def test_create_sha256_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
|
|
sha_path = et.create_sha256_file(tmp_path)
|
|
sha_f = open(sha_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum + " " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, sha_f.read())
|
|
self.assert_(et.compare_sha256(tmp_path, cksum))
|
|
|
|
sha_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(sha_path)
|
|
|
|
def test_create_sha512_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "9b71d224bd62f3785d96d46ad3ea3d73319bfbc2890caadae2dff72519673ca72323c3d99ba5c11d7c7acc6e14b8c5da0c4663475c2e5c3adef46f73bcdec043"
|
|
sha_path = et.create_sha512_file(tmp_path)
|
|
sha_f = open(sha_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum+" " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, sha_f.read())
|
|
self.assert_(et.compare_sha512(tmp_path, cksum))
|
|
|
|
sha_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(sha_path)
|
|
|
|
def test_isjustpkgname(self):
|
|
self.assert_(et.isjustpkgname("foo"))
|
|
self.assert_(not et.isjustpkgname("app-misc/foo-1.2.3"))
|
|
|
|
def test_isvalidatom(self):
|
|
self.assert_(not et.isvalidatom('media-libs/test-3.0'))
|
|
self.assert_(et.isvalidatom('>=media-libs/test-3.0'))
|
|
|
|
def test_md5string(self):
|
|
mystring = "ciao"
|
|
out_str = et.md5string(mystring)
|
|
self.assertEqual(out_str, "6e6bc4e49dd477ebc98ef4046c067b5f")
|
|
|
|
def test_get_operator(self):
|
|
ops = {
|
|
'>=': "media-libs/foo",
|
|
'<': "media-libs/foo",
|
|
'<=': "media-libs/foo",
|
|
'~': "media-libs/foo",
|
|
}
|
|
for op, key in ops.items():
|
|
self.assertEqual(op, et.get_operator(op + key))
|
|
|
|
def test_isjustname(self):
|
|
self.assert_(not et.isjustname("app-foo/foo-1.2.3"))
|
|
self.assert_(et.isjustname("app-foo/foo"))
|
|
|
|
def test_isspecific(self):
|
|
self.assert_(et.isspecific("app-foo/foo-1.2.3"))
|
|
self.assert_(not et.isspecific("app-foo/foo"))
|
|
|
|
def test_catpkgsplit(self):
|
|
data = {
|
|
'app-foo/foo-1.2.3': ["app-foo", "foo", "1.2.3", "r0"],
|
|
}
|
|
for atom, split_data in data.items():
|
|
pkgsplit = et.catpkgsplit(atom)
|
|
self.assertEqual(split_data, pkgsplit)
|
|
|
|
def test_dep_getkey(self):
|
|
pkg = "app-foo/foo-1.2.3"
|
|
key = "app-foo/foo"
|
|
self.assertEqual(key, et.dep_getkey(pkg))
|
|
|
|
def test_dep_getcpv(self):
|
|
pkg = ">=app-foo/foo-1.2.3"
|
|
cpv = "app-foo/foo-1.2.3"
|
|
self.assertEqual(cpv, et.dep_getcpv(pkg))
|
|
|
|
def test_dep_getslot(self):
|
|
pkg = ">=app-foo/foo-1.2.3:2.3.4"
|
|
slot = "2.3.4"
|
|
self.assertEqual(slot, et.dep_getslot(pkg))
|
|
|
|
def test_dep_getusedeps(self):
|
|
pkg = ">=app-foo/foo-1.2.3:2.3.4[ciao,come,va]"
|
|
usedeps = ("ciao", "come", "va")
|
|
self.assertEqual(usedeps, et.dep_getusedeps(pkg))
|
|
|
|
def test_remove_usedeps(self):
|
|
pkg = ">=app-foo/foo-1.2.3:2.3.4[ciao,come,va]"
|
|
result = ">=app-foo/foo-1.2.3:2.3.4"
|
|
self.assertEqual(result, et.remove_usedeps(pkg))
|
|
|
|
def test_remove_slot(self):
|
|
pkg = ">=app-foo/foo-1.2.3:2.3.4"
|
|
result = ">=app-foo/foo-1.2.3"
|
|
self.assertEqual(result, et.remove_slot(pkg))
|
|
|
|
def test_remove_revision(self):
|
|
pkg = "app-foo/foo-1.2.3-r1"
|
|
result = "app-foo/foo-1.2.3"
|
|
self.assertEqual(result, et.remove_revision(pkg))
|
|
|
|
def test_remove_tag(self):
|
|
pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo"
|
|
result = "app-foo/foo-1.2.3-r1"
|
|
self.assertEqual(result, et.remove_tag(pkg))
|
|
|
|
def test_remove_entropy_revision(self):
|
|
pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo~1"
|
|
result = "app-foo/foo-1.2.3-r1#2.2.2-foo"
|
|
self.assertEqual(result, et.remove_entropy_revision(pkg))
|
|
|
|
def test_dep_get_entropy_revision(self):
|
|
pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo~1"
|
|
result = 1
|
|
self.assertEqual(result, et.dep_get_entropy_revision(pkg))
|
|
|
|
def test_dep_get_spm_revision(self):
|
|
pkg = "app-foo/foo-1.2.3-r1"
|
|
result = "r1"
|
|
self.assertEqual(result, et.dep_get_spm_revision(pkg))
|
|
|
|
def test_dep_get_match_in_repos(self):
|
|
pkg = "app-foo/foo-1.2.3-r1@foorepo"
|
|
result = ("app-foo/foo-1.2.3-r1", ["foorepo"])
|
|
self.assertEqual(result, et.dep_get_match_in_repos(pkg))
|
|
|
|
def test_dep_gettag(self):
|
|
pkg = "app-foo/foo-1.2.3-r1#2.2.2-foo~1"
|
|
result = "2.2.2-foo"
|
|
self.assertEqual(result, et.dep_gettag(pkg))
|
|
|
|
def test_remove_package_operators(self):
|
|
pkg = ">=app-foo/foo-1.2.3:2.3.4~1"
|
|
result = "app-foo/foo-1.2.3:2.3.4~1"
|
|
self.assertEqual(result, et.remove_package_operators(pkg))
|
|
|
|
def test_compare_versions(self):
|
|
ver_a = ("1.0.0", "1.0.0", 0,)
|
|
ver_b = ("1.0.1", "1.0.0", 0.10000000000000001,)
|
|
ver_c = ("1.0.0", "1.0.1", -0.10000000000000001,)
|
|
|
|
self.assertEqual(et.compare_versions(ver_a[0], ver_a[1]), ver_a[2])
|
|
self.assertEqual(et.compare_versions(ver_b[0], ver_b[1]), ver_b[2])
|
|
self.assertEqual(et.compare_versions(ver_c[0], ver_c[1]), ver_c[2])
|
|
|
|
def test_get_newer_version(self):
|
|
vers = ["1.0", "3.4", "0.5", "999", "9999", "10.0"]
|
|
out_vers = ['9999', '999', '10.0', '3.4', '1.0', '0.5']
|
|
self.assertEqual(et.get_newer_version(vers), out_vers)
|
|
|
|
def test_get_entropy_newer_version(self):
|
|
vers = [("1.0", "2222", 1,), ("3.4", "2222", 0,), ("1.0", "2223", 1,),
|
|
("1.0", "2223", 3,)]
|
|
out_vers = [('1.0', '2223', 3), ('1.0', '2223', 1),
|
|
('3.4', '2222', 0), ('1.0', '2222', 1)]
|
|
self.assertEqual(et.get_entropy_newer_version(vers), out_vers)
|
|
|
|
def test_istextfile(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
os.write(fd, const_convert_to_rawstring("this is new"))
|
|
os.fsync(fd)
|
|
self.assert_(et.istextfile(tmp_path))
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
def test_spliturl(self):
|
|
begin = "http://www.sabayon.org/download"
|
|
end = ['http', 'www.sabayon.org', '/download', '', '']
|
|
self.assertEqual(list(et.spliturl(begin)), end)
|
|
|
|
def test_spawn_function(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
def myfunc(fd):
|
|
os.write(fd, const_convert_to_rawstring("this is new"))
|
|
os.fsync(fd)
|
|
return "ok"
|
|
|
|
rc = et.spawn_function(myfunc, fd)
|
|
self.assertEqual(rc, "ok")
|
|
os.close(fd)
|
|
|
|
f = open(tmp_path, "r")
|
|
self.assertEqual("this is new", f.read())
|
|
f.close()
|
|
os.remove(tmp_path)
|
|
|
|
def test_bytes_into_human(self):
|
|
begin = 102400020
|
|
end = '97.7MB'
|
|
self.assertEqual(et.bytes_into_human(begin), end)
|
|
|
|
def test_get_random_temp_file(self):
|
|
self.assert_(et.get_random_temp_file())
|
|
|
|
def test_convert_unix_time_to_human_time(self):
|
|
unixtime = 1
|
|
self.assertEqual(et.convert_unix_time_to_human_time(unixtime),
|
|
'1970-01-01 01:00:01')
|
|
|
|
def test_convert_unix_time_to_datetime(self):
|
|
unixtime = 1
|
|
import datetime
|
|
self.assertEqual(et.convert_unix_time_to_datetime(unixtime),
|
|
datetime.datetime(1970, 1, 1, 1, 0, 1))
|
|
|
|
def test_convert_seconds_to_fancy_output(self):
|
|
seconds = 2740
|
|
self.assertEqual(et.convert_seconds_to_fancy_output(seconds),
|
|
'45m:40s')
|
|
|
|
def test_is_valid_string(self):
|
|
valid = "ciasdoad"
|
|
non_valid = "òèàòè"
|
|
self.assert_(et.is_valid_string(valid))
|
|
self.assert_(not et.is_valid_string(non_valid))
|
|
|
|
def test_is_valid_md5(self):
|
|
valid = "5d41402abc4b2a76b9719d911017c592"
|
|
non_valid = "asdasdasdasdasdasdaszzzzzzza"
|
|
self.assert_(et.is_valid_md5(valid))
|
|
self.assert_(not et.is_valid_md5(non_valid))
|
|
|
|
def test_read_elf_class(self):
|
|
elf_obj = _misc.get_dl_so_amd()
|
|
elf_class = 2
|
|
self.assertEqual(et.read_elf_class(elf_obj), elf_class)
|
|
|
|
def test_is_elf_file(self):
|
|
self.assert_(et.is_elf_file(_misc.get_dl_so_amd()))
|
|
|
|
def test_read_elf_dyn_libs(self):
|
|
elf_obj = _misc.get_dl_so_amd_2()
|
|
known_meta = set(['libcom_err.so.2', 'libkrb5.so.3',
|
|
'libkrb5support.so.0', 'libgssrpc.so.4', 'libk5crypto.so.3',
|
|
'libc.so.6'])
|
|
metadata = et.read_elf_dynamic_libraries(elf_obj)
|
|
self.assertEqual(metadata, known_meta)
|
|
|
|
def test_read_elf_linker_paths(self):
|
|
elf_obj = _misc.get_dl_so_amd_2()
|
|
known_meta = ['/usr/lib64', '/usr/lib64']
|
|
metadata = et.read_elf_linker_paths(elf_obj)
|
|
self.assertEqual(metadata, known_meta)
|
|
|
|
def test_xml_from_dict_extended(self):
|
|
data = {
|
|
"foo": 1,
|
|
"foo2": None,
|
|
"foo3": set([1, 2, 3]),
|
|
"foo4": [1, 2, 3],
|
|
"foo5": "ciao",
|
|
"foo6": (1, 2, 3),
|
|
"foo6": 1.25,
|
|
"foo7": {"a": const_convert_to_unicode("ciaoèò", 'utf-8'),},
|
|
"foo8": frozenset([1, 2, 3])
|
|
}
|
|
xml_data = et.xml_from_dict_extended(data)
|
|
new_dict = et.dict_from_xml_extended(xml_data)
|
|
self.assertEqual(data, new_dict)
|
|
|
|
def test_xml_from_dict(self):
|
|
data = {
|
|
"foo": "abc",
|
|
"foo2": "def",
|
|
"foo3": "asdas asdasd sdasad",
|
|
"foo4": const_convert_to_unicode("òèà", 'utf-8'),
|
|
"foo5": "ciao",
|
|
}
|
|
xml_data = et.xml_from_dict(data)
|
|
new_dict = et.dict_from_xml(xml_data)
|
|
self.assertEqual(data, new_dict)
|
|
|
|
def test_create_package_filename(self):
|
|
category = "app-foo"
|
|
name = "foo"
|
|
version = "1.2.3"
|
|
package_tag = "abc"
|
|
result = 'app-foo:foo-1.2.3#abc.tbz2'
|
|
self.assertEqual(et.create_package_filename(category, name, version,
|
|
package_tag), result)
|
|
|
|
def test_create_package_atom_string(self):
|
|
category = "app-foo"
|
|
name = "foo"
|
|
version = "1.2.3"
|
|
package_tag = "abc"
|
|
result = 'app-foo/foo-1.2.3#abc'
|
|
self.assertEqual(et.create_package_atom_string(category, name, version,
|
|
package_tag), result)
|
|
|
|
def test_uncompress_tarball(self):
|
|
|
|
pkgs = [_misc.get_test_entropy_package4(),
|
|
] #_misc.get_footar_package() disabled for now
|
|
for pkg in pkgs:
|
|
self._do_uncompress_tarball(pkg)
|
|
|
|
def _do_uncompress_tarball(self, pkg_path):
|
|
|
|
tmp_dir = tempfile.mkdtemp()
|
|
fd, tmp_file = tempfile.mkstemp()
|
|
|
|
path_perms = {}
|
|
|
|
# try with tar first
|
|
args = ["tar", "xjfp", pkg_path, "-C", tmp_dir]
|
|
proc = subprocess.Popen(args, stdout = fd, stderr = fd)
|
|
rc = proc.wait()
|
|
self.assert_(not rc)
|
|
os.close(fd)
|
|
|
|
for currentdir, subdirs, files in os.walk(tmp_dir):
|
|
for xfile in files:
|
|
path = os.path.join(currentdir, xfile)
|
|
fstat = os.lstat(path)
|
|
mode = stat.S_IMODE(fstat.st_mode)
|
|
uid, gid = fstat.st_uid, fstat.st_gid
|
|
path_perms[path] = (mode, uid, gid,)
|
|
|
|
self.assert_(path_perms)
|
|
|
|
shutil.rmtree(tmp_dir)
|
|
os.makedirs(tmp_dir)
|
|
|
|
# now try with our function
|
|
rc = et.uncompress_tarball(pkg_path, extract_path = tmp_dir)
|
|
self.assert_(not rc)
|
|
|
|
new_path_perms = {}
|
|
|
|
for currentdir, subdirs, files in os.walk(tmp_dir):
|
|
for xfile in files:
|
|
path = os.path.join(currentdir, xfile)
|
|
fstat = os.lstat(path)
|
|
mode = stat.S_IMODE(fstat.st_mode)
|
|
uid, gid = fstat.st_uid, fstat.st_gid
|
|
mystat = (mode, uid, gid,)
|
|
new_path_perms[path] = (mode, uid, gid,)
|
|
|
|
self.assertEqual(path_perms, new_path_perms)
|
|
shutil.rmtree(tmp_dir)
|
|
|
|
if __name__ == '__main__':
|
|
if "--debug" in sys.argv:
|
|
sys.argv.remove("--debug")
|
|
from entropy.const import etpUi
|
|
etpUi['debug'] = True
|
|
unittest.main()
|