diff --git a/client/solo/commands/security.py b/client/solo/commands/security.py new file mode 100644 index 000000000..2d3ef1593 --- /dev/null +++ b/client/solo/commands/security.py @@ -0,0 +1,628 @@ +# -*- coding: utf-8 -*- +""" + + @author: Fabio Erculiani + @contact: lxnay@sabayon.org + @copyright: Fabio Erculiani + @license: GPL-2 + + B{Entropy Command Line Client}. + +""" +import os +import sys +import argparse + +from entropy.i18n import _ +from entropy.output import darkgreen, darkred, brown, blue, red, \ + darkblue, bold, purple, teal + +import entropy.tools + +from solo.commands.descriptor import SoloCommandDescriptor +from solo.commands.install import SoloInstall +from solo.utils import print_table + + +class SoloSecurity(SoloInstall): + """ + Main Solo Security command. + """ + + NAME = "security" + ALIASES = ["sec"] + ALLOW_UNPRIVILEGED = False + + INTRODUCTION = """\ +System security tools. +""" + SEE_ALSO = "" + + def __init__(self, args): + SoloInstall.__init__(self, args) + self._nsargs = None + self._commands = {} + + def man(self): + """ + Overridden from SoloCommand. + """ + return self._man() + + def _get_parser(self): + """ + Overridden from SoloCommand. + """ + _commands = {} + + descriptor = SoloCommandDescriptor.obtain_descriptor( + SoloSecurity.NAME) + parser = argparse.ArgumentParser( + description=descriptor.get_description(), + formatter_class=argparse.RawDescriptionHelpFormatter, + prog="%s %s" % (sys.argv[0], SoloSecurity.NAME)) + + self._setup_verbose_quiet_parser(parser) + + subparsers = parser.add_subparsers( + title="action", description=_("system security tools"), + help=_("available commands")) + + oscheck_parser = subparsers.add_parser( + "oscheck", + help=_("verify installed files using stored checksums")) + self._setup_verbose_quiet_parser(oscheck_parser) + oscheck_parser.add_argument( + "--mtime", action="store_true", default=False, + help=_("consider mtime instead of SHA256 " + "(false positives ahead)")) + oscheck_parser.add_argument( + "--assimilate", action="store_true", default=False, + help=_("update hashes and mtime (useful after " + "editing config files)")) + oscheck_parser.add_argument( + "--reinstall", action="store_true", default=False, + help=_("reinstall faulty packages")) + + mg_group = oscheck_parser.add_mutually_exclusive_group() + mg_group.add_argument( + "--ask", action="store_true", default=False, + help=_("ask before making any changes")) + mg_group.add_argument( + "--pretend", action="store_true", default=False, + help=_("show what would be done")) + oscheck_parser.add_argument( + "--fetch", action="store_true", default=False, + help=_("just download packages")) + + oscheck_parser.set_defaults(func=self._oscheck) + _commands["oscheck"] = { + "--mtime": {}, + "--assimilate": {}, + "--reinstall": {}, + "--pretend": {}, + "--ask": {}, + "--fetch": {}, + } + + update_parser = subparsers.add_parser( + "update", + help=_("download the latest Security Advisories")) + self._setup_verbose_quiet_parser(update_parser) + update_parser.add_argument( + "--force", action="store_true", default=False, + help=_("force download")) + + update_parser.set_defaults(func=self._update) + _commands["update"] = { + "--force": {}, + } + + list_parser = subparsers.add_parser( + "list", + help=_("list all the available Security Advisories")) + self._setup_verbose_quiet_parser(list_parser) + + mg_group = list_parser.add_mutually_exclusive_group() + mg_group.add_argument( + "--affected", action="store_true", default=False, + help=_("list only affected")) + mg_group.add_argument( + "--unaffected", action="store_true", default=False, + help=_("list only unaffected")) + list_parser.set_defaults(func=self._list) + _commands["list"] = { + "--affected": {}, + "--unaffected": {}, + } + + info_parser = subparsers.add_parser( + "info", + help=_("show information about provided " + "advisories identifiers")) + self._setup_verbose_quiet_parser(info_parser) + info_parser.add_argument( + "ids", nargs='+', + metavar="", help=_("advistory indentifier")) + info_parser.set_defaults(func=self._info) + _commands["info"] = {} + + + install_parser = subparsers.add_parser( + "install", + help=_("automatically install all the " + "available security updates")) + self._setup_verbose_quiet_parser(install_parser) + + mg_group = install_parser.add_mutually_exclusive_group() + mg_group.add_argument( + "--ask", action="store_true", default=False, + help=_("ask before making any changes")) + mg_group.add_argument( + "--pretend", action="store_true", default=False, + help=_("show what would be done")) + install_parser.add_argument( + "--fetch", action="store_true", default=False, + help=_("just download packages")) + + install_parser.set_defaults(func=self._install) + _commands["install"] = { + "--ask": {}, + "--fetch": {}, + "--pretend": {}, + } + + self._commands = _commands + return parser + + def parse(self): + """ + Parse command + """ + parser = self._get_parser() + try: + nsargs = parser.parse_args(self._args) + except IOError as err: + sys.stderr.write("%s\n" % (err,)) + return parser.print_help, [] + + self._nsargs = nsargs + return self._call_locked, [nsargs.func] + + def bashcomp(self, last_arg): + """ + Overridden from SoloCommand. + """ + self._get_parser() # this will generate self._commands + outcome = ["--quiet", "--verbose"] + return self._hierarchical_bashcomp( + last_arg, outcome, self._commands) + + def _print_advisory_information(self, entropy_client, + advisory_data, key): + """ + Print Security Advisory. + """ + toc = [] + + # print advisory code + toc.append( + blue(" @@ ") + \ + red("%s " % (_("Advisory Identifier"),)) + bold(key) + \ + red(" | ")+blue(advisory_data['url'])) + + # title + toc.append((darkgreen(" %s:" % (_("Title"),)), + darkred(advisory_data['title']))) + + # description + description = advisory_data['description'].split("\n") + desc_text = darkgreen(" %s:" % (_("Description"),) ) + for x in description: + toc.append((desc_text, x.strip())) + desc_text = " " + + for item in advisory_data['description_items']: + desc_text = " %s " % (darkred("(*)"),) + count = 8 + mystr = [] + for word in item.split(): + count -= 1 + mystr.append(word) + if count < 1: + toc.append((" ", desc_text+' '.join(mystr))) + desc_text = " " + mystr = [] + count = 8 + if count < 8: + toc.append((" ", desc_text+' '.join(mystr))) + + # background + if advisory_data['background']: + background = advisory_data['background'].split("\n") + bg_text = darkgreen(" %s:" % (_("Background"),)) + for x in background: + toc.append((bg_text, purple(x.strip()))) + bg_text = " " + + # access + if advisory_data['access']: + toc.append((darkgreen(" %s:" % (_("Exploitable"),)), + bold(advisory_data['access']))) + + # impact + if advisory_data['impact']: + impact = advisory_data['impact'].split("\n") + imp_text = darkgreen(" %s:" % (_("Impact"),)) + for x in impact: + toc.append((imp_text, brown(x.strip()))) + imp_text = " " + + # impact type + if advisory_data['impacttype']: + toc.append((darkgreen(" %s:" % (_("Impact type"),)), + bold(advisory_data['impacttype']))) + + # revised + if advisory_data['revised']: + toc.append((darkgreen(" %s:" % (_("Revised"),)), + brown(advisory_data['revised']))) + + # announced + if advisory_data['announced']: + toc.append((darkgreen(" %s:" % (_("Announced"),)), + brown(advisory_data['announced']))) + + # synopsis + synopsis = advisory_data['synopsis'].split("\n") + syn_text = darkgreen(" %s:" % (_("Synopsis"),)) + for x in synopsis: + toc.append((syn_text, x.strip())) + syn_text = " " + + # references + if advisory_data['references']: + toc.append(darkgreen(" %s:" % (_("References"),))) + for reference in advisory_data['references']: + toc.append((" ", darkblue(reference))) + + # gentoo bugs + if advisory_data['bugs']: + toc.append(darkgreen(" %s:" % (_("Upstream bugs"),))) + for bug in advisory_data['bugs']: + toc.append((" ", darkblue(bug))) + + # affected + if advisory_data['affected']: + toc.append(darkgreen(" %s:" % (_("Affected"),))) + for key in advisory_data['affected']: + toc.append((" ", darkred(key))) + affected_data = advisory_data['affected'][key][0] + vul_vers = affected_data['vul_vers'] + unaff_vers = affected_data['unaff_vers'] + if vul_vers: + toc.append((" ", brown("%s: " % ( + _("vulnerable versions"),))+", ".join(vul_vers))) + if unaff_vers: + toc.append((" ", brown("%s: " % ( + _("unaffected versions"),))+", ".join(unaff_vers))) + + # workaround + workaround = advisory_data['workaround'].split("\n") + if advisory_data['workaround']: + work_text = darkgreen(" %s:" % (_("Workaround"),)) + for x in workaround: + toc.append((work_text, darkred(x.strip()))) + work_text = " " + + # resolution + if advisory_data['resolution']: + res_text = darkgreen(" %s:" % (_("Resolution"),)) + resolutions = advisory_data['resolution'] + for resolution in resolutions: + for x in resolution.split("\n"): + toc.append((res_text, x.strip())) + res_text = " " + + print_table(entropy_client, toc, cell_spacing=3) + + def _oscheck(self, entropy_client): + """ + Solo Security Oscheck command. + """ + mtime = self._nsargs.mtime + assimilate = self._nsargs.assimilate + reinstall = self._nsargs.reinstall + verbose = self._nsargs.verbose + quiet = self._nsargs.quiet + ask = self._nsargs.ask + pretend = self._nsargs.pretend + fetch = self._nsargs.fetch + + inst_repo = entropy_client.installed_repository() + + if not quiet: + entropy_client.output( + "%s..." % ( + blue(_("Checking system files")),), + header=darkred(" @@ ")) + + pkg_ids = inst_repo.listAllPackageIds() + total = len(pkg_ids) + faulty_pkg_ids = [] + + for count, pkg_id in enumerate(pkg_ids, 1): + + pkg_atom = inst_repo.retrieveAtom(pkg_id) + sts_txt = "%s%s/%s%s %s" % ( + blue("["), + darkgreen(str(count)), + purple(str(total)), + blue("]"), + brown(pkg_atom)) + + if not quiet: + entropy_client.output( + sts_txt, + header=blue(" @@ "), back=True) + + cont_s = inst_repo.retrieveContentSafety(pkg_id) + if not cont_s: + if not quiet and verbose: + entropy_client.output( + "%s: %s" % ( + brown(pkg_atom), + _("no checksum information"),), + header=darkred(" @@ ")) + # if pkg provides content! + continue + + paths_tainted = [] + paths_unavailable = [] + for path, safety_data in cont_s.items(): + tainted = False + mtime = None + sha256 = None + + if not os.path.lexists(path): + # file does not exist + # NOTE: current behaviour is to ignore + # file not available + # this might change in future. + paths_unavailable.append(path) + continue + + elif not mtime: + # verify sha256 + sha256 = entropy.tools.sha256(path) + tainted = sha256 != safety_data['sha256'] + if tainted: + cont_s[path]['sha256'] = sha256 + else: + # verify mtime + mtime = os.path.getmtime(path) + tainted = mtime != safety_data['mtime'] + if tainted: + cont_s[path]['mtime'] = mtime + + if assimilate: + if mtime is None: + cont_s[path]['mtime'] = os.path.getmtime(path) + elif sha256 is None: + cont_s[path]['sha256'] = entropy.tools.sha256(path) + + if tainted: + paths_tainted.append(path) + + if paths_tainted: + faulty_pkg_ids.append(pkg_id) + paths_tainted.sort() + if not quiet: + entropy_client.output( + "%s: %s" % ( + teal(pkg_atom), + _("found altered files"),), + header=darkred(" @@ ")) + + for path in paths_tainted: + if quiet: + entropy_client.output( + path, + level="generic") + else: + txt = " %s" % (purple(path),) + entropy_client.output( + purple(path), + header=" ") + + if assimilate: + if not quiet: + entropy_client.output( + "%s, %s" % ( + sts_txt, + teal(_("assimilated new " + "hashes and mtime"))), + header=blue(" @@ ")) + inst_repo.setContentSafety(pkg_id, cont_s) + + if paths_unavailable: + paths_unavailable.sort() + if not quiet and verbose: + for path in paths_unavailable: + txt = " %s [%s]" % ( + teal(path), + purple(_("unavailable")) + ) + entropy_client.output(txt) + + if not faulty_pkg_ids: + if not quiet: + entropy_client.output( + darkgreen(_("No altered files found")), + header=darkred(" @@ ")) + return 0 + + rc = 0 + if faulty_pkg_ids: + rc = 10 + valid_matches = set() + + if reinstall and faulty_pkg_ids: + for pkg_id in faulty_pkg_ids: + key_slot = inst_repo.retrieveKeySlotAggregated(pkg_id) + package_id, repository_id = entropy_client.atom_match( + key_slot) + if package_id != -1: + valid_matches.add((package_id, repository_id)) + + if valid_matches: + rc= self._install_action( + entropy_client, True, True, + pretend, ask, False, quiet, False, + False, False, fetch, False, + False, 1, [], package_matches=list(valid_matches)) + + if not quiet: + entropy_client.output( + purple(_("Altered files have been found")), + header=darkred(" @@ ")) + if reinstall and (rc == 0) and valid_matches: + entropy_client.output( + purple(_("Packages have been " + "reinstalled successfully")), + header=darkred(" @@ ")) + + return rc + + def _update(self, entropy_client): + """ + Solo Security Update command. + """ + sec = entropy_client.Security() + return sec.sync(force=self._nsargs.force) + + def _list(self, entropy_client): + """ + Solo Security List command. + """ + affected = self._nsargs.affected + unaffected = self._nsargs.unaffected + sec = entropy_client.Security() + + adv_metadata = None + if not (affected or unaffected): + adv_metadata = sec.get_advisories_metadata() + elif affected: + adv_metadata = sec.get_vulnerabilities() + else: + adv_metadata = sec.get_fixed_vulnerabilities() + + if not adv_metadata: + entropy_client.output( + "%s." % ( + darkgreen(_("No advisories available or applicable")), + ), + header=brown(" :: ")) + return 0 + + for key in sorted(adv_metadata.keys()): + is_affected = sec.is_affected(key) + if affected and not is_affected: + continue + if unaffected and is_affected: + continue + if is_affected: + affection_string = darkred("A") + else: + affection_string = darkgreen("N") + affected_data = adv_metadata[key]['affected'] + if affected_data: + for a_key in list(affected_data.keys()): + k_data = adv_metadata[key]['affected'][a_key] + vulnerables = ', '.join(k_data[0]['vul_vers']) + description = "[Id:%s:%s][%s] %s: %s" % ( + darkgreen(key), + affection_string, + brown(vulnerables), + darkred(a_key), + blue(adv_metadata[key]['title'])) + entropy_client.output(description) + return 0 + + def _info(self, entropy_client): + """ + Solo Security Info command. + """ + advisories = self._nsargs.ids + + sec = entropy_client.Security() + adv_metadata = sec.get_advisories_metadata() + + exit_st = 1 + for advisory in advisories: + if advisory not in adv_metadata: + entropy_client.output( + "%s: %s." % ( + darkred(_("Advisory does not exist")), + blue(advisory),), + header=brown(" :: ")) + continue + self._print_advisory_information( + entropy_client, + adv_metadata[advisory], + key=advisory) + exit_st = 0 + + return exit_st + + def _install(self, entropy_client): + """ + Solo Security Install command. + """ + quiet = self._nsargs.quiet + pretend = self._nsargs.pretend + ask = self._nsargs.ask + fetch = self._nsargs.fetch + + sec = entropy_client.Security() + inst_repo = entropy_client.installed_repository() + + entropy_client.output( + "%s..." % ( + blue(_("Calculating security updates")),), + header=darkred(" @@ ")) + + affected_atoms = sec.get_affected_packages() + + valid_matches = set() + for atom in affected_atoms: + inst_package_id, pkg_rc = inst_repo.atomMatch(atom) + if pkg_rc != 0: + continue + + key_slot = inst_repo.retrieveKeySlotAggregated(inst_package_id) + package_id, repository_id = entropy_client.atom_match(key_slot) + if package_id != -1: + valid_matches.add((package_id, repository_id)) + + if not valid_matches: + entropy_client.output( + "%s." % ( + blue(_("All the available updates " + "have been already installed")),), + header=darkred(" @@ ")) + return 0 + + return self._install_action( + entropy_client, True, True, + pretend, ask, False, quiet, False, + False, False, fetch, False, + False, 1, [], package_matches=list(valid_matches)) + + +SoloCommandDescriptor.register( + SoloCommandDescriptor( + SoloSecurity, + SoloSecurity.NAME, + _("system security tools")) + ) diff --git a/docs/TODO b/docs/TODO index 68efbe7c9..017f83351 100644 --- a/docs/TODO +++ b/docs/TODO @@ -2,25 +2,6 @@ Backlog (raw) 1.0: - security - oscheck - --mtime - --assimilate - --reinstall - --quiet - --verbose - update - --force - list - --affected - --unaffected - info - install - --ask - --fetch - --pretend - --quiet - drop "append_xpak()" in xpaktools equo unused -> add to "equo query unused"