Files
entropy/client/text_security.py

441 lines
15 KiB
Python

# -*- coding: utf-8 -*-
"""
@author: Fabio Erculiani <lxnay@sabayon.org>
@contact: lxnay@sabayon.org
@copyright: Fabio Erculiani
@license: GPL-2
B{Entropy Package Manager Client}.
"""
import os
from entropy.const import etpConst, etpUi
from entropy.output import red, darkred, blue, brown, darkgreen, darkblue, \
bold, purple, green, teal, print_error, print_warning, print_info, \
print_generic
from entropy.cli import print_table
from entropy.i18n import _
import entropy.tools
def security(options):
    """
    Entry point of the "security" command line section.

    The first element of C{options} selects the sub-command ("update",
    "list", "oscheck", "install" or "info"); the recognised "--" switches
    may appear anywhere in C{options}.

    @param options: command line arguments following "security"
    @type options: list
    @return: -10 when no options are given or the sub-command is unknown,
        1 when the user lacks the required privileges or the Entropy locks
        cannot be acquired, otherwise the value returned by the sub-command
        handler
    @rtype: int
    """
    if not options:
        return -10

    # Switches are plain on/off toggles, their position does not matter.
    only_affected = "--affected" in options
    only_unaffected = "--unaffected" in options
    fetch = "--fetch" in options
    force = "--force" in options
    mtime = "--mtime" in options
    reinstall = "--reinstall" in options
    assimilate = "--assimilate" in options

    cmd = options[0]

    from entropy.client.interfaces import Client
    entropy_client = None
    try:
        entropy_client = Client()

        if cmd == "update":
            security_intf = entropy_client.Security()
            group_msg = darkred(_("You must be either root or in this group:"))
            er_txt = group_msg + " " + etpConst['sysgroup']
            if entropy.tools.is_user_in_entropy_group():
                return security_intf.sync(force=force)
            print_error(er_txt)
            return 1

        if cmd == "list":
            security_intf = entropy_client.Security()
            return list_advisories(
                security_intf,
                only_affected=only_affected,
                only_unaffected=only_unaffected)

        if cmd == "oscheck":
            if not entropy.tools.is_root():
                print_error(darkred(_("You must be an administrator.")))
                return 1
            return oscheck(
                entropy_client,
                mtime_only=mtime,
                reinstall=reinstall,
                assimilate=assimilate)

        if cmd == "install":
            acquired = False
            try:
                acquired = entropy.tools.acquire_entropy_locks(entropy_client)
                if not acquired:
                    print_error(darkgreen(
                        _("Another Entropy is currently running.")))
                    return 1
                security_intf = entropy_client.Security()
                return install_packages(
                    entropy_client, security_intf, fetch=fetch)
            finally:
                # only give back locks that were actually taken
                if acquired:
                    entropy.tools.release_entropy_locks(entropy_client)

        if cmd == "info":
            security_intf = entropy_client.Security()
            return show_advisories_info(security_intf, options[1:])

        # unknown sub-command
        return -10

    finally:
        # the client is shut down on every exit path, early returns included
        if entropy_client is not None:
            entropy_client.shutdown()
def show_advisories_info(security_intf, advisories):
    """
    Print the detailed information of each requested advisory.

    Identifiers that are not found in the advisories metadata produce a
    warning and are skipped; they do not change the return value.

    @param security_intf: object providing C{get_advisories_metadata()}
    @param advisories: advisory identifiers to show
    @type advisories: list
    @return: 1 when no identifier is given, 0 otherwise
    @rtype: int
    """
    if not advisories:
        print_error(brown(" :: ") + darkgreen(
            "%s." % (_("No advisories provided"),)))
        return 1

    known = security_intf.get_advisories_metadata()
    for adv_id in advisories:
        if adv_id in known:
            print_advisory_information(known[adv_id], key=adv_id)
            continue
        print_warning(
            brown(" :: ") + darkred("%s " % (_("Advisory"),)) +
            blue(adv_id) + darkred(" %s." % (_("does not exist"),)))

    return 0
def _append_text_rows(toc, label, text, colorize = None):
    """
    Append one (label, line) row to C{toc} for every line of C{text}.

    The label is shown on the first appended row only; following rows get a
    single-space placeholder so the value column stays aligned.

    @param toc: list of table rows, extended in place
    @param label: text for the first column of the first row
    @param text: possibly multi-line string, split on newlines
    @param colorize: optional callable applied to each stripped line
    @return: the label to use for any further continuation row
    """
    for line in text.split("\n"):
        line = line.strip()
        if colorize is not None:
            line = colorize(line)
        toc.append((label, line))
        label = " "
    return label


