[entropy.security] add GPG support

This commit is contained in:
Fabio Erculiani
2010-01-04 00:25:10 +01:00
parent 91626d9039
commit d25f05feb4
+247 -11
View File
@@ -16,13 +16,16 @@ import os
import shutil
import subprocess
import datetime
import tempfile
from entropy.exceptions import EntropyException
from entropy.misc import LogFile
from entropy.const import etpConst, etpUi, const_setup_perms, \
const_debug_write
from entropy.i18n import _
from entropy.output import blue, bold, red, darkgreen, darkred
from entropy.output import blue, bold, red, darkgreen, darkred, purple, brown
from entropy.cache import EntropyCacher
import entropy.tools
class System:
@@ -98,6 +101,13 @@ class System:
self.advisories_changed = None
self.adv_metadata = None
self.affected_atoms = set()
self.__gpg_keystore_dir = os.path.join(etpConst['confdir'],
"security-advisories-keys")
self._gpg_feature = True
env_gpg = os.getenv('ETP_DISBLE_GPG')
if env_gpg is not None:
self._gpg_feature = False
from xml.dom import minidom
self.minidom = minidom
@@ -125,9 +135,16 @@ class System:
self.security_url = security_url
self.unpacked_package = os.path.join(self.unpackdir, "glsa_package")
self.security_url_checksum = security_url + md5_ext
self.security_url_gpg_pubkey = os.path.join(
os.path.dirname(security_url), etpConst['etpdatabasegpgfile'])
self.security_url_gpg_sign = security_url + etpConst['etpgpgextension']
self.download_package = os.path.join(self.unpackdir, security_file)
self.download_package_checksum = self.download_package + md5_ext
self.download_package_gpg_pubkey = os.path.join(self.unpackdir,
"security-advisories#" + etpConst['etpdatabasegpgfile'])
self.download_package_gpg_sign = self.download_package + \
etpConst['etpgpgextension']
self.old_download_package_checksum = os.path.join(
System._CACHE_DIR, os.path.basename(security_url)) + md5_ext
@@ -188,6 +205,26 @@ class System:
return self.__generic_download(self.security_url_checksum,
self.download_package_checksum, show_speed = False)
def __download_glsa_package_gpg_sign(self):
"""
Download GLSA compressed package checksum (md5) from a trusted source.
"""
# remove old
if os.path.isfile(self.download_package_gpg_sign):
os.remove(self.download_package_gpg_sign)
return self.__generic_download(self.security_url_gpg_sign,
self.download_package_gpg_sign, show_speed = False)
def __download_glsa_package_gpg_pubkey(self):
"""
Download GLSA compressed package checksum (md5) from a trusted source.
"""
# remove old
if os.path.isfile(self.download_package_gpg_pubkey):
os.remove(self.download_package_gpg_pubkey)
return self.__generic_download(self.security_url_gpg_pubkey,
self.download_package_gpg_pubkey, show_speed = False)
def __generic_download(self, url, save_to, show_speed = True):
"""
Generic, secure, URL download method.
@@ -212,6 +249,192 @@ class System:
self.Entropy.setup_default_file_perms(save_to)
return True
def __load_gpg(self):
try:
repo_sec = self.Entropy.RepositorySecurity(
keystore_dir = self.__gpg_keystore_dir)
except RepositorySecurity.GPGError:
return None # GPG not available
return repo_sec
def __install_gpg_key(self, repo_sec):
pk_expired = False
try:
pk_avail = repo_sec.is_pubkey_available(self.security_url)
except repo_sec.KeyExpired:
pk_avail = False
pk_expired = True
def do_warn_user(fingerprint):
mytxt = purple(_("Make sure to verify the imported key and set an appropriate trust level"))
self.Entropy.updateProgress(
mytxt + ":",
type = "warning",
header = red(" # ")
)
mytxt = brown("gpg --homedir '%s' --edit-key '%s'" % (
self.__gpg_keystore_dir, fingerprint,)
)
self.Entropy.updateProgress(
"$ " + mytxt,
type = "warning",
header = red(" # ")
)
easy_url = entropy.tools.spliturl(self.security_url).netloc
if pk_avail:
tmp_dir = tempfile.mkdtemp()
repo_tmp_sec = self.Entropy.RepositorySecurity(
keystore_dir = tmp_dir)
# try to install and get fingerprint
try:
downloaded_key_fp = repo_tmp_sec.install_key(
self.security_url, self.download_package_gpg_pubkey)
except repo_sec.GPGError:
downloaded_key_fp = None
fingerprint = \
repo_sec.get_key_metadata(self.security_url)['fingerprint']
shutil.rmtree(tmp_dir, True)
if downloaded_key_fp != fingerprint and \
(downloaded_key_fp is not None):
mytxt = "%s: %s !!!" % (
purple(_("GPG key changed for")),
bold(easy_url),
)
self.Entropy.updateProgress(
mytxt,
type = "warning",
header = red(" # ")
)
mytxt = "[%s => %s]" % (
darkgreen(fingerprint),
purple(downloaded_key_fp),
)
self.Entropy.updateProgress(
mytxt,
type = "warning",
header = red(" # ")
)
else:
mytxt = "%s: %s" % (
purple(_("GPG key already installed for")),
bold(easy_url),
)
self.Entropy.updateProgress(
mytxt,
type = "info",
header = red(" # ")
)
do_warn_user(fingerprint)
return True
elif pk_expired:
mytxt = "%s: %s" % (
purple(_("GPG key EXPIRED for URL")),
bold(easy_url),
)
self.Entropy.updateProgress(
mytxt,
type = "warning",
header = red(" # ")
)
# actually install
mytxt = "%s: %s" % (
purple(_("Installing GPG key for URL")),
brown(easy_url),
)
self.Entropy.updateProgress(
mytxt,
type = "info",
header = red(" # "),
back = True
)
try:
fingerprint = repo_sec.install_key(self.security_url,
self.download_package_gpg_pubkey)
except repo_sec.GPGError as err:
mytxt = "%s: %s" % (
darkred(_("Error during GPG key installation")),
err,
)
self.Entropy.updateProgress(
mytxt,
type = "error",
header = red(" # ")
)
return False
mytxt = "%s: %s" % (
purple(_("Successfully installed GPG key for URL")),
brown(easy_url),
)
self.Entropy.updateProgress(
mytxt,
type = "info",
header = red(" # ")
)
mytxt = "%s: %s" % (
darkgreen(_("Fingerprint")),
bold(fingerprint),
)
self.Entropy.updateProgress(
mytxt,
type = "info",
header = red(" # ")
)
do_warn_user(fingerprint)
return True
def __verify_gpg(self):
repo_sec = self.__load_gpg()
if repo_sec is None:
return None
installed = self.__install_gpg_key(repo_sec)
if not installed:
return None
# verify GPG now
gpg_good, err_msg = repo_sec.verify_file(self.security_url,
self.download_package, self.download_package_gpg_sign)
if not gpg_good:
mytxt = "%s: %s" % (
purple(_("Error during GPG verification of")),
os.path.basename(self.download_package),
)
self.Entropy.updateProgress(
mytxt,
type = "error",
header = red(" # ") + bold(" !!! ")
)
mytxt = "%s: %s" % (
purple(_("It could mean a potential security risk")),
err_msg,
)
self.Entropy.updateProgress(
mytxt,
type = "error",
header = red(" # ") + bold(" !!! ")
)
return False
mytxt = "%s: %s." % (
bold(_("Security Advisories")),
purple(_("GPG key verification successful")),
)
self.Entropy.updateProgress(
mytxt,
type = "info",
header = red(" # ")
)
return True
def __get_downloaded_package_checksum(self):
if not os.path.isfile(self.download_package_checksum) or \
@@ -744,13 +967,7 @@ class System:
@return: availability
@rtype: bool
"""
if not os.path.lexists(etpConst['securitydir']):
return False
if not os.path.isdir(etpConst['securitydir']):
return False
else:
return True
return False
return os.path.isdir(etpConst['securitydir'])
def sync(self, do_cache = True, force = False):
"""
@@ -930,7 +1147,7 @@ class System:
elif status == 0:
mytxt = "%s: %s." % (
bold(_("Security Advisories")),
darkgreen(_("verification Successful")),
darkgreen(_("verification successful")),
)
self.Entropy.updateProgress(
mytxt,
@@ -939,8 +1156,27 @@ class System:
header = red(" # ")
)
else:
mytxt = _("Return status not valid")
raise InvalidData("InvalidData: %s." % (mytxt,))
raise System.UpdateError("Unhandled return code: %s" % (status,))
# download GPG key and package signature in a row
# env hook, disable GPG check
if self._gpg_feature:
gpg_sign_sts = self.__download_glsa_package_gpg_sign()
gpg_key_sts = self.__download_glsa_package_gpg_pubkey()
if gpg_sign_sts and gpg_key_sts:
verify_sts = self.__verify_gpg()
if verify_sts is None:
mytxt = "%s: %s." % (
bold(_("Security Advisories")),
purple(_("GPG service not available")),
)
self.Entropy.updateProgress(
mytxt,
type = "info",
header = red(" # ")
)
elif not verify_sts:
return 7
# save downloaded md5
if os.path.isfile(self.download_package_checksum) and \