Files
entropy/libraries/entropy/qa.py

1271 lines
46 KiB
Python

# -*- coding: utf-8 -*-
"""
@author: Fabio Erculiani <lxnay@sabayon.org>
@contact: lxnay@sabayon.org
@copyright: Fabio Erculiani
@license: GPL-2
B{Entropy Framework QA module}.
This module contains various Quality Assurance routines used by Entropy.
B{QAInterface} is the main class for QA routines used by Entropy Server
and Entropy Client such as binary packages health check, dependency
test, broken or missing library test.
B{ErrorReportInterface} is the HTTP POST based class for Entropy Client
exceptions (errors) submission.
"""
import os
import sys
import subprocess
import tempfile
from entropy.output import TextInterface
from entropy.misc import Lifo
from entropy.const import etpConst, etpSys, const_debug_write, const_debug_write
from entropy.output import blue, darkgreen, red, darkred, bold, purple, brown
from entropy.exceptions import PermissionDenied, SystemDatabaseError
from entropy.i18n import _
from entropy.core import EntropyPluginStore
from entropy.core.settings.base import SystemSettings
from entropy.db.skel import EntropyRepositoryPlugin
import entropy.tools
class QAEntropyRepositoryPlugin(EntropyRepositoryPlugin):

    """
    EntropyRepositoryPlugin implementation attached by Entropy QA to the
    EntropyRepository instances it creates.
    """

    def __init__(self, qa_interface, metadata = None):
        """
        Entropy QA-side repository EntropyRepository Plugin class.
        This class will be instantiated and automatically added to
        EntropyRepository instances generated by Entropy QA.

        @param qa_interface: Entropy QA interface instance
        @type qa_interface: entropy.qa.QAInterface class
        @param metadata: any dict form metadata map (key => value). When
            given, the very same object is stored (and updated), it is not
            copied.
        @type metadata: dict
        """
        EntropyRepositoryPlugin.__init__(self)
        self._qa = qa_interface
        self._metadata = {} if metadata is None else metadata

        # Always flag the repository as a client-side one. It should not
        # make any difference when unset, but we want it to be enforced
        # no matter what the caller passed in.
        self._metadata['client_repo'] = True

    def get_metadata(self):
        """
        Return the metadata map bound to this plugin.

        @return: plugin metadata
        @rtype: dict
        """
        return self._metadata

    def get_id(self):
        """
        Return the identifier string of this plugin.

        @return: plugin identifier
        @rtype: string
        """
        return "__qa__"

    def add_plugin_hook(self, entropy_repository_instance):
        """
        Hook called with the repository instance this plugin is being added
        to. If an 'output_interface' object is available in the metadata,
        its output() and ask_question() methods replace the ones of the
        repository instance.

        @param entropy_repository_instance: repository receiving the plugin
        @type entropy_repository_instance: EntropyRepository instance
        @return: always 0
        @rtype: int
        """
        debug_msg = \
            "QAEntropyRepositoryPlugin: calling add_plugin_hook => %s" % (
                self,)
        const_debug_write(__name__, debug_msg)

        out_intf = self._metadata.get('output_interface')
        if out_intf is not None:
            entropy_repository_instance.output = out_intf.output
            entropy_repository_instance.ask_question = out_intf.ask_question

        return 0
class QAInterfacePlugin:

    """
    Base class for QAInterface test plugins.
    Inherit from this class and implement both get_tests() and get_id(),
    the default implementations just raise NotImplementedError.
    """

    def get_tests(self):
        """
        Return the callable objects that QAInterface has to run as tests.
        Every callable must honour this signature:

            bool function(package_path)

        @return: list of callable objects
        @rtype: list
        @raise NotImplementedError: when not reimplemented by subclasses
        """
        raise NotImplementedError()

    def get_id(self):
        """
        Return the string identifying this plugin.

        @return: identifier string
        @rtype: string
        @raise NotImplementedError: when not reimplemented by subclasses
        """
        raise NotImplementedError()