def print_advisory_information(advisory_data, key):
    """
    Render a single advisory as a table and print it through print_table().

    NOTE(review): block nesting was reconstructed from a copy of this file
    that had lost its indentation; confirm against the upstream layout.

    @param advisory_data: advisory metadata; the keys read here are url,
        title, description, description_items, background, access, impact,
        impacttype, revised, announced, synopsis, references, bugs,
        affected, workaround and resolution
    @type advisory_data: dict
    @param key: advisory identifier, shown in the header row
    @type key: string
    """
    toc = []

    # header row: advisory identifier and URL
    toc.append(blue(" @@ ") + red("%s " % (_("GLSA Identifier"),)) +
        bold(key) + red(" | ") + blue(advisory_data['url']))

    toc.append((darkgreen(" %s:" % (_("Title"),)),
        darkred(advisory_data['title'])))

    _append_text_rows(toc, darkgreen(" %s:" % (_("Description"),)),
        advisory_data['description'])

    # description items are bulleted and wrapped every eight words; only the
    # first row of each item carries the "(*)" bullet
    words_per_row = 8
    for item in advisory_data['description_items']:
        bullet = " %s " % (darkred("(*)"),)
        words = item.split()
        for start in range(0, len(words), words_per_row):
            row_words = words[start:start + words_per_row]
            toc.append((" ", bullet + ' '.join(row_words)))
            bullet = " "

    if advisory_data['background']:
        _append_text_rows(toc, darkgreen(" %s:" % (_("Background"),)),
            advisory_data['background'], purple)

    if advisory_data['access']:
        toc.append((darkgreen(" %s:" % (_("Exploitable"),)),
            bold(advisory_data['access'])))

    if advisory_data['impact']:
        _append_text_rows(toc, darkgreen(" %s:" % (_("Impact"),)),
            advisory_data['impact'], brown)

    if advisory_data['impacttype']:
        toc.append((darkgreen(" %s:" % (_("Impact type"),)),
            bold(advisory_data['impacttype'])))

    if advisory_data['revised']:
        toc.append((darkgreen(" %s:" % (_("Revised"),)),
            brown(advisory_data['revised'])))

    if advisory_data['announced']:
        toc.append((darkgreen(" %s:" % (_("Announced"),)),
            brown(advisory_data['announced'])))

    _append_text_rows(toc, darkgreen(" %s:" % (_("Synopsis"),)),
        advisory_data['synopsis'])

    if advisory_data['references']:
        toc.append(darkgreen(" %s:" % (_("References"),)))
        for reference in advisory_data['references']:
            toc.append((" ", darkblue(reference)))

    if advisory_data['bugs']:
        toc.append(darkgreen(" %s:" % (_("Upstream bugs"),)))
        for bug in advisory_data['bugs']:
            toc.append((" ", darkblue(bug)))

    if advisory_data['affected']:
        toc.append(darkgreen(" %s:" % (_("Affected"),)))
        # a dedicated loop name keeps the "key" argument intact
        for pkg_key in advisory_data['affected']:
            toc.append((" ", darkred(pkg_key)))
            # only the first entry of each package is displayed
            affected_data = advisory_data['affected'][pkg_key][0]
            vul_vers = affected_data['vul_vers']
            unaff_vers = affected_data['unaff_vers']
            if vul_vers:
                toc.append((" ", brown("%s: " % (
                    _("vulnerable versions"),)) + ", ".join(vul_vers)))
            if unaff_vers:
                toc.append((" ", brown("%s: " % (
                    _("unaffected versions"),)) + ", ".join(unaff_vers)))

    # the text is split only after the emptiness check, so a missing (None)
    # workaround is skipped instead of raising AttributeError
    if advisory_data['workaround']:
        _append_text_rows(toc, darkgreen(" %s:" % (_("Workaround"),)),
            advisory_data['workaround'], darkred)

    if advisory_data['resolution']:
        # the label is printed once, even across several resolution entries
        res_text = darkgreen(" %s:" % (_("Resolution"),))
        for resolution in advisory_data['resolution']:
            res_text = _append_text_rows(toc, res_text, resolution)

    print_table(toc, cell_spacing = 3)
