unlocked_sync() works just like sync() but without acquiring the Entropy Resources Lock. This way it's possible to externally control the mutual exclusion.
2102 lines
72 KiB
Python
2102 lines
72 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
|
|
@author: Fabio Erculiani <lxnay@sabayon.org>
|
|
@contact: lxnay@sabayon.org
|
|
@copyright: Fabio Erculiani
|
|
@license: GPL-2
|
|
|
|
B{Entropy Framework Security module}.
|
|
|
|
This module contains Entropy GLSA-based Security interfaces.
|
|
|
|
|
|
"""
|
|
import os
|
|
import sys
|
|
import errno
|
|
import shutil
|
|
import subprocess
|
|
import datetime
|
|
import tempfile
|
|
import time
|
|
import codecs
|
|
|
|
from entropy.exceptions import EntropyException
|
|
from entropy.const import etpConst, etpUi, const_setup_perms, \
|
|
const_debug_write, const_setup_file, const_convert_to_unicode, \
|
|
const_convert_to_rawstring
|
|
from entropy.i18n import _
|
|
from entropy.output import blue, bold, red, darkgreen, darkred, purple, brown
|
|
from entropy.cache import EntropyCacher
|
|
from entropy.core.settings.base import SystemSettings
|
|
|
|
import entropy.tools
|
|
|
|
class System:
|
|
|
|
"""
|
|
~~ GIVES YOU WINGS ~~
|
|
|
|
@note: thanks to Gentoo "gentoolkit" package, License below:
|
|
@note: This program is licensed under the GPL, version 2
|
|
|
|
@note: WARNING: this code is not intended to replace any Security mechanism,
|
|
@note: but it's just a way to handle Gentoo GLSAs.
|
|
@note: There are possible security holes and probably bugs in this code.
|
|
|
|
This class implements the Entropy packages Security framework.
|
|
It can be used to retrieve security advisories, get information
|
|
about unapplied advisories, etc.
|
|
|
|
For specifications about security advisories metadata format, please see
|
|
docs/metadata/glsa.dtd. Your Source Package Manager must implement
|
|
advisories in this format, with file names ordered by your own criteria,
|
|
which will be matched 1:1 here.
|
|
You should provide a compressed .tar.gz or .tar.bz2 package containing such
|
|
xml files in a way that can be downloaded and installed by this class.
|
|
Your distribution should expose a publicly available URL as well as a valid
|
|
"securityurl" parameter inside repositories.conf.
|
|
|
|
To sum up, you as distributor should:
|
|
1. implement your security advisories xml files by looking at
|
|
docs/metadata/glsa.dtd specifications.
|
|
2. setup a cronjob that compresses your unpacked list of advisories
|
|
to a file inside a publicly available URL as well as a valid .md5
|
|
file.
|
|
3. provide a default repositories.conf file with securityurl| pointing
|
|
to that file (HTTP, FTP and FILE protocols supported).
|
|
4. Optionally, in the same dir you could make available a GPG public
|
|
key and a GPG signature of your security advisories .tar.* file.
|
|
The former MUST be named signature.asc while the latter must match
|
|
securityurl value plus ".asc"
|
|
|
|
"""
|
|
_CACHE_ID = 'advisories_cache_'
|
|
_CACHE_DIR = os.path.join(etpConst['entropyworkdir'], "security_cache")
|
|
SECURITY_DIR = etpConst['securitydir']
|
|
SECURITY_URL = None
|
|
|
|
class UpdateError(EntropyException):
|
|
"""Raised when security advisories couldn't be updated correctly"""
|
|
|
|
def __init__(self, entropy_client_instance):
|
|
|
|
"""
|
|
Instance constructor.
|
|
|
|
@param entropy_client_instance: a valid entropy.client.interfaces.Client
|
|
instance
|
|
@type entropy_client_instance: entropy.client.interfaces.Client instance
|
|
"""
|
|
|
|
# disabled for now
|
|
from entropy.client.interfaces import Client
|
|
if not isinstance(entropy_client_instance, Client):
|
|
raise AttributeError(
|
|
"entropy.client.interfaces.Client instance expected")
|
|
|
|
self._entropy = entropy_client_instance
|
|
|
|
self.__cacher = EntropyCacher()
|
|
self._settings = SystemSettings()
|
|
self.lastfetch = None
|
|
self.previous_checksum = "0"
|
|
self.advisories_changed = None
|
|
self.adv_metadata = None
|
|
self.affected_atoms = set()
|
|
self.__gpg_keystore_dir = os.path.join(etpConst['confdir'],
|
|
"security-advisories-keys")
|
|
|
|
self._gpg_feature = True
|
|
env_gpg = os.getenv('ETP_DISBLE_GPG')
|
|
if env_gpg is not None:
|
|
self._gpg_feature = False
|
|
|
|
from xml.dom import minidom
|
|
self.minidom = minidom
|
|
|
|
self.op_mappings = {
|
|
"le": "<=",
|
|
"lt": "<",
|
|
"eq": "=",
|
|
"gt": ">",
|
|
"ge": ">=",
|
|
"rge": ">=", # >=~
|
|
"rle": "<=", # <=~
|
|
"rgt": ">", # >~
|
|
"rlt": "<" # <~
|
|
}
|
|
|
|
if System.SECURITY_URL is None:
|
|
security_url = \
|
|
self._settings['repositories']['security_advisories_url']
|
|
else:
|
|
security_url = System.SECURITY_URL
|
|
security_file = os.path.basename(security_url)
|
|
md5_ext = etpConst['packagesmd5fileext']
|
|
sec_dir = System.SECURITY_DIR
|
|
|
|
self.unpackdir = os.path.join(etpConst['entropyunpackdir'],
|
|
"security-%s" % (entropy.tools.get_random_number(),))
|
|
self.security_url = security_url
|
|
self.unpacked_package = os.path.join(self.unpackdir, "glsa_package")
|
|
self.security_url_checksum = security_url + md5_ext
|
|
self.security_url_gpg_pubkey = os.path.join(
|
|
os.path.dirname(security_url), etpConst['etpdatabasegpgfile'])
|
|
self.security_url_gpg_sign = security_url + etpConst['etpgpgextension']
|
|
|
|
self.download_package = os.path.join(self.unpackdir, security_file)
|
|
self.download_package_checksum = self.download_package + md5_ext
|
|
self.download_package_gpg_pubkey = os.path.join(self.unpackdir,
|
|
"security-advisories#" + etpConst['etpdatabasegpgfile'])
|
|
self.download_package_gpg_sign = self.download_package + \
|
|
etpConst['etpgpgextension']
|
|
self.old_download_package_checksum = os.path.join(
|
|
System._CACHE_DIR, os.path.basename(security_url)) + md5_ext
|
|
|
|
self.security_package = os.path.join(sec_dir,
|
|
os.path.basename(security_url))
|
|
self.security_package_checksum = self.security_package + md5_ext
|
|
|
|
try:
|
|
if os.path.isfile(sec_dir) or os.path.islink(sec_dir):
|
|
os.remove(sec_dir)
|
|
if not os.path.isdir(sec_dir):
|
|
os.makedirs(sec_dir, 0o775)
|
|
except OSError:
|
|
pass
|
|
const_setup_perms(sec_dir, etpConst['entropygid'])
|
|
|
|
try:
|
|
if not os.path.isdir(System._CACHE_DIR):
|
|
os.makedirs(System._CACHE_DIR, 0o775)
|
|
except OSError:
|
|
pass
|
|
const_setup_perms(System._CACHE_DIR, etpConst['entropygid'])
|
|
|
|
if os.access(self.old_download_package_checksum, os.R_OK) and \
|
|
os.path.isfile(self.old_download_package_checksum):
|
|
|
|
with open(self.old_download_package_checksum, "r") as f_down:
|
|
try:
|
|
self.previous_checksum = \
|
|
f_down.readline().strip().split()[0]
|
|
except (IndexError, OSError, IOError,):
|
|
pass
|
|
|
|
|
|
def __prepare_unpack(self):
|
|
"""
|
|
Prepare GLSAs unpack directory and its permissions.
|
|
"""
|
|
if os.path.isfile(self.unpackdir) or os.path.islink(self.unpackdir):
|
|
os.remove(self.unpackdir)
|
|
|
|
if os.path.isdir(self.unpackdir):
|
|
shutil.rmtree(self.unpackdir, True)
|
|
try:
|
|
os.rmdir(self.unpackdir)
|
|
except OSError:
|
|
pass
|
|
|
|
os.makedirs(self.unpackdir, 0o775)
|
|
const_setup_perms(self.unpackdir, etpConst['entropygid'])
|
|
|
|
def __download_glsa_package(self):
|
|
"""
|
|
Download GLSA compressed package from a trusted source.
|
|
"""
|
|
return self.__generic_download(self.security_url, self.download_package)
|
|
|
|
def __download_glsa_package_cksum(self):
|
|
"""
|
|
Download GLSA compressed package checksum (md5) from a trusted source.
|
|
"""
|
|
return self.__generic_download(self.security_url_checksum,
|
|
self.download_package_checksum, show_speed = False)
|
|
|
|
def __download_glsa_package_gpg_sign(self):
|
|
"""
|
|
Download GLSA compressed package checksum (md5) from a trusted source.
|
|
"""
|
|
# remove old
|
|
if os.path.isfile(self.download_package_gpg_sign):
|
|
os.remove(self.download_package_gpg_sign)
|
|
return self.__generic_download(self.security_url_gpg_sign,
|
|
self.download_package_gpg_sign, show_speed = False)
|
|
|
|
def __download_glsa_package_gpg_pubkey(self):
|
|
"""
|
|
Download GLSA compressed package checksum (md5) from a trusted source.
|
|
"""
|
|
# remove old
|
|
if os.path.isfile(self.download_package_gpg_pubkey):
|
|
os.remove(self.download_package_gpg_pubkey)
|
|
return self.__generic_download(self.security_url_gpg_pubkey,
|
|
self.download_package_gpg_pubkey, show_speed = False)
|
|
|
|
def __generic_download(self, url, save_to, show_speed = True):
|
|
"""
|
|
Generic, secure, URL download method.
|
|
|
|
@param url: download URL
|
|
@type url: string
|
|
@param save_to: path to save file
|
|
@type save_to: string
|
|
@keyword show_speed: if True, download speed will be shown
|
|
@type show_speed: bool
|
|
@return: download status (True if download succeeded)
|
|
@rtype: bool
|
|
"""
|
|
fetcher = self._entropy._url_fetcher(url, save_to, resume = False,
|
|
show_speed = show_speed)
|
|
rc_fetch = fetcher.download()
|
|
del fetcher
|
|
if rc_fetch in ("-1", "-2", "-3", "-4"):
|
|
return False
|
|
# setup permissions
|
|
const_setup_file(save_to, etpConst['entropygid'], 0o664)
|
|
return True
|
|
|
|
def __load_gpg(self):
|
|
try:
|
|
repo_sec = self._entropy.RepositorySecurity(
|
|
keystore_dir = self.__gpg_keystore_dir)
|
|
except Repository.GPGError:
|
|
return None # GPG not available
|
|
return repo_sec
|
|
|
|
def __install_gpg_key(self, repo_sec):
|
|
pk_expired = False
|
|
try:
|
|
pk_avail = repo_sec.is_pubkey_available(self.security_url)
|
|
except repo_sec.KeyExpired:
|
|
pk_avail = False
|
|
pk_expired = True
|
|
|
|
def do_warn_user(fingerprint):
|
|
mytxt = purple(_("Make sure to verify the imported key and set an appropriate trust level"))
|
|
self._entropy.output(
|
|
mytxt + ":",
|
|
level = "warning",
|
|
header = red(" # ")
|
|
)
|
|
mytxt = brown("gpg --homedir '%s' --edit-key '%s'" % (
|
|
self.__gpg_keystore_dir, fingerprint,)
|
|
)
|
|
self._entropy.output(
|
|
"$ " + mytxt,
|
|
level = "warning",
|
|
header = red(" # ")
|
|
)
|
|
|
|
easy_url = "N/A"
|
|
splitres = entropy.tools.spliturl(self.security_url)
|
|
if hasattr(splitres, 'netloc'):
|
|
easy_url = splitres.netloc
|
|
|
|
if pk_avail:
|
|
|
|
tmp_dir = tempfile.mkdtemp()
|
|
repo_tmp_sec = self._entropy.RepositorySecurity(
|
|
keystore_dir = tmp_dir)
|
|
# try to install and get fingerprint
|
|
try:
|
|
downloaded_key_fp = repo_tmp_sec.install_key(
|
|
self.security_url, self.download_package_gpg_pubkey)
|
|
except repo_sec.GPGError:
|
|
downloaded_key_fp = None
|
|
|
|
fingerprint = repo_sec.get_key_metadata(
|
|
self.security_url)['fingerprint']
|
|
shutil.rmtree(tmp_dir, True)
|
|
|
|
if downloaded_key_fp != fingerprint and \
|
|
(downloaded_key_fp is not None):
|
|
mytxt = "%s: %s !!!" % (
|
|
purple(_("GPG key changed for")),
|
|
bold(easy_url),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "warning",
|
|
header = red(" # ")
|
|
)
|
|
mytxt = "[%s => %s]" % (
|
|
darkgreen(fingerprint),
|
|
purple(downloaded_key_fp),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "warning",
|
|
header = red(" # ")
|
|
)
|
|
else:
|
|
mytxt = "%s: %s" % (
|
|
purple(_("GPG key already installed for")),
|
|
bold(easy_url),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
do_warn_user(fingerprint)
|
|
return True
|
|
|
|
elif pk_expired:
|
|
mytxt = "%s: %s" % (
|
|
purple(_("GPG key EXPIRED for URL")),
|
|
bold(easy_url),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "warning",
|
|
header = red(" # ")
|
|
)
|
|
|
|
# actually install
|
|
mytxt = "%s: %s" % (
|
|
purple(_("Installing GPG key for URL")),
|
|
brown(easy_url),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "info",
|
|
header = red(" # "),
|
|
back = True
|
|
)
|
|
try:
|
|
fingerprint = repo_sec.install_key(self.security_url,
|
|
self.download_package_gpg_pubkey)
|
|
except repo_sec.GPGError as err:
|
|
mytxt = "%s: %s" % (
|
|
darkred(_("Error during GPG key installation")),
|
|
err,
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "error",
|
|
header = red(" # ")
|
|
)
|
|
return False
|
|
|
|
mytxt = "%s: %s" % (
|
|
purple(_("Successfully installed GPG key for URL")),
|
|
brown(easy_url),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
mytxt = "%s: %s" % (
|
|
darkgreen(_("Fingerprint")),
|
|
bold(fingerprint),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
do_warn_user(fingerprint)
|
|
return True
|
|
|
|
def __verify_gpg(self):
|
|
|
|
repo_sec = self.__load_gpg()
|
|
if repo_sec is None:
|
|
return None
|
|
installed = self.__install_gpg_key(repo_sec)
|
|
if not installed:
|
|
return None
|
|
|
|
# verify GPG now
|
|
gpg_good, err_msg = repo_sec.verify_file(self.security_url,
|
|
self.download_package, self.download_package_gpg_sign)
|
|
if not gpg_good:
|
|
mytxt = "%s: %s" % (
|
|
purple(_("Error during GPG verification of")),
|
|
os.path.basename(self.download_package),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "error",
|
|
header = red(" # ") + bold(" !!! ")
|
|
)
|
|
mytxt = "%s: %s" % (
|
|
purple(_("It could mean a potential security risk")),
|
|
err_msg,
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "error",
|
|
header = red(" # ") + bold(" !!! ")
|
|
)
|
|
return False
|
|
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
purple(_("GPG key verification successful")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
|
|
return True
|
|
|
|
def __get_downloaded_package_checksum(self):
|
|
|
|
if not os.path.isfile(self.download_package_checksum) or \
|
|
not os.access(self.download_package_checksum, os.R_OK):
|
|
return None
|
|
|
|
with open(self.download_package_checksum, "r") as f_down:
|
|
try:
|
|
return f_down.readline().strip().split()[0]
|
|
except (OSError, IOError, IndexError,):
|
|
return None
|
|
|
|
def __verify_checksum(self):
|
|
"""
|
|
Verify downloaded GLSA checksum against downloaded GLSA package.
|
|
"""
|
|
# read checksum
|
|
|
|
checksum = self.__get_downloaded_package_checksum()
|
|
if checksum is None:
|
|
return 1
|
|
|
|
self.advisories_changed = True
|
|
if checksum == self.previous_checksum:
|
|
self.advisories_changed = False
|
|
|
|
md5res = entropy.tools.compare_md5(self.download_package, checksum)
|
|
if not md5res:
|
|
return 3
|
|
return 0
|
|
|
|
def __unpack_advisories(self):
|
|
"""
|
|
Unpack downloaded GLSA package containing GLSA advisories.
|
|
"""
|
|
rc_unpack = entropy.tools.uncompress_tarball(
|
|
self.download_package,
|
|
extract_path = self.unpacked_package,
|
|
catch_empty = True
|
|
)
|
|
const_setup_perms(self.unpacked_package, etpConst['entropygid'])
|
|
return rc_unpack
|
|
|
|
def __clear_previous_advisories(self):
|
|
"""
|
|
Remove previously installed GLSA advisories.
|
|
"""
|
|
if os.listdir(System.SECURITY_DIR):
|
|
shutil.rmtree(System.SECURITY_DIR, True)
|
|
if not os.path.isdir(System.SECURITY_DIR):
|
|
os.makedirs(System.SECURITY_DIR, 0o775)
|
|
const_setup_perms(System.SECURITY_DIR,
|
|
etpConst['entropygid'])
|
|
const_setup_perms(self.unpackdir, etpConst['entropygid'])
|
|
|
|
def __put_advisories_in_place(self):
|
|
"""
|
|
Place unpacked advisories in place (into System.SECURITY_DIR).
|
|
"""
|
|
for advfile in os.listdir(self.unpacked_package):
|
|
from_file = os.path.join(self.unpacked_package, advfile)
|
|
to_file = os.path.join(System.SECURITY_DIR, advfile)
|
|
try:
|
|
os.rename(from_file, to_file)
|
|
except OSError as err:
|
|
if err.errno != errno.EXDEV:
|
|
raise
|
|
shutil.move(from_file, to_file)
|
|
|
|
def __cleanup_garbage(self):
|
|
"""
|
|
Remove GLSA unpack directory.
|
|
"""
|
|
shutil.rmtree(self.unpackdir, True)
|
|
|
|
def __validate_cache(self):
|
|
"""
|
|
Validate cache by looking at some checksum data
|
|
"""
|
|
inst_pkgs_cksum = self._entropy.installed_repository().checksum(
|
|
do_order = True, strict = False, strings = True)
|
|
repo_cksum = self._entropy._repositories_hash()
|
|
sys_hash = str(hash(repo_cksum + inst_pkgs_cksum))
|
|
|
|
cached = self.__cacher.pop(sys_hash, cache_dir = System._CACHE_DIR)
|
|
if cached is None:
|
|
self.clear() # kill the cache
|
|
self.__cacher.push(sys_hash, True, cache_dir = System._CACHE_DIR)
|
|
|
|
def clear(self):
|
|
"""
|
|
Clear instance cache (RAM and on-disk).
|
|
"""
|
|
self.adv_metadata = None
|
|
self.__cacher.discard()
|
|
EntropyCacher.clear_cache_item(System._CACHE_ID,
|
|
cache_dir = System._CACHE_DIR)
|
|
if not os.path.isdir(System._CACHE_DIR):
|
|
try:
|
|
os.makedirs(System._CACHE_DIR, 0o775)
|
|
except OSError as err:
|
|
# race condition handling
|
|
if err.errno != errno.EEXIST:
|
|
raise
|
|
const_setup_perms(System._CACHE_DIR, etpConst['entropygid'])
|
|
|
|
def _get_advisories_cache_hash(self):
|
|
dir_checksum = entropy.tools.md5sum_directory(
|
|
System.SECURITY_DIR)
|
|
c_hash = "%s%s" % (
|
|
System._CACHE_ID, hash("%s|%s|%s" % (
|
|
hash(self._settings['repositories']['branch']),
|
|
hash(dir_checksum),
|
|
hash(etpConst['systemroot']),
|
|
),))
|
|
return c_hash
|
|
|
|
def get_advisories_cache(self):
|
|
"""
|
|
Return cached advisories information metadata. It first tries to load
|
|
them from RAM and, in case of failure, it tries to gather the info
|
|
from disk, using EntropyCacher.
|
|
"""
|
|
if self.adv_metadata is not None:
|
|
return self.adv_metadata
|
|
|
|
# validate cache
|
|
self.__validate_cache()
|
|
|
|
c_hash = self._get_advisories_cache_hash()
|
|
adv_metadata = self.__cacher.pop(c_hash,
|
|
cache_dir = System._CACHE_DIR)
|
|
if adv_metadata is not None:
|
|
self.adv_metadata = adv_metadata.copy()
|
|
return self.adv_metadata
|
|
|
|
def set_advisories_cache(self, adv_metadata):
|
|
"""
|
|
Set advisories information metadata cache.
|
|
|
|
@param adv_metadata: advisories metadata to store
|
|
@type adv_metadata: dict
|
|
"""
|
|
self.adv_metadata = None
|
|
c_hash = self._get_advisories_cache_hash()
|
|
# async false to allow 3rd-party applications to not wait
|
|
# before getting cached results. A straight example: sulfur
|
|
# and its security cache generation separate thread.
|
|
self.__cacher.push(c_hash, adv_metadata,
|
|
cache_dir = System._CACHE_DIR, async = False)
|
|
|
|
def _get_advisories_list(self):
|
|
"""
|
|
Return a list of advisory files. Internal method.
|
|
"""
|
|
if not self.check_advisories_availability():
|
|
return []
|
|
xmls = os.listdir(System.SECURITY_DIR)
|
|
xmls = sorted([x for x in xmls if x.endswith(".xml") and \
|
|
x.startswith("glsa-")])
|
|
return xmls
|
|
|
|
def get_advisories_metadata(self, use_cache = True):
|
|
"""
|
|
Get security advisories metadata.
|
|
|
|
@return: advisories metadata
|
|
@rtype: dict
|
|
"""
|
|
if use_cache:
|
|
cached = self.get_advisories_cache()
|
|
if cached is not None:
|
|
return cached
|
|
|
|
adv_metadata = {}
|
|
xmls = self._get_advisories_list()
|
|
maxlen = len(xmls)
|
|
count = 0
|
|
t_up = time.time()
|
|
for xml in xmls:
|
|
|
|
count += 1
|
|
if not etpUi['quiet']:
|
|
cur_t = time.time()
|
|
if ((cur_t - t_up) > 1):
|
|
t_up = cur_t
|
|
self._entropy.output(":: " + \
|
|
str(round((float(count)/maxlen)*100, 1)) + "% ::",
|
|
importance = 0, level = "info", back = True)
|
|
|
|
xml_metadata = None
|
|
exc_string = ""
|
|
exc_err = ""
|
|
try:
|
|
xml_metadata = self.__get_xml_metadata(xml)
|
|
except KeyboardInterrupt:
|
|
return {}
|
|
except Exception as err:
|
|
exc_string = str(Exception)
|
|
exc_err = str(err)
|
|
if xml_metadata == None:
|
|
more_info = ""
|
|
if exc_string:
|
|
mytxt = _("Error")
|
|
more_info = " %s: %s: %s" % (mytxt, exc_string, exc_err,)
|
|
mytxt = "%s: %s: %s! %s" % (
|
|
blue(_("Warning")),
|
|
bold(xml),
|
|
blue(_("advisory broken")),
|
|
more_info,
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 1,
|
|
level = "warning",
|
|
header = red(" !!! ")
|
|
)
|
|
continue
|
|
elif not xml_metadata:
|
|
continue
|
|
adv_metadata.update(xml_metadata)
|
|
|
|
adv_metadata = self.filter_advisories(adv_metadata)
|
|
self.set_advisories_cache(adv_metadata)
|
|
self.adv_metadata = adv_metadata.copy()
|
|
return adv_metadata
|
|
|
|
def filter_advisories(self, adv_metadata):
|
|
"""
|
|
This function filters advisories metadata dict removing non-applicable
|
|
ones.
|
|
|
|
@param adv_metadata: security advisories metadata dict
|
|
@type adv_metadata: dict
|
|
@return: filtered security advisories metadata
|
|
@rtype: dict
|
|
"""
|
|
# do not match package repositories, never consider them in updates!
|
|
# that would be a nonsense, since package repos are temporary.
|
|
enabled_repos = self._entropy._filter_available_repositories()
|
|
match_repos = tuple([x for x in \
|
|
self._settings['repositories']['order'] if x in enabled_repos])
|
|
|
|
keys = list(adv_metadata.keys())
|
|
for key in keys:
|
|
valid = True
|
|
if adv_metadata[key]['affected']:
|
|
affected = adv_metadata[key]['affected']
|
|
affected_keys = list(affected.keys())
|
|
valid = False
|
|
skipping_keys = set()
|
|
for a_key in affected_keys:
|
|
match = self._entropy.atom_match(a_key,
|
|
match_repo = match_repos)
|
|
if match[0] != -1:
|
|
# it's in the repos, it's valid
|
|
valid = True
|
|
else:
|
|
skipping_keys.add(a_key)
|
|
if not valid:
|
|
del adv_metadata[key]
|
|
for a_key in skipping_keys:
|
|
try:
|
|
del adv_metadata[key]['affected'][a_key]
|
|
except KeyError:
|
|
continue
|
|
try:
|
|
if not adv_metadata[key]['affected']:
|
|
del adv_metadata[key]
|
|
except KeyError:
|
|
continue
|
|
|
|
return adv_metadata
|
|
|
|
def is_affected(self, adv_key, adv_data = None):
|
|
"""
|
|
Determine whether the system is affected by vulnerabilities listed
|
|
in the provided security advisory identifier.
|
|
|
|
@param adv_key: security advisories identifier
|
|
@type adv_key: string
|
|
@keyword adv_data: use the provided security advisories instead of
|
|
the stored one.
|
|
@type adv_data: dict
|
|
@return: True, if system is affected by vulnerabilities listed in the
|
|
provided security advisory.
|
|
@rtype: bool
|
|
"""
|
|
if not adv_data:
|
|
adv_data = self.get_advisories_metadata()
|
|
if adv_key not in adv_data:
|
|
return False
|
|
mydata = adv_data[adv_key].copy()
|
|
del adv_data
|
|
|
|
if not mydata['affected']:
|
|
return False
|
|
|
|
for key in mydata['affected']:
|
|
|
|
vul_atoms = mydata['affected'][key][0]['vul_atoms']
|
|
unaff_atoms = mydata['affected'][key][0]['unaff_atoms']
|
|
unaffected_atoms = set()
|
|
if not vul_atoms:
|
|
return False
|
|
for atom in unaff_atoms:
|
|
matches = self._entropy.installed_repository().atomMatch(atom,
|
|
multiMatch = True)
|
|
for idpackage in matches[0]:
|
|
unaffected_atoms.add((idpackage, 0))
|
|
|
|
for atom in vul_atoms:
|
|
match = self._entropy.installed_repository().atomMatch(atom)
|
|
if (match[0] != -1) and (match not in unaffected_atoms):
|
|
self.affected_atoms.add(atom)
|
|
return True
|
|
return False
|
|
|
|
def get_vulnerabilities(self):
|
|
"""
|
|
Return advisories metadata for installed packages containing
|
|
vulnerabilities.
|
|
|
|
@return: advisories metadata for vulnerable packages.
|
|
@rtype: dict
|
|
"""
|
|
return self.__get_affection()
|
|
|
|
def get_fixed_vulnerabilities(self):
|
|
"""
|
|
Return advisories metadata for installed packages not affected
|
|
by any vulnerability.
|
|
|
|
@return: advisories metadata for NON-vulnerable packages.
|
|
@rtype: dict
|
|
"""
|
|
return self.__get_affection(affected = False)
|
|
|
|
def __get_affection(self, affected = True):
|
|
"""
|
|
If not affected: not affected packages will be returned.
|
|
If affected: affected packages will be returned.
|
|
"""
|
|
adv_data = self.get_advisories_metadata()
|
|
adv_data_keys = list(adv_data.keys())
|
|
valid_keys = set()
|
|
for adv in adv_data_keys:
|
|
is_affected = self.is_affected(adv, adv_data)
|
|
if affected == is_affected:
|
|
valid_keys.add(adv)
|
|
# we need to filter our adv_data and return
|
|
for key in adv_data_keys:
|
|
if key not in valid_keys:
|
|
try:
|
|
del adv_data[key]
|
|
except KeyError:
|
|
pass
|
|
# now we need to filter packages in adv_dat
|
|
for adv in adv_data:
|
|
for key in list(adv_data[adv]['affected'].keys()):
|
|
atoms = adv_data[adv]['affected'][key][0]['vul_atoms']
|
|
applicable = True
|
|
for atom in atoms:
|
|
if atom in self.affected_atoms:
|
|
applicable = False
|
|
break
|
|
if applicable == affected:
|
|
del adv_data[adv]['affected'][key]
|
|
return adv_data
|
|
|
|
def get_affected_packages(self):
|
|
"""
|
|
Return a list of package names affected by vulnerabilities.
|
|
|
|
@return: list (set) of package names affected by vulnerabilities
|
|
@rtype: set
|
|
"""
|
|
adv_data = self.get_advisories_metadata()
|
|
adv_data_keys = list(adv_data.keys())
|
|
del adv_data
|
|
self.affected_atoms.clear()
|
|
for key in adv_data_keys:
|
|
self.is_affected(key)
|
|
return self.affected_atoms
|
|
|
|
def __get_xml_metadata(self, xmlfilename):
|
|
"""
|
|
Parses a Gentoo GLSA XML file extracting advisory metadata.
|
|
|
|
@param xmlfilename: GLSA filename
|
|
@type xmlfilename: string
|
|
@return: advisory metadata extracted
|
|
@rtype: dict
|
|
"""
|
|
xml_data = {}
|
|
xmlfile = os.path.join(System.SECURITY_DIR, xmlfilename)
|
|
try:
|
|
xmldoc = self.minidom.parse(xmlfile)
|
|
except (IOError, OSError, TypeError, AttributeError,):
|
|
return None
|
|
|
|
# get base data
|
|
glsa_tree = xmldoc.getElementsByTagName("glsa")[0]
|
|
glsa_product = glsa_tree.getElementsByTagName("product")[0]
|
|
if glsa_product.getAttribute("type") != "ebuild":
|
|
return {}
|
|
|
|
glsa_id = glsa_tree.getAttribute("id")
|
|
glsa_title = glsa_tree.getElementsByTagName("title")[0]
|
|
glsa_title = glsa_title.firstChild.data
|
|
glsa_synopsis = glsa_tree.getElementsByTagName("synopsis")[0]
|
|
glsa_synopsis = glsa_synopsis.firstChild.data
|
|
glsa_announced = glsa_tree.getElementsByTagName("announced")[0]
|
|
glsa_announced = glsa_announced.firstChild.data
|
|
glsa_revised = glsa_tree.getElementsByTagName("revised")[0]
|
|
glsa_revised = glsa_revised.firstChild.data
|
|
|
|
xml_data['filename'] = xmlfilename
|
|
xml_data['url'] = "http://www.gentoo.org/security/en/glsa/%s" % (
|
|
xmlfilename,)
|
|
xml_data['title'] = glsa_title.strip()
|
|
xml_data['synopsis'] = glsa_synopsis.strip()
|
|
xml_data['announced'] = glsa_announced.strip()
|
|
xml_data['revised'] = glsa_revised.strip()
|
|
xml_data['bugs'] = ["https://bugs.gentoo.org/" + \
|
|
x.firstChild.data.strip() for x in \
|
|
glsa_tree.getElementsByTagName("bug")]
|
|
|
|
try:
|
|
glsa_access = glsa_tree.getElementsByTagName("access")[0]
|
|
xml_data['access'] = glsa_access.firstChild.data.strip()
|
|
except IndexError:
|
|
xml_data['access'] = ""
|
|
|
|
# references
|
|
references = glsa_tree.getElementsByTagName("references")[0]
|
|
xml_data['references'] = [x.getAttribute("link").strip() for x in \
|
|
references.getElementsByTagName("uri")]
|
|
|
|
try:
|
|
xml_data['description_items'] = []
|
|
desc = glsa_tree.getElementsByTagName("description")[0]
|
|
desc = desc.getElementsByTagName("p")[0].firstChild.data.strip()
|
|
xml_data['description'] = desc
|
|
items = glsa_tree.getElementsByTagName("description")[0]
|
|
for item in items.getElementsByTagName("ul"):
|
|
li_items = item.getElementsByTagName("li")
|
|
for li_item in li_items:
|
|
xml_data['description_items'].append(' '.join(
|
|
[x.strip() for x in \
|
|
li_item.firstChild.data.strip().split("\n")])
|
|
)
|
|
except IndexError:
|
|
xml_data['description'] = ""
|
|
xml_data['description_items'] = []
|
|
|
|
try:
|
|
workaround = glsa_tree.getElementsByTagName("workaround")[0]
|
|
workaround_p = workaround.getElementsByTagName("p")[0]
|
|
xml_data['workaround'] = workaround_p.firstChild.data.strip()
|
|
except IndexError:
|
|
xml_data['workaround'] = ""
|
|
|
|
try:
|
|
xml_data['resolution'] = []
|
|
resolution = glsa_tree.getElementsByTagName("resolution")[0]
|
|
p_elements = resolution.getElementsByTagName("p")
|
|
for p_elem in p_elements:
|
|
xml_data['resolution'].append(p_elem.firstChild.data.strip())
|
|
except IndexError:
|
|
xml_data['resolution'] = []
|
|
|
|
try:
|
|
impact = glsa_tree.getElementsByTagName("impact")[0]
|
|
impact_p = impact.getElementsByTagName("p")[0]
|
|
xml_data['impact'] = impact_p.firstChild.data.strip()
|
|
except IndexError:
|
|
xml_data['impact'] = ""
|
|
impact_type = glsa_tree.getElementsByTagName("impact")[0]
|
|
xml_data['impacttype'] = impact_type.getAttribute("type").strip()
|
|
|
|
try:
|
|
background = glsa_tree.getElementsByTagName("background")[0]
|
|
background_p = background.getElementsByTagName("p")[0]
|
|
xml_data['background'] = background_p.firstChild.data.strip()
|
|
except IndexError:
|
|
xml_data['background'] = ""
|
|
|
|
# affection information
|
|
affected = glsa_tree.getElementsByTagName("affected")[0]
|
|
affected_packages = {}
|
|
# we will then filter affected_packages using repositories information
|
|
# if not affected_packages: advisory will be dropped
|
|
for pkg in affected.getElementsByTagName("package"):
|
|
name = pkg.getAttribute("name")
|
|
if name not in affected_packages:
|
|
affected_packages[name] = []
|
|
|
|
pdata = {}
|
|
pdata["arch"] = pkg.getAttribute("arch").strip()
|
|
pdata["auto"] = (pkg.getAttribute("auto") == "yes")
|
|
pdata["vul_vers"] = [self.__make_version(v) for v in \
|
|
pkg.getElementsByTagName("vulnerable")]
|
|
pdata["unaff_vers"] = [self.__make_version(v) for v in \
|
|
pkg.getElementsByTagName("unaffected")]
|
|
pdata["vul_atoms"] = [self.__make_atom(name, v) for v \
|
|
in pkg.getElementsByTagName("vulnerable")]
|
|
pdata["unaff_atoms"] = [self.__make_atom(name, v) for v \
|
|
in pkg.getElementsByTagName("unaffected")]
|
|
affected_packages[name].append(pdata)
|
|
xml_data['affected'] = affected_packages.copy()
|
|
|
|
return {glsa_id: xml_data}
|
|
|
|
def __make_version(self, vnode):
|
|
"""
|
|
creates from the information in the I{versionNode} a
|
|
version string (format <op><version>).
|
|
|
|
@param vnode: a <vulnerable> or <unaffected> Node that
|
|
contains the version information for this atom
|
|
@type vnode: xml.dom.Node
|
|
@return: the version string
|
|
@rtype: string
|
|
"""
|
|
return self.op_mappings[vnode.getAttribute("range")] + \
|
|
vnode.firstChild.data.strip()
|
|
|
|
def __make_atom(self, pkgname, vnode):
|
|
"""
|
|
creates from the given package name and information in the
|
|
I{versionNode} a (syntactical) valid portage atom.
|
|
|
|
@param pkgname: the name of the package for this atom
|
|
@type pkgname: string
|
|
@param vnode: a <vulnerable> or <unaffected> Node that
|
|
contains the version information for this atom
|
|
@type vnode: xml.dom.Node
|
|
@return: the portage atom
|
|
@rtype: string
|
|
"""
|
|
return str(self.op_mappings[vnode.getAttribute("range")] + pkgname + \
|
|
"-" + vnode.firstChild.data.strip())
|
|
|
|
def check_advisories_availability(self):
|
|
"""
|
|
Return whether security advisories are available.
|
|
|
|
@return: availability
|
|
@rtype: bool
|
|
"""
|
|
return os.path.isdir(System.SECURITY_DIR)
|
|
|
|
def unlocked_sync(self, do_cache = True, force = False):
|
|
"""
|
|
This is equivalent to sync() but doesn't acquire the
|
|
Entropy Resources Lock.
|
|
|
|
@keyword do_cache: generates advisories cache
|
|
@type do_cache: bool
|
|
@return: execution status (0 means all file)
|
|
@rtype: int
|
|
"""
|
|
mytxt = "%s: %s %s" % (
|
|
bold(_("Security Advisories")),
|
|
blue(_("getting latest advisories")),
|
|
red("..."),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "info",
|
|
header = red(" @@ ")
|
|
)
|
|
|
|
rc_lock = self.__run_fetch(force = force)
|
|
if rc_lock == 0:
|
|
if self.advisories_changed:
|
|
advtext = "%s: %s" % (
|
|
bold(_("Security Advisories")),
|
|
darkgreen(_("updated successfully")),
|
|
)
|
|
if do_cache:
|
|
self.get_advisories_metadata()
|
|
else:
|
|
advtext = "%s: %s" % (
|
|
bold(_("Security Advisories")),
|
|
darkgreen(_("already up to date")),
|
|
)
|
|
self._entropy.output(
|
|
advtext,
|
|
importance = 2,
|
|
level = "info",
|
|
header = red(" @@ ")
|
|
)
|
|
return rc_lock
|
|
|
|
def sync(self, do_cache = True, force = False):
|
|
"""
|
|
This is the service method for remotely fetch advisories metadata.
|
|
|
|
@keyword do_cache: generates advisories cache
|
|
@type do_cache: bool
|
|
@return: execution status (0 means all file)
|
|
@rtype: int
|
|
"""
|
|
gave_up = self._entropy.wait_resources()
|
|
if gave_up:
|
|
return 7
|
|
|
|
# acquired
|
|
try:
|
|
rc_lock = self.unlocked_sync(do_cache = do_cache,
|
|
force = force)
|
|
if rc_lock != 0:
|
|
return rc_lock
|
|
finally:
|
|
self._entropy.unlock_resources()
|
|
return 0
|
|
|
|
def __run_fetch(self, force = False):
|
|
# prepare directories
|
|
self.__prepare_unpack()
|
|
|
|
# download digest
|
|
status = self.__download_glsa_package_cksum()
|
|
if not status:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkred(_("cannot download checksum, sorry")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "error",
|
|
header = red(" ## ")
|
|
)
|
|
return 2
|
|
|
|
# check if we need to go further
|
|
checksum = self.__get_downloaded_package_checksum()
|
|
if (checksum == self.previous_checksum) and not force:
|
|
# we're done
|
|
self.advisories_changed = False
|
|
return 0
|
|
|
|
# download package
|
|
status = self.__download_glsa_package()
|
|
self.lastfetch = status
|
|
if not status:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkred(_("unable to download advisories, sorry")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "error",
|
|
header = red(" ## ")
|
|
)
|
|
return 1
|
|
|
|
mytxt = "%s: %s %s" % (
|
|
bold(_("Security Advisories")),
|
|
blue(_("Verifying checksum")),
|
|
red("..."),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 1,
|
|
level = "info",
|
|
header = red(" # "),
|
|
back = True
|
|
)
|
|
|
|
# verify digest
|
|
status = self.__verify_checksum()
|
|
|
|
if status == 1:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkred(_("cannot open packages, sorry")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "error",
|
|
header = red(" ## ")
|
|
)
|
|
return 3
|
|
elif status == 2:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkred(_("cannot read the checksum, sorry")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "error",
|
|
header = red(" ## ")
|
|
)
|
|
return 4
|
|
elif status == 3:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkred(_("digest verification failed, sorry")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "error",
|
|
header = red(" ## ")
|
|
)
|
|
return 5
|
|
elif status == 0:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkgreen(_("verification successful")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 1,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
else:
|
|
raise System.UpdateError("Unhandled return code: %s" % (status,))
|
|
|
|
# download GPG key and package signature in a row
|
|
# env hook, disable GPG check
|
|
if self._gpg_feature:
|
|
gpg_sign_sts = self.__download_glsa_package_gpg_sign()
|
|
gpg_key_sts = self.__download_glsa_package_gpg_pubkey()
|
|
if gpg_sign_sts and gpg_key_sts:
|
|
verify_sts = self.__verify_gpg()
|
|
if verify_sts is None:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
purple(_("GPG service not available")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
elif not verify_sts:
|
|
return 7
|
|
|
|
# save downloaded md5
|
|
if os.path.isfile(self.download_package_checksum):
|
|
try:
|
|
os.rename(self.download_package_checksum,
|
|
self.old_download_package_checksum)
|
|
except OSError as err:
|
|
if err.errno != errno.EXDEV:
|
|
raise
|
|
shutil.copy2(self.download_package_checksum,
|
|
self.old_download_package_checksum)
|
|
const_setup_file(self.old_download_package_checksum,
|
|
etpConst['entropygid'], 0o664)
|
|
|
|
# now unpack in place
|
|
status = self.__unpack_advisories()
|
|
if status != 0:
|
|
mytxt = "%s: %s." % (
|
|
bold(_("Security Advisories")),
|
|
darkred(_("digest verification failed, try again later")),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 2,
|
|
level = "error",
|
|
header = red(" ## ")
|
|
)
|
|
return 6
|
|
|
|
mytxt = "%s: %s %s" % (
|
|
bold(_("Security Advisories")),
|
|
blue(_("installing")),
|
|
red("..."),
|
|
)
|
|
self._entropy.output(
|
|
mytxt,
|
|
importance = 1,
|
|
level = "info",
|
|
header = red(" # ")
|
|
)
|
|
|
|
# clear previous
|
|
self.__clear_previous_advisories()
|
|
# copy over
|
|
self.__put_advisories_in_place()
|
|
# remove temp stuff
|
|
self.__cleanup_garbage()
|
|
# clear cache
|
|
self.clear()
|
|
return 0
|
|
|
|
|
|
class Repository:
|
|
|
|
"""
|
|
This class provides a very simple Entropy repositories authenticity
|
|
mechanism based on public-key authentication. Using this class you can
|
|
sign or verify repository files.
|
|
This is the core class for public-key based repository security support.
|
|
Encryption is based on the RSA 2048bit algorithm.
|
|
|
|
NOTE: default GNUPGHOME is set to "/etc/entropy/gpg-keys".
|
|
NOTE: this class requires gnupg installed.
|
|
NOTE: thanks to http://code.google.com/p/python-gnupg project for providing
|
|
a nice testing codebase.
|
|
"""
|
|
|
|
class GPGError(EntropyException):
|
|
"""Errors during GPG commands execution"""
|
|
|
|
class GPGServiceNotAvailable(GPGError):
|
|
"""A particular feature or service is not available"""
|
|
|
|
class NothingImported(GPGError):
|
|
"""Public/private key not imported"""
|
|
|
|
class KeyAlreadyInstalled(GPGError):
|
|
"""Public/private key already installed"""
|
|
|
|
class KeyExpired(GPGError):
|
|
"""Public/private key is expired!"""
|
|
|
|
class ListKeys(list):
|
|
''' Handle status messages for --list-keys.
|
|
|
|
Handle pub and uid (relating the latter to the former).
|
|
|
|
Don't care about (info from src/DETAILS):
|
|
|
|
crt = X.509 certificate
|
|
crs = X.509 certificate and private key available
|
|
sub = subkey (secondary key)
|
|
ssb = secret subkey (secondary key)
|
|
uat = user attribute (same as user id except for field 10).
|
|
sig = signature
|
|
rev = revocation signature
|
|
pkd = public key data (special field format, see below)
|
|
grp = reserved for gpgsm
|
|
rvk = revocation key
|
|
'''
|
|
def __init__(self):
|
|
list.__init__(self)
|
|
self.curkey = None
|
|
self.fingerprints = []
|
|
|
|
def key(self, args):
|
|
myvars = ("""
|
|
type trust length algo keyid date expires dummy ownertrust uid
|
|
""").split()
|
|
self.curkey = {}
|
|
for i in range(len(myvars)):
|
|
self.curkey[myvars[i]] = args[i]
|
|
self.curkey['uids'] = [self.curkey['uid']]
|
|
del self.curkey['uid']
|
|
self.append(self.curkey)
|
|
|
|
pub = sec = key
|
|
|
|
def fpr(self, args):
|
|
self.curkey['fingerprint'] = args[9]
|
|
self.fingerprints.append(args[9])
|
|
|
|
def uid(self, args):
|
|
self.curkey['uids'].append(args[9])
|
|
|
|
def handle_status(self, key, value):
|
|
pass
|
|
|
|
_GPG_EXEC = "/usr/bin/gpg"
|
|
GPG_HOME = os.path.join(etpConst['confdir'], "gpg-keys")
|
|
|
|
def __init__(self, keystore_dir = None):
|
|
"""
|
|
Instance constructor.
|
|
|
|
@param repository_identifier: Entropy unique repository identifier
|
|
@type repository_identifier: string
|
|
"""
|
|
self.__encbits = 2048
|
|
if keystore_dir is None:
|
|
self.__keystore = Repository.GPG_HOME
|
|
else:
|
|
self.__keystore = keystore_dir
|
|
self.__keymap_file = os.path.join(self.__keystore, "entropy.keymap")
|
|
self.__key_list_cache = None
|
|
|
|
# setup repositories keys dir
|
|
if not os.path.isdir(self.__keystore) and not \
|
|
os.path.lexists(self.__keystore):
|
|
try:
|
|
os.makedirs(self.__keystore, 0o775)
|
|
except OSError as err:
|
|
if err.errno != errno.EACCES:
|
|
raise
|
|
raise Repository.GPGServiceNotAvailable(err)
|
|
|
|
# try to setup proper permissions, gpg is a pita
|
|
try:
|
|
const_setup_perms(self.__keystore, etpConst['entropygid'],
|
|
f_perms = 0o660)
|
|
except (IOError, OSError,):
|
|
raise Repository.GPGServiceNotAvailable(
|
|
"cannot setup permissions for %s" % (self.__keystore,))
|
|
|
|
if not os.access(Repository._GPG_EXEC, os.X_OK):
|
|
raise Repository.GPGServiceNotAvailable("no gnupg installed")
|
|
|
|
import socket
|
|
self.__socket = socket
|
|
|
|
def __get_date_after_days(self, days):
|
|
"""
|
|
Given a time delta expressed in days, return new ISO date string.
|
|
"""
|
|
exp_date = datetime.date.today() + datetime.timedelta(days)
|
|
year = str(exp_date.year)
|
|
month = str(exp_date.month)
|
|
day = str(exp_date.day)
|
|
if len(day) < 2:
|
|
day = '0' + day
|
|
if len(month) < 2:
|
|
month = '0' + month
|
|
return "%s-%s-%s" % (year, month, day,)
|
|
|
|
def __is_str_unixtime_in_the_past(self, unixtime):
|
|
today = datetime.date.today()
|
|
unix_date = datetime.date.fromtimestamp(float(unixtime))
|
|
return today > unix_date
|
|
|
|
def __get_keymap(self):
|
|
"""
|
|
Read Entropy keys <-> repository map from keymap file.
|
|
"""
|
|
keymap = {}
|
|
if not os.path.isfile(self.__keymap_file):
|
|
return keymap
|
|
|
|
with open(self.__keymap_file, "r") as key_f:
|
|
for line in key_f.readlines():
|
|
try:
|
|
my_repoid, my_fp = line.strip().split()
|
|
except ValueError:
|
|
continue
|
|
keymap[my_repoid] = my_fp
|
|
return keymap
|
|
|
|
def __write_keymap(self, new_keymap):
|
|
"""
|
|
Write Entropy keys <-> repository map to keymap file.
|
|
"""
|
|
# write back, safely
|
|
self.__key_list_cache = None
|
|
tmp_path = self.__keymap_file+".entropy.tmp"
|
|
enc = etpConst['conf_encoding']
|
|
with codecs.open(tmp_path, "w", encoding=enc) as key_f:
|
|
for key, fp in new_keymap.items():
|
|
key_f.write("%s %s\n" % (key, fp,))
|
|
key_f.flush()
|
|
# atomic
|
|
os.rename(tmp_path, self.__keymap_file)
|
|
const_setup_perms(self.__keymap_file, etpConst['entropygid'])
|
|
|
|
def __update_keymap(self, repoid, fingerprint):
|
|
"""
|
|
Update Entropy keys <-> repository mapping, add mapping between
|
|
repoid and fingerprint.
|
|
"""
|
|
keymap = self.__get_keymap()
|
|
keymap[repoid] = fingerprint
|
|
self.__write_keymap(keymap)
|
|
|
|
def __remove_keymap(self, repoid):
|
|
"""
|
|
Remove repository identifier <-> GPG key mapping.
|
|
"""
|
|
keymap = self.__get_keymap()
|
|
if repoid in keymap:
|
|
del keymap[repoid]
|
|
self.__write_keymap(keymap)
|
|
|
|
def __list_keys(self, secret = False, homedir = None):
|
|
|
|
which = 'keys'
|
|
if secret:
|
|
which = 'secret-keys'
|
|
args = self.__default_gpg_args(homedir = homedir) + \
|
|
["--list-%s" % (which,), "--fixed-list-mode", "--fingerprint",
|
|
"--with-colons"]
|
|
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError("cannot list keys, exit status %s" % (
|
|
proc_rc,))
|
|
|
|
out_data = proc.stdout.readlines()
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
valid_keywords = ['pub', 'uid', 'sec', 'fpr']
|
|
result = Repository.ListKeys()
|
|
for line in out_data:
|
|
line = const_convert_to_unicode(line)
|
|
|
|
const_debug_write(__name__, "_list_keys: read => %s" % (
|
|
line.strip(),))
|
|
items = line.strip().split(':')
|
|
if not items:
|
|
continue
|
|
|
|
keyword = items[0]
|
|
if keyword in valid_keywords:
|
|
getattr(result, keyword)(items)
|
|
|
|
return result
|
|
|
|
def get_keys(self, private = False):
|
|
"""
|
|
Get available keys indexed by name.
|
|
|
|
@return: available keys and their metadata
|
|
@rtype: dict
|
|
"""
|
|
if self.__key_list_cache is not None:
|
|
return self.__key_list_cache.copy()
|
|
|
|
keymap = self.__get_keymap()
|
|
pubkeys = dict((x['fingerprint'], x,) for x in \
|
|
self.__list_keys(secret = private))
|
|
key_data = dict((x, pubkeys.get(y),) for x, y in keymap.items() if \
|
|
pubkeys.get(y) is not None)
|
|
self.__key_list_cache = key_data
|
|
|
|
return key_data.copy()
|
|
|
|
def __gen_key_input(self, **kwargs):
|
|
"""
|
|
Generate --gen-key input per gpg doc/DETAILS
|
|
"""
|
|
parms = {}
|
|
for key, val in list(kwargs.items()):
|
|
key = key.replace('_','-').title()
|
|
parms[key] = val
|
|
parms.setdefault('Key-Type','RSA')
|
|
parms.setdefault('Key-Length', 1024)
|
|
parms.setdefault('Name-Real', "Autogenerated Key")
|
|
parms.setdefault('Name-Comment', "Generated by gnupg.py")
|
|
try:
|
|
logname = os.environ['LOGNAME']
|
|
except KeyError:
|
|
logname = os.environ['USERNAME']
|
|
hostname = self.__socket.gethostname()
|
|
parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'),
|
|
hostname))
|
|
out = "Key-Type: %s\n" % parms.pop('Key-Type')
|
|
for key, val in list(parms.items()):
|
|
out += "%s: %s\n" % (key, val)
|
|
out += "%commit\n"
|
|
return out
|
|
|
|
# Key-Type: RSA
|
|
# Key-Length: 1024
|
|
# Name-Real: ISdlink Server on %s
|
|
# Name-Comment: Created by %s
|
|
# Name-Email: isdlink@%s
|
|
# Expire-Date: 0
|
|
# %commit
|
|
#
|
|
#
|
|
# Key-Type: DSA
|
|
# Key-Length: 1024
|
|
# Subkey-Type: ELG-E
|
|
# Subkey-Length: 1024
|
|
# Name-Real: Joe Tester
|
|
# Name-Comment: with stupid passphrase
|
|
# Name-Email: joe@foo.bar
|
|
# Expire-Date: 0
|
|
# Passphrase: abc
|
|
# %pubring foo.pub
|
|
# %secring foo.sec
|
|
# %commit
|
|
|
|
def __gen_key(self, key_input):
|
|
"""Generate a key; you might use gen_key_input() to create the
|
|
control input.
|
|
|
|
>>> gpg = GPG(gnupghome="/tmp/pygpgtest")
|
|
>>> input = gpg.gen_key_input()
|
|
>>> result = gpg.gen_key(input)
|
|
>>> assert result
|
|
>>> result = gpg.gen_key('foo')
|
|
>>> assert not result
|
|
|
|
"""
|
|
args = self.__default_gpg_args(preserve_perms = False) + \
|
|
["--status-fd", "2", "--batch", "--gen-key"]
|
|
|
|
const_debug_write(__name__, "Repository.__gen_key args => %s" % (
|
|
args,))
|
|
proc = subprocess.Popen(args,
|
|
**self.__default_popen_args(stderr = True))
|
|
try:
|
|
if sys.hexversion >= 0x3000000:
|
|
key_input = const_convert_to_rawstring(key_input)
|
|
# feed gpg with data
|
|
proc_stdout, proc_stderr = proc.communicate(input = key_input)
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError(
|
|
"cannot generate key, exit status %s" % (proc_rc,))
|
|
|
|
if sys.hexversion >= 0x3000000:
|
|
proc_stdout = const_convert_to_unicode(proc_stdout)
|
|
proc_stderr = const_convert_to_unicode(proc_stderr)
|
|
# now get fucking fingerprint
|
|
key_data = [x.strip() for x in (proc_stdout+proc_stderr).split("\n") \
|
|
if x.strip() and "KEY_CREATED" in x.split()]
|
|
if not key_data or len(key_data) > 1:
|
|
raise Repository.GPGError(
|
|
"cannot grab fingerprint of newly created key, data: %s" % (
|
|
proc_stdout,))
|
|
# cross fingers
|
|
fp = key_data[0].split()[-1]
|
|
return fp
|
|
|
|
def create_keypair(self, repository_identifier, passphrase = None,
|
|
name_email = None, expiration_days = None):
|
|
"""
|
|
Create Entropy repository GPG keys and store them.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@param passphrase: passphrase to use
|
|
@type passphrase: string
|
|
@param name_email: email to use
|
|
@type name_email: string
|
|
@param expiration_days: number of days after the key expires
|
|
@type expiration_days: int
|
|
@return: Repository key fingerprint string
|
|
@rtype: string
|
|
@raise KeyError: if another keypair is already set
|
|
"""
|
|
kwargs = {
|
|
'key_length': self.__encbits,
|
|
'name_real': repository_identifier,
|
|
'name_comment': '%s [%s|%s] repository key' % (
|
|
repository_identifier, etpConst['currentarch'],
|
|
etpConst['product'],),
|
|
}
|
|
if name_email:
|
|
kwargs['name_email'] = name_email
|
|
if passphrase:
|
|
kwargs['passphrase'] = passphrase
|
|
if expiration_days:
|
|
kwargs['expire_date'] = self.__get_date_after_days(expiration_days)
|
|
|
|
key_input = self.__gen_key_input(**kwargs)
|
|
key_output = self.__gen_key(key_input)
|
|
|
|
# write to keymap
|
|
self.__update_keymap(repository_identifier, key_output)
|
|
|
|
# ensure permissions
|
|
const_setup_perms(self.__keystore, etpConst['entropygid'],
|
|
f_perms = 0o660)
|
|
|
|
return key_output
|
|
|
|
def get_key_metadata(self, repository_identifier, private = False):
|
|
"""
|
|
Return key metadata for given repository identifier.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@keyword private: return metadata related to private key
|
|
@type private: bool
|
|
@raise KeyError: if no keys are set
|
|
@return: key metadata
|
|
@rtype: dict
|
|
"""
|
|
keyring = self.get_keys(private = private)
|
|
return keyring[repository_identifier]
|
|
|
|
def __delete_key(self, fingerprint, secret = False):
|
|
|
|
args = self.__default_gpg_args(preserve_perms=False) + \
|
|
["--batch", "--yes"]
|
|
if secret:
|
|
args.append("--delete-secret-key")
|
|
else:
|
|
args.append("--delete-key")
|
|
args.append(fingerprint)
|
|
|
|
const_debug_write(__name__, "Repository.__delete_key args => %s" % (
|
|
args,))
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError(
|
|
"cannot delete key fingerprint %s, exit status %s" % (
|
|
fingerprint, proc_rc,))
|
|
|
|
def delete_keypair(self, repository_identifier):
|
|
"""
|
|
Delete keys (public and private) for currently set repository.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@raise KeyError: if key for given repository doesn't exist
|
|
"""
|
|
keymap = self.__get_keymap()
|
|
fingerprint = keymap[repository_identifier]
|
|
self.__delete_key(fingerprint, secret = True)
|
|
self.__delete_key(fingerprint)
|
|
self.__remove_keymap(repository_identifier)
|
|
# ensure permissions
|
|
const_setup_perms(self.__keystore, etpConst['entropygid'],
|
|
f_perms = 0o660)
|
|
|
|
def is_pubkey_expired(self, repository_identifier):
|
|
"""
|
|
Return whether public key is expired.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: True, if key is expired
|
|
@rtype: bool
|
|
@raise KeyError, if key is not available
|
|
"""
|
|
ts = self.get_key_metadata(repository_identifier)
|
|
if not ts['expires']:
|
|
return False
|
|
try:
|
|
return self.__is_str_unixtime_in_the_past(ts['expires'])
|
|
except ValueError:
|
|
return False
|
|
|
|
def is_privkey_expired(self, repository_identifier):
|
|
"""
|
|
Return whether private key is expired.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: True, if key is expired
|
|
@rtype: bool
|
|
@raise KeyError, if key is not available
|
|
"""
|
|
ts = self.get_key_metadata(repository_identifier, private = True)
|
|
if not ts['expires']:
|
|
return False
|
|
try:
|
|
return self.__is_str_unixtime_in_the_past(ts['expires'])
|
|
except ValueError:
|
|
return False
|
|
|
|
def is_keypair_available(self, repository_identifier):
|
|
"""
|
|
Return whether public and private key for given repository identifier
|
|
is available.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: True, if public and private key is available
|
|
@rtype: bool
|
|
@raise Repository.KeyExpired: if key is expired
|
|
"""
|
|
try:
|
|
self.get_key_metadata(repository_identifier)
|
|
except KeyError:
|
|
return False
|
|
try:
|
|
self.get_key_metadata(repository_identifier, private = True)
|
|
except KeyError:
|
|
return False
|
|
|
|
if self.is_privkey_expired(repository_identifier):
|
|
raise Repository.KeyExpired("Key for %s is expired !" % (
|
|
repository_identifier,))
|
|
|
|
return True
|
|
|
|
def is_pubkey_available(self, repository_identifier):
|
|
"""
|
|
Return whether public key for given repository identifier is available.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: True, if public key is available
|
|
@rtype: bool
|
|
@raise Repository.KeyExpired: if key is expired
|
|
"""
|
|
try:
|
|
self.get_pubkey(repository_identifier)
|
|
except KeyError:
|
|
return False
|
|
|
|
try:
|
|
if self.is_pubkey_expired(repository_identifier):
|
|
raise Repository.KeyExpired("Key for %s is expired !" % (
|
|
repository_identifier,))
|
|
except Repository.GPGError:
|
|
# wtf! something like => GPGError: cannot list keys, exit status 2
|
|
return False
|
|
|
|
return True
|
|
|
|
def is_privkey_available(self, repository_identifier):
|
|
"""
|
|
Return whether private key for given repository identifier is available.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: True, if private key is available
|
|
@rtype: bool
|
|
@raise Repository.KeyExpired: if key is expired
|
|
"""
|
|
try:
|
|
self.get_privkey(repository_identifier)
|
|
except KeyError:
|
|
return False
|
|
|
|
if self.is_privkey_expired(repository_identifier):
|
|
raise Repository.KeyExpired("Key for %s is expired !" % (
|
|
repository_identifier,))
|
|
|
|
return True
|
|
|
|
def __export_key(self, fingerprint, key_type = "public"):
|
|
"""
|
|
Export GPG keys to string.
|
|
"""
|
|
|
|
args = self.__default_gpg_args() + ["--armor"]
|
|
if key_type == "public":
|
|
args += ["--export"]
|
|
elif key_type == "private":
|
|
args += ["--export-secret-key"]
|
|
else:
|
|
raise AttributeError("invalid key_type")
|
|
args.append(fingerprint)
|
|
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError(
|
|
"cannot export key which fingerprint is %s, error: %s" % (
|
|
fingerprint, proc_rc,))
|
|
|
|
key_string = proc.stdout.read()
|
|
if sys.hexversion >= 0x3000000:
|
|
key_string = const_convert_to_unicode(key_string)
|
|
return key_string
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
def get_pubkey(self, repository_identifier):
|
|
"""
|
|
Get public key for currently set repository, if any, otherwise raise
|
|
KeyError.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: public key
|
|
@rtype: string
|
|
@raise KeyError: if no keypair is set for repository
|
|
"""
|
|
keymap = self.__get_keymap()
|
|
fingerprint = keymap[repository_identifier]
|
|
try:
|
|
pubkey = self.__export_key(fingerprint)
|
|
except Repository.GPGError as err:
|
|
raise KeyError(repr(err))
|
|
return pubkey
|
|
|
|
def get_privkey(self, repository_identifier):
|
|
"""
|
|
Get private key for currently set repository, if any, otherwise raise
|
|
KeyError.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@return: private key
|
|
@rtype: string
|
|
@raise KeyError: if no keypair is set for repository
|
|
"""
|
|
keymap = self.__get_keymap()
|
|
fingerprint = keymap[repository_identifier]
|
|
pubkey = self.__export_key(fingerprint, key_type = "private")
|
|
return pubkey
|
|
|
|
def get_key_fingerprint(self, key_path):
|
|
"""
|
|
Return the fingerprint contained in the given key file, if any.
|
|
Otherwise return None.
|
|
|
|
@param key_path: valid path to GPG key file
|
|
@type key_path: string
|
|
"""
|
|
tmp_dir = None
|
|
try:
|
|
tmp_dir = tempfile.mkdtemp(prefix=".entropy.security.get_fp")
|
|
args = self.__default_gpg_args(preserve_perms = False,
|
|
homedir = tmp_dir)
|
|
args += ["--import", key_path]
|
|
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
if proc_rc != 0:
|
|
return None
|
|
|
|
now_keys = set([x['fingerprint'] for x in self.__list_keys(
|
|
homedir = tmp_dir)])
|
|
if not now_keys:
|
|
return None
|
|
# NOTE: not supporting multiple keys, is this a problem?
|
|
return now_keys.pop()
|
|
finally:
|
|
if tmp_dir is not None:
|
|
shutil.rmtree(tmp_dir, True)
|
|
|
|
def install_key(self, repository_identifier, key_path,
|
|
ignore_nothing_imported = False, merge_key = False):
|
|
"""
|
|
Add key to keyring.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@param key_path: valid path to GPG key file
|
|
@type key_path: string
|
|
@keyword ignore_nothing_imported: if True, ignore NothingImported
|
|
exception
|
|
@type ignore_nothing_imported: bool
|
|
@keyword merge_key: add --import-options merge-only to gpg callback
|
|
@type merge_key: bool
|
|
@return: fingerprint
|
|
@rtype: string
|
|
@raise KeyAlreadyInstalled: if key is already installed
|
|
@raise NothingImported: if key_path contains garbage
|
|
"""
|
|
args = self.__default_gpg_args(preserve_perms = False)
|
|
if merge_key:
|
|
args += ["--import-options", "merge-only"]
|
|
args += ["--import", key_path]
|
|
try:
|
|
current_keys = set([x['fingerprint'] for x in self.__list_keys()])
|
|
except OSError as err:
|
|
if err.errno == errno.EIO:
|
|
raise Repository.GPGError(
|
|
"cannot list keys for %s" % (repository_identifier))
|
|
raise
|
|
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError(
|
|
"cannot install key at %s, for %s" % (
|
|
key_path, repository_identifier))
|
|
|
|
now_keys = set([x['fingerprint'] for x in self.__list_keys()])
|
|
new_keys = now_keys - current_keys
|
|
if (len(new_keys) < 1) and not ignore_nothing_imported:
|
|
raise Repository.NothingImported(
|
|
"nothing imported from %s, for %s" % (
|
|
key_path, repository_identifier,))
|
|
|
|
nothing_imported = False
|
|
if len(new_keys) < 1 and ignore_nothing_imported:
|
|
nothing_imported = True
|
|
|
|
if len(new_keys) > 1:
|
|
raise Repository.KeyAlreadyInstalled(
|
|
"wtf? more than one key imported from %s, for %s" % (
|
|
key_path, repository_identifier,))
|
|
|
|
fp = None
|
|
if not nothing_imported:
|
|
fp = new_keys.pop()
|
|
self.__update_keymap(repository_identifier, fp)
|
|
fp = str(fp)
|
|
|
|
# setup perms again
|
|
const_setup_perms(self.__keystore, etpConst['entropygid'],
|
|
f_perms = 0o660)
|
|
|
|
return fp
|
|
|
|
def delete_pubkey(self, repository_identifier):
|
|
"""
|
|
Delete public key bound to given repository identifier.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@raise KeyError: if no key is set for given repository identifier
|
|
"""
|
|
metadata = self.get_key_metadata(repository_identifier)
|
|
self.__remove_keymap(repository_identifier)
|
|
self.__delete_key(metadata['fingerprint'])
|
|
# setup perms again
|
|
const_setup_perms(self.__keystore, etpConst['entropygid'],
|
|
f_perms = 0o660)
|
|
|
|
def __default_gpg_args(self, preserve_perms = True, homedir = None):
|
|
if homedir is None:
|
|
homedir = self.__keystore
|
|
args = [Repository._GPG_EXEC, "--no-tty", "--no-permission-warning",
|
|
"--no-greeting", "--homedir", homedir]
|
|
if preserve_perms:
|
|
args.append("--preserve-permissions")
|
|
return args
|
|
|
|
def __default_popen_args(self, stderr = False):
|
|
kwargs = {
|
|
'stdout': subprocess.PIPE,
|
|
'stdin': subprocess.PIPE,
|
|
}
|
|
if (not etpUi['debug']) or stderr:
|
|
kwargs['stderr'] = subprocess.PIPE
|
|
return kwargs
|
|
|
|
def __default_popen_close(self, proc):
|
|
if proc.stdout is not None:
|
|
proc.stdout.close()
|
|
if proc.stderr is not None:
|
|
proc.stderr.close()
|
|
if proc.stdin is not None:
|
|
proc.stdin.close()
|
|
|
|
def __sign_file(self, file_path, fingerprint):
|
|
|
|
args = self.__default_gpg_args() + ["-sa", "--detach-sign"]
|
|
if fingerprint:
|
|
args += ["--default-key", fingerprint]
|
|
|
|
args.append(file_path)
|
|
const_debug_write(__name__, "Repository.__sign_file args => %s" % (
|
|
args,))
|
|
|
|
asc_path = file_path + etpConst['etpgpgextension']
|
|
# remove previously stored .asc
|
|
if os.path.isfile(asc_path):
|
|
const_debug_write(__name__,
|
|
"Repository.__sign_file had to rm %s" % (asc_path,))
|
|
os.remove(asc_path)
|
|
|
|
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError(
|
|
"cannot sign file %s, exit status %s" % (
|
|
file_path, proc_rc,))
|
|
|
|
if not os.path.isfile(asc_path):
|
|
raise OSError("cannot find %s" % (asc_path,))
|
|
|
|
return asc_path
|
|
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
def sign_file(self, repository_identifier, file_path):
|
|
"""
|
|
Sign given file path using key of given repository identifier.
|
|
A custom passphrase can be provided as string.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@param file_path: path to file to sign
|
|
@type file_path: string
|
|
@return: path to signature file
|
|
@rtype: string
|
|
"""
|
|
metadata = self.get_key_metadata(repository_identifier)
|
|
return self.__sign_file(file_path, metadata['fingerprint'])
|
|
|
|
def __verify_file(self, file_path, signature_path, fingerprint):
|
|
|
|
args = self.__default_gpg_args()
|
|
if fingerprint:
|
|
args += ["--default-key", fingerprint]
|
|
args += ["--verify", signature_path, file_path]
|
|
const_debug_write(__name__, "Repository.__verify_file args => %s" % (
|
|
args,))
|
|
|
|
proc = subprocess.Popen(args, **self.__default_popen_args())
|
|
try:
|
|
# wait for process to terminate
|
|
proc_rc = proc.wait()
|
|
|
|
if proc_rc != 0:
|
|
raise Repository.GPGError("cannot verify file %s, exit status %s" % (
|
|
file_path, proc_rc,))
|
|
finally:
|
|
self.__default_popen_close(proc)
|
|
|
|
def verify_file(self, repository_identifier, file_path, signature_path):
|
|
"""
|
|
Verify file in file_path usign signature in signature_path and key from
|
|
repository_identifier.
|
|
|
|
@param repository_identifier: repository identifier
|
|
@type repository_identifier: string
|
|
@param file_path: path to file to verify
|
|
@type file_path: string
|
|
@param signature_path: path to signature to verify
|
|
@type signature_path: string
|
|
@return: a tuple composed by (validity_bool, error message,)
|
|
@rtype: tuple
|
|
"""
|
|
metadata = self.get_key_metadata(repository_identifier)
|
|
try:
|
|
self.__verify_file(file_path, signature_path, metadata['fingerprint'])
|
|
except Repository.GPGError as err:
|
|
const_debug_write(__name__, "Repository.verify_file error: %s" % (
|
|
err,))
|
|
return False, str(err)
|
|
return True, ''
|