class QAInterface(TextInterface, EntropyPluginStore):

    """
    Entropy QA interface. This class contains all the Entropy
    QA routines used by Entropy Server and Entropy Client.
    An instance of QAInterface can be easily retrieved from
    entropy.client.interfaces.Client or entropy.server.interfaces.Server
    through an exposed QA() method.
    This is anyway a stand-alone class.

    @todo: remove non-QA methods
    """

    def __init__(self):
        """
        QAInterface constructor.
        """
        EntropyPluginStore.__init__(self)
        self._settings = SystemSettings()

    def add_plugin(self, plugin):
        """
        Add a QAInterface plugin to the testing list. The plugin is
        registered under the identifier returned by its get_id() method.

        @param plugin: QAInterfacePlugin based instance
        @type plugin: QAInterfacePlugin based instance
        @raise AttributeError: if plugin is not a QAInterfacePlugin instance
        """
        if not isinstance(plugin, QAInterfacePlugin):
            raise AttributeError("Specify a QAInterfacePlugin based class")
        return EntropyPluginStore.add_plugin(self, plugin.get_id(), plugin)

    def test_reverse_dependencies_linking(self, idpackages, dbconn,
        repo = None):
        """
        Scan for broken shared objects linking for the given idpackages on
        the given entropy.db.EntropyRepository based instance.
        Note: this only works for packages actually installed on the running
        system.
        It is used by Entropy Server during packages injection into database
        to warn about potentially broken packages.

        @param idpackages: list of valid idpackages (int) on the given dbconn
            argument passed
        @type idpackages: list
        @param dbconn: entropy.db.EntropyRepository instance containing the
            given idpackages list
        @type dbconn: entropy.db.EntropyRepository
        @keyword repo: repository identifier from which dbconn and idpackages
            arguments belong. Note: at the moment it's only used for output
            purposes. When None, the default repository is used.
        @type repo: string
        @return: True if any breakage is found, otherwise False
        @rtype: bool
        """
        if repo is None:
            repo = self._settings['repositories']['default_repository']

        scan_msg = blue(_("Now searching for broken reverse dependencies"))
        self.output(
            "[repo:%s] %s..." % (
                darkgreen(repo),
                scan_msg,
            ),
            importance = 1,
            level = "info",
            header = red(" @@ ")
        )

        broken = False
        count = 0
        maxcount = len(idpackages)
        for idpackage in idpackages:

            count += 1
            atom = dbconn.retrieveAtom(idpackage)
            scan_msg = "%s, %s:" % (
                blue(_("scanning for broken reverse dependencies")),
                darkgreen(atom),
            )
            self.output(
                "[repo:%s] %s" % (
                    darkgreen(repo),
                    scan_msg,
                ),
                importance = 1,
                level = "info",
                header = blue(" @@ "),
                back = True,
                count = (count, maxcount,)
            )

            mydepends = dbconn.retrieveReverseDependencies(idpackage)
            if not mydepends:
                continue

            for mydepend in mydepends:

                myatom = dbconn.retrieveAtom(mydepend)
                self.output(
                    "[repo:%s] %s => %s" % (
                        darkgreen(repo),
                        darkgreen(atom),
                        darkred(myatom),
                    ),
                    importance = 0,
                    level = "info",
                    header = blue(" @@ "),
                    back = True,
                    count = (count, maxcount,)
                )

                # check every file owned by the reverse dependency
                mycontent = dbconn.retrieveContent(mydepend)
                mybreakages = self._content_test(mycontent)
                if not mybreakages:
                    continue

                broken = True
                self.output(
                    "[repo:%s] %s %s => %s" % (
                        darkgreen(repo),
                        darkgreen(atom),
                        darkred(myatom),
                        bold(_("broken libraries detected")),
                    ),
                    importance = 1,
                    level = "warning",
                    header = purple(" @@ "),
                    count = (count, maxcount,)
                )
                for mylib in mybreakages:
                    self.output(
                        "%s %s:" % (
                            darkgreen(mylib),
                            red(_("needs")),
                        ),
                        importance = 1,
                        level = "warning",
                        header = brown(" ## ")
                    )
                    for needed in mybreakages[mylib]:
                        self.output(
                            "%s" % (
                                red(needed),
                            ),
                            importance = 1,
                            level = "warning",
                            header = purple(" # ")
                        )

        return broken

    def test_missing_dependencies(self, idpackages, dbconn, ask = True,
        self_check = False, repo = None, black_list = None,
        black_list_adder = None):
        """
        Scan missing dependencies for the given idpackages on the given
        entropy.db.EntropyRepository "dbconn" instance. In addition, this method
        will allow the user through OutputInterface to interactively add (if ask
        == True) missing dependencies or blacklist them.

        @param idpackages: list of valid idpackages (int) on the given dbconn
            argument passed
        @type idpackages: list
        @param dbconn: entropy.db.EntropyRepository instance containing the
            given idpackages list
        @type dbconn: entropy.db.EntropyRepository
        @keyword ask: request user interaction when finding missing
            dependencies. When False, every missing dependency found is
            added without asking.
        @type ask: bool
        @keyword self_check: also introspect inside the complaining package
            (to avoid reporting false positives when circular dependencies
            occur)
        @type self_check: bool
        @keyword repo: repository identifier of the given
            entropy.db.EntropyRepository dbconn instance.
            It is used to correctly place blacklisted items.
        @type repo: string
        @keyword black_list: list of dependencies already blacklisted.
            Anything which is not a set is ignored.
        @type black_list: set
        @keyword black_list_adder: callable function that accepts two arguments:
            (1) list (set) of new dependencies to blacklist for the
            given (2) repository identifier (passed as "repo" keyword).
        @type black_list_adder: callable
        @return: tainting status, if any dependency has been added
        @rtype: bool
        """
        if repo is None:
            repo = self._settings['repositories']['default_repository']

        if not isinstance(black_list, set):
            black_list = set()

        taint = False
        scan_msg = blue(_("Now searching for missing RDEPENDs"))
        self.output(
            "[repo:%s] %s..." % (
                darkgreen(repo),
                scan_msg,
            ),
            importance = 1,
            level = "info",
            header = red(" @@ ")
        )
        scan_msg = blue(_("scanning for missing RDEPENDs"))
        count = 0
        maxcount = len(idpackages)
        for idpackage in idpackages:

            count += 1
            atom = dbconn.retrieveAtom(idpackage)
            if not atom:
                continue

            self.output(
                "[repo:%s] %s: %s" % (
                    darkgreen(repo),
                    scan_msg,
                    darkgreen(atom),
                ),
                importance = 1,
                level = "info",
                header = blue(" @@ "),
                back = True,
                count = (count, maxcount,)
            )

            missing_extended, missing = self._get_missing_rdepends(dbconn,
                idpackage, self_check = self_check)

            # drop blacklisted items from both the plain and extended data
            old_missing = missing.copy()
            missing -= black_list
            for item in list(missing_extended.keys()):
                missing_extended[item] -= black_list
                if not missing_extended[item]:
                    del missing_extended[item]

            if missing != old_missing:
                # print big warning to make dev aware at least
                old_missing -= missing
                if old_missing:
                    self.output(
                        "[repo:%s] %s: %s %s:" % (
                            darkgreen(repo),
                            darkred("package"),
                            darkgreen(atom),
                            darkred(_("blacklisted dependencies !!!")),
                        ),
                        importance = 1,
                        level = "warning",
                        header = bold(" @@ "),
                        count = (count, maxcount,)
                    )
                    for dep in sorted(old_missing):
                        self.output(
                            "%s" % (bold(dep),),
                            importance = 0,
                            level = "info",
                            header = blue(" # ")
                        )

            if (not missing) or (not missing_extended):
                continue

            self.output(
                "[repo:%s] %s: %s %s:" % (
                    darkgreen(repo),
                    blue("package"),
                    darkgreen(atom),
                    blue(_("is missing the following dependencies")),
                ),
                importance = 1,
                level = "info",
                header = red(" @@ "),
                count = (count, maxcount,)
            )
            for missing_data in missing_extended:
                self.output(
                    "%s:" % (brown(repr(missing_data)),),
                    importance = 0,
                    level = "info",
                    header = purple(" ## ")
                )
                for dependency in missing_extended[missing_data]:
                    self.output(
                        "%s" % (darkred(dependency),),
                        importance = 0,
                        level = "info",
                        header = blue(" # ")
                    )

            if ask:
                rc_ask = self.ask_question(_("Do you want to add them?"))
                if rc_ask == _("No"):
                    continue
                rc_ask = self.ask_question(_("Selectively?"))
                if rc_ask == _("Yes"):
                    newmissing = set()
                    new_blacklist = set()
                    for dependency in missing:
                        self.output(
                            "[repo:%s|%s] %s" % (
                                darkgreen(repo),
                                brown(atom),
                                blue(dependency),
                            ),
                            importance = 0,
                            level = "info",
                            header = blue(" @@ ")
                        )
                        rc_ask = self.ask_question(_("Want to add?"))
                        if rc_ask == _("Yes"):
                            newmissing.add(dependency)
                        ### XXX: disabled, devs are not able to use it properly
                        ### needs usability fixes
                        #else:
                        #    rc_ask = self.ask_question(
                        #        _("Want to blacklist?"))
                        #    if rc_ask == _("Yes"):
                        #        new_blacklist.add(dependency)
                    # NOTE: new_blacklist stays empty as long as the
                    # blacklisting question above is disabled.
                    if new_blacklist and (black_list_adder is not None):
                        black_list_adder(new_blacklist, repo = repo)
                    missing = newmissing

            if missing:
                taint = True
                dbconn.insertDependencies(idpackage, missing)
                dbconn.commitChanges()
                self.output(
                    "[repo:%s] %s: %s" % (
                        darkgreen(repo),
                        darkgreen(atom),
                        blue(_("missing dependencies added")),
                    ),
                    importance = 1,
                    level = "info",
                    header = red(" @@ "),
                    count = (count, maxcount,)
                )

        return taint

    def test_shared_objects(self, dbconn, broken_symbols = False,
        task_bombing_func = None, self_dir_check = True,
        dump_results_to_file = False):
        """
        Scan system looking for broken shared object ELF library dependencies.

        @param dbconn: entropy.db.EntropyRepository instance which contains
            information on packages installed on the system (for example:
            entropy.client.interfaces.Client.installed_repository() ).
        @type dbconn: entropy.db.EntropyRepository instance
        @keyword broken_symbols: enable or disable broken symbols extra check.
            Symbols which are going to be checked have to be listed into:
            /etc/entropy/brokensyms.conf (regexp supported).
        @type broken_symbols: bool
        @keyword task_bombing_func: callable that will be called on every
            scan iteration to allow external routines to cleanly stop the
            execution of this function.
        @type task_bombing_func: callable
        @keyword self_dir_check: when a library cannot be resolved, also
            look for an ELF file with that name inside the directory of the
            executable requiring it, and do not report it if found.
        @type self_dir_check: bool
        @keyword dump_results_to_file: dump test results to files (printed)
        @type dump_results_to_file: bool
        @return: tuple of length 3, composed by (1) a dict of matched packages
            (broken ELF object path => set of package matches),
            (2) a list (set) of broken ELF objects and (3) the execution status
            (int, 0 means success, -1 means that ld.so.conf is missing).
        @rtype: tuple
        """
        self.output(
            blue(_("Libraries test")),
            importance = 2,
            level = "info",
            header = red(" @@ ")
        )

        syms_list_path = None
        files_list_path = None
        if dump_results_to_file:
            tmp_dir = tempfile.mkdtemp()
            syms_list_path = os.path.join(tmp_dir, "libtest_syms.txt")
            files_list_path = os.path.join(tmp_dir, "libtest_files.txt")

            dmp_data = [
                (_("Broken symbols packages list"), syms_list_path,),
                (_("Broken executables list"), files_list_path,),
            ]
            mytxt = "%s:" % (purple(_("Dumping results into these files")),)
            self.output(
                mytxt,
                importance = 1,
                level = "info",
                header = blue(" @@ ")
            )
            for txt, path in dmp_data:
                mytxt = "%s: %s" % (blue(txt), path,)
                self.output(
                    mytxt,
                    importance = 0,
                    level = "info",
                    header = darkgreen(" ## ")
                )

        sys_root = etpConst['systemroot']
        myroot = sys_root + os.path.sep
        if not sys_root:
            myroot = os.path.sep

        # run ldconfig first. An argument vector is used instead of a shell
        # string: "&>" is a bashism (a POSIX sh would run the command in
        # background) and the root path must not go through shell parsing.
        try:
            with open(os.devnull, "w") as null_f:
                subprocess.call(["ldconfig", "-r", myroot],
                    stdout = null_f, stderr = null_f)
        except (OSError, IOError):
            # best effort: the exit status was never checked, a missing
            # ldconfig must not stop the test
            pass

        # open /etc/ld.so.conf
        ld_conf = sys_root + "/etc/ld.so.conf"
        if not os.path.isfile(ld_conf):
            self.output(
                blue(_("Cannot find "))+red(ld_conf),
                importance = 1,
                level = "error",
                header = red(" @@ ")
            )
            return {}, set(), -1

        reverse_symlink_map = self._settings['system_rev_symlinks']
        broken_syms_list = self._settings['broken_syms']
        broken_libs_mask = self._settings['broken_libs_mask']

        import re

        # items prefixed by "r:" are regular expressions, the others are
        # plain strings (matched literally)
        broken_syms_list_regexp = []
        for broken_sym in broken_syms_list:
            if broken_sym.startswith("r:"):
                broken_sym = broken_sym[2:]
                if not broken_sym:
                    continue
                reg_sym = re.compile(broken_sym)
            else:
                reg_sym = re.compile(re.escape(broken_sym))
            broken_syms_list_regexp.append(reg_sym)

        broken_libs_mask_regexp = []
        broken_libs_paths_mask_regexp = []
        for broken_lib in broken_libs_mask:
            if broken_lib.startswith("r:"):
                broken_lib = broken_lib[2:]
                if not broken_lib:
                    continue
                reg_lib = re.compile(broken_lib)
            else:
                reg_lib = re.compile(re.escape(broken_lib))

            if os.path.sep in broken_lib:
                # it's a path, not a lib name
                broken_libs_paths_mask_regexp.append(reg_lib)
            else:
                broken_libs_mask_regexp.append(reg_lib)

        ldpaths = set(entropy.tools.collect_linker_paths())
        ldpaths.update(entropy.tools.collect_paths())
        # some packages install ELF objects here too
        ldpaths.add("/usr/share")
        # always force /usr/libexec too
        ldpaths.add("/usr/libexec")

        # remove duplicated dirs (due to symlinks) to speed up scanning
        for real_dir in list(reverse_symlink_map.keys()):
            syms = reverse_symlink_map[real_dir]
            for sym in syms:
                if sym in ldpaths:
                    ldpaths.discard(real_dir)
                    self.output(
                        "%s: %s, %s: %s" % (
                            brown(_("discarding directory")),
                            purple(real_dir),
                            brown(_("because it's symlinked on")),
                            purple(sym),
                        ),
                        importance = 0,
                        level = "info",
                        header = darkgreen(" @@ ")
                    )
                    break

        can_bomb = hasattr(task_bombing_func, '__call__')

        # collect every readable ELF file found inside the library paths,
        # stored without the system root prefix
        executables = set()
        total = len(ldpaths)
        count = 0
        sys_root_len = len(sys_root)

        for ldpath in ldpaths:

            if can_bomb:
                task_bombing_func()

            count += 1
            self.output(
                blue("Tree: ")+red(sys_root + ldpath),
                importance = 0,
                level = "info",
                count = (count, total),
                back = True,
                percent = True,
                header = " "
            )
            try:
                ldpath = ldpath.encode('utf-8')
            except (UnicodeEncodeError,):
                ldpath = ldpath.encode(sys.getfilesystemencoding())

            for currentdir, subdirs, files in os.walk(sys_root + ldpath):
                for item in files:
                    filepath = os.path.join(currentdir, item)
                    if not os.access(filepath, os.R_OK):
                        continue
                    if not os.path.isfile(filepath):
                        continue
                    if not entropy.tools.is_elf_file(filepath):
                        continue
                    executables.add(filepath[sys_root_len:])

        self.output(
            blue(_("Collecting broken executables")),
            importance = 2,
            level = "info",
            header = red(" @@ ")
        )
        t = red(_("Attention")) + ": " + \
            blue(_("don't worry about libraries that are shown here but not later."))
        self.output(
            t,
            importance = 1,
            level = "info",
            header = red(" @@ ")
        )

        plain_brokenexecs = set()
        total = len(executables)
        count = 0
        scan_txt = blue("%s ..." % (_("Scanning libraries"),))

        syms_list_f = None
        files_list_f = None
        try:
            if syms_list_path:
                syms_list_f = open(syms_list_path, "w")
            if files_list_path:
                files_list_f = open(files_list_path, "w")

            for executable in executables:

                # task bombing hook
                if can_bomb:
                    task_bombing_func()

                count += 1
                if (count % 10 == 0) or (count == total) or (count == 1):
                    self.output(
                        scan_txt,
                        importance = 0,
                        level = "info",
                        count = (count, total),
                        back = True,
                        percent = True,
                        header = " "
                    )

                # filter broken paths
                # there are paths known to be broken and must be
                # excluded to avoid noisy false positives
                if any(reg_path.match(executable) for reg_path in \
                    broken_libs_paths_mask_regexp):
                    continue

                real_exec_path = sys_root + executable

                # libraries required by the ELF that cannot be resolved
                myelfs = entropy.tools.read_elf_dynamic_libraries(
                    real_exec_path)
                mylibs = set()
                for mylib in myelfs:
                    lib_path = entropy.tools.resolve_dynamic_library(mylib,
                        executable)
                    if not lib_path:
                        mylibs.add(mylib)

                # filter broken libraries
                if mylibs:
                    mylib_filter = set()
                    my_real_exec_dir = os.path.dirname(real_exec_path)
                    for mylib in mylibs:
                        if any(reg_lib.match(mylib) for reg_lib in \
                            broken_libs_mask_regexp):
                            # masked library, filter out
                            mylib_filter.add(mylib)
                        elif self_dir_check:
                            # check inside the same directory of the
                            # failing ELF, looking for another ELF object
                            mylib_guess = os.path.join(my_real_exec_dir,
                                mylib)
                            if os.access(mylib_guess, os.R_OK) and \
                                os.path.isfile(mylib_guess):
                                if entropy.tools.is_elf_file(mylib_guess):
                                    # we have found the missing library,
                                    # which wasn't in LDPATH
                                    mylib_filter.add(mylib)
                    mylibs -= mylib_filter

                # broken symbols are only looked up when no library is
                # missing
                broken_sym_found = set()
                if broken_symbols and not mylibs:
                    read_broken_syms = \
                        entropy.tools.read_elf_broken_symbols(real_exec_path)
                    for read_broken_sym in read_broken_syms:
                        if any(reg_sym.match(read_broken_sym) for reg_sym \
                            in broken_syms_list_regexp):
                            broken_sym_found.add(read_broken_sym)

                if not (mylibs or broken_sym_found):
                    continue

                if mylibs:
                    if files_list_f:
                        files_list_f.write(executable + "\n")
                    alllibs = blue(' :: ').join(sorted(mylibs))
                    self.output(
                        red(real_exec_path)+" [ "+alllibs+" ]",
                        importance = 1,
                        level = "info",
                        percent = True,
                        count = (count, total),
                        header = " "
                    )
                elif broken_sym_found:
                    allsyms = darkred(' :: ').join([brown(x) for x in \
                        broken_sym_found])
                    if len(allsyms) > 50:
                        allsyms = brown(_('various broken symbols'))

                    if syms_list_f and broken_sym_found:
                        syms_list_f.write("%s => %s\n" % (real_exec_path,
                            sorted(broken_sym_found),))

                    self.output(
                        red(real_exec_path)+" { "+allsyms+" }",
                        importance = 1,
                        level = "info",
                        percent = True,
                        count = (count, total),
                        header = " "
                    )

                plain_brokenexecs.add(executable)

        finally:
            # close open files, even if the scan has been interrupted
            for dump_f in (syms_list_f, files_list_f):
                if dump_f is not None:
                    dump_f.close()

        del executables

        pkgs_matched = {}
        if not etpSys['serverside']:

            # we are client side
            # this is hackish and must be fixed sooner or later
            # but for now, it works
            # Client class is singleton and is surely already
            # loaded when we get here
            from entropy.client.interfaces import Client
            client = Client()

            self.output(
                blue(_("Matching broken libraries/executables")),
                importance = 1,
                level = "info",
                header = red(" @@ ")
            )
            matched = set()
            for brokenlib in plain_brokenexecs:
                idpackages = dbconn.searchBelongs(brokenlib)
                for idpackage in idpackages:

                    key, slot = dbconn.retrieveKeySlot(idpackage)
                    mymatch = client.atom_match(key, match_slot = slot)
                    if mymatch[0] == -1:
                        # no package available, nothing to suggest
                        matched.add(brokenlib)
                        continue

                    cmpstat = client.get_package_action(mymatch)
                    if cmpstat == 0:
                        continue
                    if brokenlib not in pkgs_matched:
                        pkgs_matched[brokenlib] = set()

                    pkgs_matched[brokenlib].add(mymatch)
                    matched.add(brokenlib)

            plain_brokenexecs -= matched

        return pkgs_matched, plain_brokenexecs, 0

    def _content_test(self, mycontent):
        """
        Test whether the given list of files contain files
        with broken shared object links.

        @param mycontent: list of file paths
        @type mycontent: list or set
        @return: dict containing a map between file path
            and list (set) of broken libraries (just the library name,
            the same that is contained inside ELF metadata)
        @rtype: dict
        """
        # ELF file path => libraries it requires
        mylibs = {}
        for myfile in mycontent:
            myfile = myfile.encode('raw_unicode_escape')
            if not os.access(myfile, os.R_OK):
                continue
            if not os.path.isfile(myfile):
                continue
            if not entropy.tools.is_elf_file(myfile):
                continue
            mylibs[myfile] = entropy.tools.read_elf_dynamic_libraries(
                myfile)

        # file names shipped by the content itself, computed once instead
        # of walking the whole content for every required library
        content_names = set(os.path.basename(x) for x in mycontent)

        broken_libs = {}
        for mylib in mylibs:
            for myneeded in mylibs[mylib]:
                # is this inside myself ?
                if myneeded in content_names:
                    continue
                found = entropy.tools.resolve_dynamic_library(myneeded,
                    mylib)
                if found:
                    continue
                if mylib not in broken_libs:
                    broken_libs[mylib] = set()
                broken_libs[mylib].add(myneeded)

        return broken_libs

    def _get_missing_rdepends(self, dbconn, idpackage, self_check = False):
        """
        Service method able to determine whether dependencies are missing
        on the given idpackage (belonging to the given
        entropy.db.EntropyRepository "dbconn" argument) using shared objects
        linking information between packages.

        @todo: swap the first two arguments?
        @param dbconn: entropy.db.EntropyRepository instance from which idpackage
            argument belongs
        @type dbconn: entropy.db.EntropyRepository instance
        @param idpackage: entropy.db.EntropyRepository package identifier
        @type idpackage: int
        @keyword self_check: also check inside the given package
            (idpackage) itself
        @type self_check: bool
        @return: tuple of length 2, composed by a dictionary with the
            following structure:
            {(needed, elfclass): set([list of missing key+slot strings])}
            and a "plain" list (set) of missing dependencies
            set([list of missing key+slot strings])
        @rtype: tuple
        """
        rdepends = {}
        rdepends_plain = set()
        neededs = dbconn.retrieveNeeded(idpackage, extended = True)
        ldpaths = set(entropy.tools.collect_linker_paths())
        deps_content = set()
        dependencies = self.get_deep_dependency_list(dbconn, idpackage,
            atoms = True)
        scope_cache = set()

        def update_depscontent(content):
            # files placed into a linker path whose name is a known
            # "needed" library
            return set([x for x in content if os.path.dirname(x) in \
                ldpaths and \
                (dbconn.isNeededAvailable(os.path.basename(x)) > 0)])

        # collect libraries and key/slot pairs already provided by the
        # current dependencies
        for dependency in dependencies:
            match = dbconn.atomMatch(dependency)
            if match[0] != -1:
                dep_content = dbconn.retrieveContent(match[0])
                deps_content |= update_depscontent(dep_content)
                key, slot = dbconn.retrieveKeySlot(match[0])
                scope_cache.add((key, slot))

        # ... and by the package itself
        key, slot = dbconn.retrieveKeySlot(idpackage)
        mycontent = dbconn.retrieveContent(idpackage)
        deps_content |= update_depscontent(mycontent)
        scope_cache.add((key, slot))

        own_names = set()
        if self_check:
            own_names = set(os.path.basename(x) for x in mycontent)

        idpackages_cache = set()
        idpackage_map = {}
        idpackage_map_reverse = {}
        for needed, elfclass in neededs:

            data_solved = dbconn.resolveNeeded(needed, elfclass = elfclass,
                extended = True)
            data_size = len(data_solved)
            data_solved = set([x for x in data_solved if x[0] \
                not in idpackages_cache])
            # skip when nothing provides it or when one of the providers
            # has been already pulled in by another needed library
            if not data_solved or (data_size != len(data_solved)):
                continue

            if self_check and (needed in own_names):
                continue

            found = False
            for data in data_solved:
                if data[1] in deps_content:
                    found = True
                    break
            if found:
                continue

            for data in data_solved:
                r_idpackage = data[0]
                key, slot = dbconn.retrieveKeySlot(r_idpackage)
                if (key, slot) in scope_cache:
                    continue
                if dbconn.isSystemPackage(r_idpackage):
                    continue

                if (needed, elfclass) not in rdepends:
                    rdepends[(needed, elfclass)] = set()
                if (needed, elfclass) not in idpackage_map:
                    idpackage_map[(needed, elfclass)] = set()

                keyslot = "%s%s%s" % (key,
                    etpConst['entropyslotprefix'], slot,)
                obj = idpackage_map_reverse.setdefault(keyslot, set())
                obj.add((needed, elfclass,))
                rdepends[(needed, elfclass)].add(keyslot)
                idpackage_map[(needed, elfclass)].add(r_idpackage)
                rdepends_plain.add(keyslot)
                idpackages_cache.add(r_idpackage)

        # now reduce dependencies: drop the missing ones that are
        # themselves dependencies of other missing ones
        r_deplist = set()
        for key in idpackage_map:
            r_idpackages = idpackage_map.get(key)
            for r_idpackage in r_idpackages:
                r_deplist |= dbconn.retrieveDependencies(r_idpackage,
                    exclude_deptypes = \
                        [etpConst['dependency_type_ids']['bdepend_id']])

        r_keyslots = set()
        for r_dep in r_deplist:
            m_idpackage, m_rc = dbconn.atomMatch(r_dep)
            if m_rc != 0:
                continue
            keyslot = dbconn.retrieveKeySlotAggregated(m_idpackage)
            if keyslot in rdepends_plain:
                r_keyslots.add(keyslot)

        rdepends_plain -= r_keyslots
        for r_keyslot in r_keyslots:
            # look up the key/slot being removed (r_keyslot), not the value
            # left in "keyslot" by the loop above
            keys = [x for x in idpackage_map_reverse.get(r_keyslot, set()) \
                if x in rdepends]
            for key in keys:
                rdepends[key].discard(r_keyslot)
                if not rdepends[key]:
                    del rdepends[key]

        return rdepends, rdepends_plain

    def get_deep_dependency_list(self, dbconn, idpackage, atoms = False):
        """
        Service method which returns a complete, expanded list of dependencies
        for the given idpackage on the given entropy.db.EntropyRepository
        "dbconn" instance.
        NOTE: this method will only return dependencies that are NOT build
        dependencies.

        @param dbconn: entropy.db.EntropyRepository instance which contains
            the given idpackage item.
        @type dbconn: entropy.db.EntropyRepository instance
        @param idpackage: Entropy database package key
        @type idpackage: int
        @keyword atoms: !! return type modifier !! , make method returning
            a list of atom strings instead of list of db match tuples.
        @type atoms: bool
        @return: list of dependencies in form of package identifiers
            set([123,321,...idpackage,]) or plain dependency list (if
            atoms == True -- set([atom_string1, atom_string2, atom_string3])
        @rtype: set
        """
        excluded_dep_types = [etpConst['dependency_type_ids']['bdepend_id']]

        mybuffer = Lifo()
        matchcache = set()
        depcache = set()
        mydeps = dbconn.retrieveDependencies(idpackage,
            exclude_deptypes = excluded_dep_types)
        for mydep in mydeps:
            mybuffer.push(mydep)

        try:
            mydep = mybuffer.pop()
        except ValueError:
            mydep = None # stack empty

        while mydep:

            if mydep not in depcache:
                my_idpackage, my_rc = dbconn.atomMatch(mydep)
                if atoms:
                    matchcache.add(mydep)
                else:
                    matchcache.add(my_idpackage)

                if my_idpackage != -1:
                    # expand the dependencies of the matched package too
                    owndeps = dbconn.retrieveDependencies(my_idpackage,
                        exclude_deptypes = excluded_dep_types)
                    for owndep in owndeps:
                        mybuffer.push(owndep)

                depcache.add(mydep)

            try:
                mydep = mybuffer.pop()
            except ValueError:
                break # stack empty

        # always discard -1 in set
        matchcache.discard(-1)
        return matchcache

    def __analyze_package_edb(self, pkg_path):
        """
        Check if the physical Entropy package file contains
        a valid Entropy embedded database.

        @param pkg_path: path to physical entropy package file
        @type pkg_path: string
        @return: package validity
        @rtype: bool
        """
        from entropy.db import EntropyRepository
        from entropy.db.exceptions import Error

        fd, tmp_path = tempfile.mkstemp()
        dbc = None
        try:
            dump_rc = entropy.tools.dump_entropy_metadata(pkg_path, tmp_path)
            if not dump_rc:
                return False # error!

            try:
                dbc = EntropyRepository(
                    readOnly = False,
                    dbFile = tmp_path,
                    dbname = 'qa_testing',
                    xcache = False,
                    indexing = False,
                    skipChecks = False
                )
                etp_repo_meta = {
                    'output_interface': self,
                }
                repo_plug = QAEntropyRepositoryPlugin(self,
                    metadata = etp_repo_meta)
                dbc.add_plugin(repo_plug)
            except Error:
                return False

            try:
                dbc.validateDatabase()
            except SystemDatabaseError:
                return False

            try:
                for idpackage in dbc.listAllIdpackages():
                    dbc.retrieveContent(idpackage, extended = True,
                        formatted = True, insert_formatted = True)
            except Error:
                return False

            return True

        finally:
            # release everything on every exit path, unexpected
            # exceptions included
            try:
                if dbc is not None:
                    dbc.closeDB()
            finally:
                os.close(fd)
                os.remove(tmp_path)

    def entropy_package_checks(self, package_path):
        """
        Main method for the execution of QA tests on physical Entropy
        package files. Built-in tests are executed first, then the ones
        provided by the registered plugins; execution stops at the first
        failing test.

        @param package_path: path to physical Entropy package file path
        @type package_path: string
        @return: True, if all checks passed
        @rtype: bool
        """
        # built-in ones
        qa_methods = [self.__analyze_package_edb]
        # plugged ones
        for plug_id, plug_inst in self.get_plugins().items():
            qa_methods += plug_inst.get_tests()

        # let's play!
        for method in qa_methods:
            qa_rc = method(package_path)
            if not qa_rc:
                return False

        return True