def list_advisories(security_intf, only_affected = False,
    only_unaffected = False):
    """
    Print one summary line per affected package of every selected advisory.

    @param security_intf: object providing C{get_advisories_metadata()},
        C{get_vulnerabilities()}, C{get_fixed_vulnerabilities()} and
        C{is_affected()}
    @keyword only_affected: list only advisories the system is affected by
    @type only_affected: bool
    @keyword only_unaffected: list only advisories the system is not
        affected by
    @type only_unaffected: bool
    @return: always 0
    @rtype: int
    """
    # Requesting both subsets at once means "everything". The two switches
    # must be dropped here, otherwise the per-advisory filters below would
    # reject every single entry and nothing would be listed.
    if only_affected and only_unaffected:
        only_affected = False
        only_unaffected = False

    if only_affected:
        adv_metadata = security_intf.get_vulnerabilities()
    elif only_unaffected:
        adv_metadata = security_intf.get_fixed_vulnerabilities()
    else:
        adv_metadata = security_intf.get_advisories_metadata()

    if not adv_metadata:
        print_info(brown(" :: ") + darkgreen("%s." % (
            _("No advisories available or applicable"),)))
        return 0

    for key in sorted(adv_metadata.keys()):
        affected = security_intf.is_affected(key)
        if only_affected and not affected:
            continue
        if only_unaffected and affected:
            continue

        advisory = adv_metadata[key]
        affected_pkgs = advisory['affected']
        if not affected_pkgs:
            # nothing to print for advisories without package data
            continue

        if affected:
            affection_string = red("A")
        else:
            affection_string = green("N")

        for a_key, k_data in affected_pkgs.items():
            # only the first entry of each package is displayed
            vulnerables = ', '.join(k_data[0]['vul_vers'])
            description = "[GLSA:%s:%s][%s] %s: %s" % (
                darkgreen(key),
                affection_string,
                brown(vulnerables),
                darkred(a_key),
                blue(advisory['title']))
            print_info(description)

    return 0
