652 lines
22 KiB
Python
652 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, '.')
|
|
sys.path.insert(0, '../')
|
|
import unittest
|
|
from entropy.const import const_convert_to_rawstring, const_convert_to_unicode
|
|
import entropy.tools as et
|
|
from entropy.client.interfaces import Client
|
|
from entropy.output import print_generic
|
|
import tests._misc as _misc
|
|
import tempfile
|
|
import subprocess
|
|
import shutil
|
|
import stat
|
|
|
|
class ToolsTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
sys.stdout.write("%s called\n" % (self,))
|
|
sys.stdout.flush()
|
|
self.test_pkg = _misc.get_test_entropy_package()
|
|
self.test_pkg2 = _misc.get_test_entropy_package2()
|
|
self.test_pkg3 = _misc.get_test_entropy_package3()
|
|
self.test_pkgs = [self.test_pkg, self.test_pkg2, self.test_pkg3]
|
|
|
|
def tearDown(self):
|
|
"""
|
|
tearDown is run after each test
|
|
"""
|
|
sys.stdout.write("%s ran\n" % (self,))
|
|
sys.stdout.flush()
|
|
|
|
def test_dump_entropy_metadata(self):
|
|
|
|
client = Client(installed_repo = -1, indexing = False, xcache = False,
|
|
repo_validation = False)
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
for test_pkg in self.test_pkgs:
|
|
et.dump_entropy_metadata(test_pkg, tmp_path)
|
|
self.assertNotEqual(tmp_path, None)
|
|
dbconn = client.open_generic_repository(tmp_path)
|
|
dbconn.validate()
|
|
dbconn.integrity_check()
|
|
dbconn.listAllPackageIds()
|
|
dbconn.close()
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
# calling destroy() and shutdown()
|
|
# need to call destroy() directly to remove all the SystemSettings
|
|
# plugins because shutdown() doesn't, since it's meant to be called
|
|
# right before terminating the process
|
|
client.destroy()
|
|
client.shutdown()
|
|
|
|
def test_remove_entropy_metadata(self):
|
|
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.close(fd)
|
|
|
|
for test_pkg in self.test_pkgs:
|
|
self.assertTrue(et.is_entropy_package_file(test_pkg))
|
|
ext_rc = et.remove_entropy_metadata(test_pkg, tmp_path)
|
|
self.assertNotEqual(ext_rc, False)
|
|
self.assertTrue(os.path.isfile(tmp_path))
|
|
self.assertTrue(not et.is_entropy_package_file(tmp_path))
|
|
|
|
os.remove(tmp_path)
|
|
|
|
def test_tb(self):
|
|
# traceback test
|
|
tb = None
|
|
try:
|
|
raise ValueError()
|
|
except ValueError:
|
|
tb = et.get_traceback()
|
|
self.assertTrue(tb)
|
|
|
|
def test_get_remote_data(self):
|
|
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
msg = const_convert_to_rawstring("helloàòè\n", 'utf-8')
|
|
os.write(fd, msg)
|
|
os.fsync(fd)
|
|
|
|
recv_msg = et.get_remote_data("file://"+tmp_path)
|
|
self.assertEqual([msg], recv_msg)
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
def test_supported_img_file(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
# create gif
|
|
os.write(fd, const_convert_to_rawstring("GIF89xxx"))
|
|
os.fsync(fd)
|
|
self.assertTrue(et.is_supported_image_file(tmp_path))
|
|
self.assertTrue(et._is_gif_file(tmp_path))
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
png_file = _misc.get_png()
|
|
self.assertTrue(et.is_supported_image_file(png_file))
|
|
self.assertTrue(et._is_png_file(png_file))
|
|
|
|
def test_valid_ascii(self):
|
|
valid = "ciao"
|
|
non_valid = "òèàò"
|
|
self.assertTrue(et.is_valid_ascii(valid))
|
|
self.assertTrue(not et.is_valid_ascii(non_valid))
|
|
|
|
def test_is_valid_unicode(self):
|
|
valid = "ciao"
|
|
valid2 = const_convert_to_unicode("òèàò", 'utf-8')
|
|
self.assertTrue(et.is_valid_unicode(valid))
|
|
self.assertTrue(et.is_valid_unicode(valid2))
|
|
|
|
def test_is_valid_email(self):
|
|
valid = "entropy@entropy.it"
|
|
non_valid = "entropy.entropy.it"
|
|
self.assertTrue(et.is_valid_email(valid))
|
|
self.assertTrue(not et.is_valid_email(non_valid))
|
|
|
|
def test_islive(self):
|
|
self.assertTrue(not et.islive())
|
|
|
|
def test_get_file_size(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.write(fd, const_convert_to_rawstring("ciao"))
|
|
os.fsync(fd)
|
|
self.assertEqual(et.get_file_size(tmp_path), 4)
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
def test_sum_file_sizes(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
fd2, tmp_path2 = tempfile.mkstemp()
|
|
os.write(fd, const_convert_to_rawstring("ciao"))
|
|
os.write(fd2, const_convert_to_rawstring("ciao"))
|
|
|
|
self.assertEqual(et.sum_file_sizes([tmp_path, tmp_path2]), 8)
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.close(fd2)
|
|
os.remove(tmp_path2)
|
|
|
|
def test_check_required_space(self):
|
|
self.assertTrue(et.check_required_space("/", 10), True)
|
|
|
|
def test_getstatusoutput(self):
|
|
cmd = "echo hello"
|
|
out = et.getstatusoutput(cmd)
|
|
self.assertEqual(out, (0, "hello",))
|
|
|
|
def test_movefile(self):
|
|
|
|
# move file
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.close(fd)
|
|
|
|
dest_path = tmp_path + "foo"
|
|
self.assertTrue(et.movefile(tmp_path, dest_path))
|
|
self.assertTrue(os.stat(dest_path))
|
|
|
|
os.remove(dest_path)
|
|
|
|
# move symlink
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
dst_link = tmp_path + "foo2"
|
|
dst_final_link = dst_link + "foo3"
|
|
os.symlink(tmp_path, dst_link)
|
|
self.assertTrue(et.movefile(dst_link, dst_final_link))
|
|
self.assertTrue(os.stat(dst_final_link))
|
|
|
|
os.close(fd)
|
|
os.remove(dst_final_link)
|
|
|
|
def test_get_random_number(self):
|
|
rand1 = et.get_random_number()
|
|
rand2 = et.get_random_number()
|
|
self.assertTrue(isinstance(rand1, int))
|
|
self.assertTrue(rand1 != rand2)
|
|
|
|
def test_split_indexable_into_chunks(self):
|
|
|
|
indexable = "asdXasdXasdXasdXasdXasdXasdXasdXasd"
|
|
result = ['asd', 'Xas', 'dXa', 'sdX', 'asd', 'Xas', 'dXa', 'sdX',
|
|
'asd', 'Xas', 'dXa', 'sd']
|
|
out = et.split_indexable_into_chunks(indexable, 3)
|
|
self.assertTrue(out)
|
|
self.assertTrue(out == result)
|
|
|
|
def test_shaXXX(self):
|
|
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
os.write(fd, const_convert_to_rawstring("this is the life"))
|
|
os.fsync(fd)
|
|
|
|
sha1 = et.sha1(tmp_path)
|
|
sha256 = et.sha256(tmp_path)
|
|
sha512 = et.sha512(tmp_path)
|
|
r_sha1 = "105de2055ac81db7b02a27623b7e73932788df95"
|
|
r_sha256 = "ccb134af19d748c9c845b26a1e3a29e6ea356d1f1e0ad47d57b83f38c5492988"
|
|
r_sha512 = "a1e88ef22f5884cd8b205c2e6e2b17ae0f7597df81fabc6a4e77454ac688102df8986353c9f97981b4274f61c6a0ca4c74ec2fd49997b4a35d4091d94dc8a64e"
|
|
|
|
self.assertEqual(sha1, r_sha1)
|
|
self.assertEqual(sha256, r_sha256)
|
|
self.assertEqual(sha512, r_sha512)
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
def test_md5sum_directory(self):
|
|
tmp_dir = tempfile.mkdtemp()
|
|
f = open(os.path.join(tmp_dir, "foo"), "w")
|
|
f.write("hello world")
|
|
f.flush()
|
|
f.close()
|
|
|
|
r_md5dir = "5eb63bbbe01eeed093cb22bb8f5acdc3"
|
|
md5dir = et.md5sum_directory(tmp_dir)
|
|
self.assertEqual(md5dir, r_md5dir)
|
|
mobj = et.md5obj_directory(tmp_dir)
|
|
self.assertEqual(mobj.hexdigest(), r_md5dir)
|
|
|
|
shutil.rmtree(tmp_dir)
|
|
|
|
def test_XXcompress_file(self):
|
|
|
|
import bz2
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.write(fd, const_convert_to_rawstring("this is the life"))
|
|
os.fsync(fd)
|
|
orig_md5 = et.md5sum(tmp_path)
|
|
|
|
new_path = tmp_path + ".bz2"
|
|
opener = bz2.BZ2File
|
|
et.compress_file(tmp_path, new_path, opener)
|
|
|
|
comp_md5 = "69b9fc26f7cb561a067a10b06d890242"
|
|
md5 = et.md5sum(new_path)
|
|
self.assertEqual(md5, comp_md5)
|
|
os.close(fd)
|
|
|
|
et.uncompress_file(new_path, tmp_path, opener)
|
|
self.assertEqual(orig_md5, et.md5sum(tmp_path))
|
|
|
|
os.remove(tmp_path)
|
|
os.remove(new_path)
|
|
|
|
def test_unpack_gzip(self):
|
|
import gzip
|
|
fd, tmp_path = tempfile.mkstemp(suffix = ".gz")
|
|
|
|
gz_f = gzip.GzipFile(tmp_path, "wb")
|
|
gz_f.write(const_convert_to_rawstring("ciao ciao ciao"))
|
|
gz_f.close()
|
|
|
|
new_path = et.unpack_gzip(tmp_path)
|
|
self.assertTrue(os.stat(new_path))
|
|
orig_md5 = "b40d18c97e6461678f264c4524f6cc7c"
|
|
self.assertEqual(orig_md5, et.md5sum(new_path))
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(new_path)
|
|
|
|
def test_unpack_bzip2(self):
|
|
import bz2
|
|
fd, tmp_path = tempfile.mkstemp(suffix = ".bz2")
|
|
|
|
gz_f = bz2.BZ2File(tmp_path, "wb")
|
|
gz_f.write(const_convert_to_rawstring("ciao ciao ciao"))
|
|
gz_f.close()
|
|
|
|
new_path = et.unpack_bzip2(tmp_path)
|
|
self.assertTrue(os.stat(new_path))
|
|
orig_md5 = "b40d18c97e6461678f264c4524f6cc7c"
|
|
self.assertEqual(orig_md5, et.md5sum(new_path))
|
|
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(new_path)
|
|
|
|
def test_remove_entropy_metadata2(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
os.close(fd)
|
|
|
|
ext_rc = et.remove_entropy_metadata(
|
|
_misc.get_test_entropy_package4(), tmp_path)
|
|
self.assertTrue(ext_rc)
|
|
orig_md5 = "ab1f147202838c6ee9c6fc3511537279"
|
|
self.assertEqual(orig_md5, et.md5sum(tmp_path))
|
|
|
|
os.remove(tmp_path)
|
|
|
|
def test_create_md5_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "5d41402abc4b2a76b9719d911017c592"
|
|
md5_path = et.create_md5_file(tmp_path)
|
|
md5_f = open(md5_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum + " " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, md5_f.read())
|
|
self.assertTrue(et.compare_md5(tmp_path, cksum))
|
|
|
|
md5_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(md5_path)
|
|
|
|
def test_create_sha1_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d"
|
|
sha_path = et.create_sha1_file(tmp_path)
|
|
sha_f = open(sha_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum + " " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, sha_f.read())
|
|
self.assertTrue(et.compare_sha1(tmp_path, cksum))
|
|
|
|
sha_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(sha_path)
|
|
|
|
def test_create_sha256_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
|
|
sha_path = et.create_sha256_file(tmp_path)
|
|
sha_f = open(sha_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum + " " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, sha_f.read())
|
|
self.assertTrue(et.compare_sha256(tmp_path, cksum))
|
|
|
|
sha_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(sha_path)
|
|
|
|
def test_create_sha512_file(self):
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
suffix = const_convert_to_unicode("òèà", 'utf-8'))
|
|
|
|
os.write(fd, const_convert_to_rawstring("hello"))
|
|
os.fsync(fd)
|
|
|
|
cksum = "9b71d224bd62f3785d96d46ad3ea3d73319bfbc2890caadae2dff72519673ca72323c3d99ba5c11d7c7acc6e14b8c5da0c4663475c2e5c3adef46f73bcdec043"
|
|
sha_path = et.create_sha512_file(tmp_path)
|
|
sha_f = open(sha_path, "rb")
|
|
orig_cont = const_convert_to_rawstring(
|
|
cksum+" " + \
|
|
os.path.basename(tmp_path) + "\n",
|
|
'utf-8')
|
|
self.assertEqual(orig_cont, sha_f.read())
|
|
self.assertTrue(et.compare_sha512(tmp_path, cksum))
|
|
|
|
sha_f.close()
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
os.remove(sha_path)
|
|
|
|
def test_md5string(self):
|
|
mystring = "ciao"
|
|
out_str = et.md5string(mystring)
|
|
self.assertEqual(out_str, "6e6bc4e49dd477ebc98ef4046c067b5f")
|
|
|
|
def test_istextfile(self):
|
|
fd, tmp_path = tempfile.mkstemp()
|
|
|
|
os.write(fd, const_convert_to_rawstring("this is new"))
|
|
os.fsync(fd)
|
|
self.assertTrue(et.istextfile(tmp_path))
|
|
os.close(fd)
|
|
os.remove(tmp_path)
|
|
|
|
def test_spliturl(self):
|
|
begin = "http://www.sabayon.org/download"
|
|
end = ['http', 'www.sabayon.org', '/download', '', '']
|
|
self.assertEqual(list(et.spliturl(begin)), end)
|
|
|
|
def test_bytes_into_human(self):
|
|
begin = 102400020
|
|
end = '102.4MB'
|
|
self.assertEqual(et.bytes_into_human(begin), end)
|
|
|
|
"""
|
|
def test_convert_unix_time_to_human_time(self):
|
|
unixtime = 1
|
|
# FIXME: on UTC it must return 00:00:01
|
|
self.assertEqual(et.convert_unix_time_to_human_time(unixtime),
|
|
'1970-01-01 00:00:01')
|
|
"""
|
|
|
|
def test_convert_seconds_to_fancy_output(self):
|
|
seconds = 2740
|
|
self.assertEqual(et.convert_seconds_to_fancy_output(seconds),
|
|
'45m:40s')
|
|
|
|
def test_is_valid_string(self):
|
|
valid = "ciasdoad"
|
|
non_valid = "òèàòè"
|
|
self.assertTrue(et.is_valid_string(valid))
|
|
self.assertTrue(not et.is_valid_string(non_valid))
|
|
|
|
def test_is_valid_md5(self):
|
|
valid = "5d41402abc4b2a76b9719d911017c592"
|
|
non_valid = "asdasdasdasdasdasdaszzzzzzza"
|
|
self.assertTrue(et.is_valid_md5(valid))
|
|
self.assertTrue(not et.is_valid_md5(non_valid))
|
|
|
|
def test_entropy_delta(self):
|
|
pkg_path_a = _misc.get_test_entropy_package()
|
|
pkg_path_b = _misc.get_test_entropy_package2()
|
|
hash_tag = et.md5sum(pkg_path_a) + et.md5sum(pkg_path_b)
|
|
delta_path = et.generate_entropy_delta(pkg_path_a, pkg_path_b,
|
|
hash_tag, pkg_compression = "bz2")
|
|
self.assertNotEqual(None, delta_path) # missing bsdiff?
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
os.close(tmp_fd)
|
|
try:
|
|
et.apply_entropy_delta(pkg_path_a, delta_path, tmp_path)
|
|
self.assertEqual(et.md5sum(pkg_path_b), et.md5sum(tmp_path))
|
|
finally:
|
|
os.remove(tmp_path)
|
|
|
|
def test_read_elf_class(self):
|
|
elf_obj = _misc.get_dl_so_amd()
|
|
elf_class = 2
|
|
self.assertEqual(et.read_elf_class(elf_obj), elf_class)
|
|
|
|
def test_is_elf_file(self):
|
|
self.assertTrue(et.is_elf_file(_misc.get_dl_so_amd()))
|
|
|
|
def test_read_elf_dyn_libs(self):
|
|
elf_obj = _misc.get_dl_so_amd_2()
|
|
known_meta = set(['libcom_err.so.2', 'libkrb5.so.3',
|
|
'libkrb5support.so.0', 'libgssrpc.so.4', 'libk5crypto.so.3',
|
|
'libc.so.6'])
|
|
metadata = et.read_elf_dynamic_libraries(elf_obj)
|
|
self.assertEqual(metadata, known_meta)
|
|
|
|
def test_read_elf_real_dynamic_libraries(self):
|
|
elf_obj = _misc.get_dl_so_amd_2()
|
|
known_meta = set(
|
|
['libresolv.so.2', 'libcom_err.so.2',
|
|
'libkeyutils.so.1', 'libkrb5.so.3',
|
|
'ld-linux-x86-64.so.2', 'libkrb5support.so.0',
|
|
'libpthread.so.0', 'libgssrpc.so.4',
|
|
'libgssapi_krb5.so.2', 'libdl.so.2',
|
|
'libk5crypto.so.3', 'libc.so.6']
|
|
)
|
|
metadata = et.read_elf_real_dynamic_libraries(elf_obj)
|
|
self.assertEqual(metadata, known_meta)
|
|
|
|
def test_read_elf_broken_symbols(self):
|
|
elf_obj = _misc.get_test_so_1()
|
|
known_meta = set(['PyDict_SetItem', 'PyArg_ParseTuple',
|
|
'PyList_New', 'PyList_Append', 'PyFunction_Type',
|
|
'PyExc_AttributeError', 'PyImport_Import', 'PyObject_Init',
|
|
'PyDict_Size', 'PyInt_AsLong', 'PyImport_ImportModule',
|
|
'PyUnicodeUCS4_DecodeASCII', 'PyExc_DeprecationWarning',
|
|
'PyErr_SetObject', 'PyExc_NotImplementedError', 'PyInt_Type',
|
|
'PyErr_Occurred', 'PyClass_Type', 'PyWrapperDescr_Type',
|
|
'PyDict_GetItemString', 'PyLong_FromLong',
|
|
'PyObject_AsCharBuffer', 'Py_AtExit', 'PyLong_FromLongLong',
|
|
'PyErr_Restore', 'PyErr_Print', 'PyObject_CallFunction',
|
|
'PyLong_AsLong', 'PyOS_snprintf', 'PyErr_SetNone',
|
|
'PyLong_AsUnsignedLongMask', 'PyEval_InitThreads',
|
|
'PyProperty_Type', 'PyFloat_AsDouble', 'PyObject_GC_UnTrack',
|
|
'PyWeakref_NewRef', 'PySequence_GetItem', 'PyCObject_Type',
|
|
'PyFloat_FromDouble', 'PySlice_GetIndicesEx',
|
|
'PyExc_SystemError', 'PyErr_NoMemory', 'PyMethod_New',
|
|
'PyErr_SetString', 'PyLong_AsLongLong', 'PyObject_IsInstance',
|
|
'Py_InitModule4_64', 'PyInt_FromSsize_t', 'PyType_Type',
|
|
'PyCallable_Check', '_Py_NoneStruct', 'PyDescr_NewMethod',
|
|
'PyUnicodeUCS4_AsASCIIString', 'PyBool_FromLong',
|
|
'PyObject_CallObject', '_Py_NotImplementedStruct',
|
|
'PyObject_GetBuffer', 'PySlice_Type',
|
|
'PyUnicodeUCS4_DecodeLatin1', 'PyEval_CallObjectWithKeywords',
|
|
'PyCapsule_GetPointer', 'PyExc_BufferError',
|
|
'PyExc_ValueError', 'PyBuffer_Release',
|
|
'PyThread_get_thread_ident', 'PyExc_NameError',
|
|
'Py_FatalError', 'PyGILState_Ensure', 'PyObject_GetAttr',
|
|
'PyDictProxy_New', 'PyExc_RuntimeError',
|
|
'PyUnicodeUCS4_AsWideChar', 'PyInt_FromLong', 'PyTuple_Type',
|
|
'PyBool_Type', 'PyObject_Str', 'PyGILState_Release',
|
|
'PyUnicodeUCS4_FromObject', 'PyModule_GetDict',
|
|
'PyLong_AsVoidPtr', 'PyCObject_AsVoidPtr',
|
|
'PyString_FromString', 'PyObject_Call',
|
|
'PyObject_CheckReadBuffer', 'PyObject_Print',
|
|
'PyDict_GetItem', 'PyType_IsSubtype', 'PyObject_AsReadBuffer',
|
|
'PyCFunction_NewEx', 'PyMem_Free', 'PyDict_SetItemString',
|
|
'PyString_AsString', '_Py_TrueStruct',
|
|
'PyUnicodeUCS4_DecodeUTF8', 'PyNumber_AsSsize_t',
|
|
'PyErr_WarnEx', 'PyLong_FromVoidPtr', 'PyWeakref_GetObject',
|
|
'PyDict_Contains', 'PyUnicodeUCS4_AsUTF8String',
|
|
'PyErr_Fetch', 'PyInt_AsSsize_t', 'PyCapsule_New',
|
|
'PyType_Modified', 'PyErr_Clear', 'PyFloat_Type',
|
|
'PyType_Ready', 'PyExc_Exception', 'PyExc_TypeError',
|
|
'PyObject_Malloc', 'PyImport_GetModuleDict',
|
|
'PyLong_FromUnsignedLong', 'PyType_GenericAlloc',
|
|
'PyTuple_GetSlice', 'PyMethod_Type',
|
|
'PyLong_AsUnsignedLongLongMask', 'PyMem_Malloc',
|
|
'PyUnicodeUCS4_AsLatin1String', 'PyErr_Format',
|
|
'PyExc_IndexError', 'PyBuffer_FillInfo', 'PyTuple_Pack',
|
|
'PyString_FromStringAndSize', 'Py_BuildValue',
|
|
'PyObject_GetAttrString', 'PyTuple_New',
|
|
'PyUnicodeUCS4_FromWideChar', 'PyArg_ParseTupleAndKeywords',
|
|
'PyDict_Next', 'PyLong_FromUnsignedLongLong', 'PyDict_New',
|
|
'PyString_ConcatAndDel', 'PyCFunction_Type', '_Py_ZeroStruct',
|
|
'PyThreadState_Get', 'PyString_FromFormat',
|
|
'PyObject_CallFunctionObjArgs', 'PyCapsule_Type',
|
|
'PyErr_GivenExceptionMatches', 'PyCObject_FromVoidPtr',
|
|
'PyBaseObject_Type', 'PyObject_SetAttr', 'PySequence_Size'])
|
|
metadata = et.read_elf_broken_symbols(elf_obj)
|
|
self.assertEqual(metadata, known_meta)
|
|
|
|
def test_read_elf_linker_paths(self):
|
|
elf_obj = _misc.get_dl_so_amd_2()
|
|
known_meta = ['/usr/lib64']
|
|
metadata = et.read_elf_linker_paths(elf_obj)
|
|
self.assertEqual(metadata, known_meta)
|
|
|
|
def test_xml_from_dict_extended(self):
|
|
data = {
|
|
"foo": 1,
|
|
"foo2": None,
|
|
"foo3": set([1, 2, 3]),
|
|
"foo4": [1, 2, 3],
|
|
"foo5": "ciao",
|
|
"foo6": (1, 2, 3),
|
|
"foo6": 1.25,
|
|
"foo7": {"a": const_convert_to_unicode("ciaoèò", 'utf-8'),},
|
|
"foo8": frozenset([1, 2, 3])
|
|
}
|
|
xml_data = et.xml_from_dict_extended(data)
|
|
new_dict = et.dict_from_xml_extended(xml_data)
|
|
self.assertEqual(data, new_dict)
|
|
|
|
def test_validate_repository_id(self):
|
|
data = [
|
|
("server=repository", False),
|
|
("repository", True),
|
|
("=asd'?^ì$", False),
|
|
("[$", False),
|
|
("sabayonlinux.org", True),
|
|
("sabayon-limbo", True),
|
|
("mania.org", True)
|
|
]
|
|
for repository_id, expected in data:
|
|
self.assertEqual(
|
|
et.validate_repository_id(repository_id),
|
|
expected)
|
|
|
|
def test_xml_from_dict(self):
|
|
data = {
|
|
"foo": "abc",
|
|
"foo2": "def",
|
|
"foo3": "asdas asdasd sdasad",
|
|
"foo4": const_convert_to_unicode("òèà", 'utf-8'),
|
|
"foo5": "ciao",
|
|
}
|
|
xml_data = et.xml_from_dict(data)
|
|
new_dict = et.dict_from_xml(xml_data)
|
|
self.assertEqual(data, new_dict)
|
|
|
|
def test_uncompress_tarball(self):
|
|
|
|
pkgs = [_misc.get_test_entropy_package4(),
|
|
] #_misc.get_footar_package() disabled for now
|
|
for pkg in pkgs:
|
|
self._do_uncompress_tarball(pkg)
|
|
|
|
def _do_uncompress_tarball(self, pkg_path):
|
|
|
|
tmp_dir = tempfile.mkdtemp()
|
|
fd, tmp_file = tempfile.mkstemp()
|
|
|
|
path_perms = {}
|
|
|
|
# try with tar first
|
|
args = ["tar", "xjfp", pkg_path, "-C", tmp_dir]
|
|
proc = subprocess.Popen(args, stdout = fd, stderr = fd)
|
|
rc = proc.wait()
|
|
self.assertTrue(not rc)
|
|
os.close(fd)
|
|
|
|
for currentdir, subdirs, files in os.walk(tmp_dir):
|
|
for xfile in files:
|
|
path = os.path.join(currentdir, xfile)
|
|
fstat = os.lstat(path)
|
|
mode = stat.S_IMODE(fstat.st_mode)
|
|
uid, gid = fstat.st_uid, fstat.st_gid
|
|
path_perms[path] = (mode, uid, gid,)
|
|
|
|
self.assertTrue(path_perms)
|
|
|
|
shutil.rmtree(tmp_dir)
|
|
os.makedirs(tmp_dir)
|
|
|
|
# now try with our function
|
|
rc = et.uncompress_tarball(pkg_path, extract_path = tmp_dir)
|
|
self.assertTrue(not rc)
|
|
|
|
new_path_perms = {}
|
|
|
|
for currentdir, subdirs, files in os.walk(tmp_dir):
|
|
for xfile in files:
|
|
path = os.path.join(currentdir, xfile)
|
|
fstat = os.lstat(path)
|
|
mode = stat.S_IMODE(fstat.st_mode)
|
|
uid, gid = fstat.st_uid, fstat.st_gid
|
|
mystat = (mode, uid, gid,)
|
|
new_path_perms[path] = (mode, uid, gid,)
|
|
|
|
self.assertEqual(path_perms, new_path_perms)
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
et.kill_threads()
|
|
raise SystemExit(0)
|