class ErrorReportInterface:

    """
    Interface used by Entropy Client to remotely send errors via HTTP POST.
    Some anonymous info about the running system are collected and sent over,
    once the user gives the acknowledgement for this operation.
    User should be asked for valid credentials, such as name, surname and email.
    This has two advantages: it discourages low-effort reports and makes
    possible for Entropy developers to contact him/her back.
    Moreover, the same applies for a simple description. To improve the
    ability to debug an issue, it is also asked the user to describe his/her
    action prior to the error.

    Sample code:

        >>> from entropy.qa import ErrorReportInterface
        >>> error = ErrorReportInterface('http://url_for_http_post')
        >>> error.prepare('traceback_text', 'John Foo', 'john@foo.com',
                report_data = 'extra traceback info',
                description = 'I was installing foo!')
        >>> error.submit()
    """

    def __init__(self, post_url):
        """
        ErrorReportInterface constructor.
        Proxy settings are read from SystemSettings and, when available,
        installed through entropy.tools.add_proxy_opener().

        @param post_url: HTTP post url where to submit data
        @type post_url: string
        """
        from entropy.misc import MultipartPostHandler
        if sys.hexversion >= 0x3000000:
            import urllib.request as urlmod
        else:
            import urllib2 as urlmod

        self.url = post_url
        self.opener = urlmod.build_opener(MultipartPostHandler)
        self.generated = False
        self.params = {}

        sys_settings = SystemSettings()
        proxy_settings = sys_settings['system']['proxy']
        mydict = {}
        if proxy_settings['ftp']:
            mydict['ftp'] = proxy_settings['ftp']
        if proxy_settings['http']:
            mydict['http'] = proxy_settings['http']
        if mydict:
            mydict['username'] = proxy_settings['username']
            mydict['password'] = proxy_settings['password']
            entropy.tools.add_proxy_opener(urlmod, mydict)
        else:
            # unset
            urlmod._opener = None

    def prepare(self, tb_text, name, email, report_data = "", description = ""):
        """
        This method must be called prior to submit(). It is used to prepare
        and collect system information before the submission.
        It is intentionally split from submit() to allow easy reimplementation.

        @param tb_text: Python traceback text to send
        @type tb_text: string
        @param name: submitter name
        @type name: string
        @param email: submitter email address
        @type email: string
        @keyword report_data: extra information
        @type report_data: string
        @keyword description: submitter action description
        @type description: string
        @return: None
        @rtype: None
        """
        from entropy.tools import getstatusoutput

        # params is a dict, key(HTTP post item name): value
        self.params['arch'] = etpConst['currentarch']
        self.params['stacktrace'] = tb_text
        self.params['name'] = name
        self.params['email'] = email
        self.params['version'] = etpConst['entropyversion']
        self.params['errordata'] = report_data
        self.params['description'] = description
        self.params['arguments'] = ' '.join(sys.argv)
        self.params['uid'] = etpConst['uid']

        self.params['system_version'] = "N/A"
        if os.access(etpConst['systemreleasefile'], os.R_OK):
            with open(etpConst['systemreleasefile'], "r") as f_rel:
                self.params['system_version'] = f_rel.readline().strip()

        self.params['processes'] = getstatusoutput('ps auxf')[1]
        self.params['lsof'] = getstatusoutput('lsof -p %s' % (os.getpid(),))[1]
        self.params['lspci'] = getstatusoutput('/usr/sbin/lspci')[1]
        self.params['dmesg'] = getstatusoutput('dmesg')[1]
        self.params['locale'] = getstatusoutput('locale -v')[1]

        self.generated = True

    def submit(self):
        """
        Submit collected data remotely via HTTP POST.

        @raise PermissionDenied: when prepare() hasn't been called.
        @return: True if the remote service answered "1", False otherwise
        @rtype: bool
        """
        if not self.generated:
            mytxt = _("Not prepared yet")
            raise PermissionDenied("PermissionDenied: %s" % (mytxt,))

        result = self.opener.open(self.url, self.params).read()
        # on Python 3 the response body is made of bytes, which never
        # compare equal to a str: normalize before checking the answer
        if isinstance(result, bytes):
            result = result.decode("utf-8", "replace")
        return result.strip() == "1"