def oscheck(entropy_client, mtime_only = False, reinstall = False,
    assimilate = False):
    """
    Compare the files on disk against the content safety data recorded in
    the installed packages repository and report the files that differ.

    NOTE(review): block nesting was reconstructed from a copy of this file
    that had lost its indentation. In particular, confirm that assimilated
    data is stored only for packages with altered files, and that the
    "reinstalled successfully" notice is meant to be silenced in quiet mode.

    @param entropy_client: Entropy Client instance
    @keyword mtime_only: compare modification times instead of sha256
        checksums
    @type mtime_only: bool
    @keyword reinstall: reinstall the packages owning altered files
    @type reinstall: bool
    @keyword assimilate: store the values found on disk as the new
        reference for packages owning altered files
    @type assimilate: bool
    @return: 1 if the installed packages repository is not available, 0 if
        no altered file is found, otherwise 10 or, when a reinstall has been
        attempted, the exit status of that install
    @rtype: int
    """
    import text_ui

    inst_repo = entropy_client.installed_repository()
    if inst_repo is None:
        if not etpUi['quiet']:
            print_info(red(" @@ ") + blue("%s." % (
                _("Installed packages repository is not available"),)))
        return 1

    if not etpUi['quiet']:
        print_info(red(" @@ ") + blue("%s..." % (_("Checking system files"),)))

    package_ids = inst_repo.listAllPackageIds()
    total = len(package_ids)
    altered_pkg_ids = []

    for position, pkg_id in enumerate(package_ids):
        count = position + 1
        pkg_atom = inst_repo.retrieveAtom(pkg_id)
        sts_txt = "%s%s/%s%s %s" % (blue("["), darkgreen(str(count)),
            purple(str(total)), blue("]"), brown(pkg_atom))
        if not etpUi['quiet']:
            print_info(blue("@@") + " " + sts_txt, back = True)

        cont_s = inst_repo.retrieveContentSafety(pkg_id)
        if not cont_s:
            # nothing recorded for this package, nothing to verify
            if (not etpUi['quiet']) and etpUi['verbose']:
                atom_txt = " %s: " % (brown(pkg_atom),)
                print_info(red("@@") + atom_txt + _("no checksum information"))
            continue

        altered_paths = []
        missing_paths = []
        for path, safety_data in cont_s.items():
            if not os.path.lexists(path):
                # NOTE: missing files are only collected, they do not mark
                # the package as altered; this might change in future.
                missing_paths.append(path)
                continue

            if mtime_only:
                mtime = os.path.getmtime(path)
                tainted = mtime != safety_data['mtime']
                if tainted:
                    cont_s[path]['mtime'] = mtime
                if assimilate:
                    # refresh the value that has not been verified as well
                    cont_s[path]['sha256'] = entropy.tools.sha256(path)
            else:
                sha256 = entropy.tools.sha256(path)
                tainted = sha256 != safety_data['sha256']
                if tainted:
                    cont_s[path]['sha256'] = sha256
                if assimilate:
                    # refresh the value that has not been verified as well
                    cont_s[path]['mtime'] = os.path.getmtime(path)

            if tainted:
                altered_paths.append(path)

        if altered_paths:
            altered_pkg_ids.append(pkg_id)
            altered_paths.sort()
            if not etpUi['quiet']:
                atom_txt = " %s: " % (teal(pkg_atom),)
                print_info(red("@@") + atom_txt + _("altered files") + ":")
            for path in altered_paths:
                # quiet mode prints bare paths
                if etpUi['quiet']:
                    print_generic(path)
                else:
                    print_info(" %s" % (purple(path),))
            if assimilate:
                if not etpUi['quiet']:
                    print_info(blue("@@") + " " + sts_txt + ", " +
                        teal(_("assimilated new hashes and mtime")))
                inst_repo.setContentSafety(pkg_id, cont_s)

        if missing_paths:
            missing_paths.sort()
            if (not etpUi['quiet']) and etpUi['verbose']:
                for path in missing_paths:
                    print_info(" %s [%s]" % (
                        teal(path), purple(_("unavailable"))))

    if not altered_pkg_ids:
        if not etpUi['quiet']:
            print_info(red(" @@ ") + darkgreen(_("No altered files found")))
        return 0

    # from here on at least one package owns altered files
    rc = 10
    valid_matches = set()
    if reinstall:
        for pkg_id in altered_pkg_ids:
            key, slot = inst_repo.retrieveKeySlot(pkg_id)
            match = entropy_client.atom_match(key, match_slot = slot)
            if match[0] != -1:
                valid_matches.add(match)
        if valid_matches:
            rc, _stat = text_ui.install_packages(entropy_client,
                atomsdata = valid_matches)

    if not etpUi['quiet']:
        print_warning(red(" @@ ") +
            purple(_("Altered files have been found")))
        if reinstall and (rc == 0) and valid_matches:
            print_warning(red(" @@ ") +
                purple(_("Packages have been reinstalled successfully")))

    return rc
def install_packages(entropy_client, security_intf, fetch = False):
    """
    Install (or only download) the updates for the installed packages that
    are affected by security advisories.

    @param entropy_client: Entropy Client instance
    @param security_intf: object providing C{get_affected_packages()}
    @keyword fetch: passed to the installer as C{onlyfetch}
    @type fetch: bool
    @return: 0 when there is nothing to install, 1 when the installed
        packages repository is not available, otherwise the exit status
        returned by C{text_ui.install_packages()}
    @rtype: int
    """
    import text_ui

    print_info(red(" @@ ") + blue("%s..." % (
        _("Calculating security updates"),)))
    affected_atoms = security_intf.get_affected_packages()

    valid_matches = set()
    if affected_atoms:
        # The repository is looked up once, and only when there is something
        # to match. It may be unavailable (oscheck() handles the same
        # condition), so fail cleanly instead of raising on None.
        installed_repo = entropy_client.installed_repository()
        if installed_repo is None:
            print_error(red(" @@ ") + blue("%s." % (
                _("Installed packages repository is not available"),)))
            return 1

        for atom in affected_atoms:
            # match in client database
            installed_match = installed_repo.atomMatch(atom)
            if installed_match[0] == -1:
                continue
            # get key + slot
            key, slot = installed_repo.retrieveKeySlot(installed_match[0])
            # match in repos
            match = entropy_client.atom_match(key, match_slot = slot)
            if match[0] != -1:
                valid_matches.add(match)

    if not valid_matches:
        print_info(red(" @@ ") + blue("%s." % (
            _("All the available updates have been already installed"),)))
        return 0

    rc, _stat = text_ui.install_packages(entropy_client,
        atomsdata = valid_matches, onlyfetch = fetch)
    return rc