Imported Debian patch 4.8.10-2

This commit is contained in:
Timo Aaltonen
2020-11-23 20:48:56 +02:00
committed by Mario Fetka
parent 8bc559c5a1
commit 358acdd85f
917 changed files with 1185414 additions and 1069733 deletions

View File

@@ -1,93 +1,106 @@
# Copyright (C) 2015 IPA Project Contributors, see COPYING for license
from __future__ import print_function, absolute_import
import contextlib
import os
import secrets
from base64 import b64encode
# pylint: disable=relative-import
from custodia.message.kem import KEMClient, KEY_USAGE_SIG, KEY_USAGE_ENC
# pylint: enable=relative-import
from jwcrypto.common import json_decode
from jwcrypto.jwk import JWK
from ipalib.krb_utils import krb5_format_service_principal_name
from ipaserver.secrets.kem import IPAKEMKeys
from ipaserver.secrets.store import iSecStore
from ipaserver.secrets.store import IPASecStore
from ipaplatform.paths import paths
from base64 import b64encode
import ldapurl
import gssapi
import os
import urllib3
import requests
class CustodiaClient(object):
@contextlib.contextmanager
def ccache_env(ccache):
"""Temporarily set KRB5CCNAME environment variable
"""
orig_ccache = os.environ.get('KRB5CCNAME')
os.environ['KRB5CCNAME'] = ccache
try:
yield
finally:
os.environ.pop('KRB5CCNAME', None)
if orig_ccache is not None:
os.environ['KRB5CCNAME'] = orig_ccache
class CustodiaClient:
def __init__(self, client_service, keyfile, keytab, server, realm,
ldap_uri=None, auth_type=None):
if client_service.endswith(realm) or "@" not in client_service:
raise ValueError(
"Client service name must be a GSS name (service@host), "
"not '{}'.".format(client_service)
)
self.client_service = client_service
self.keytab = keytab
self.server = server
self.realm = realm
self.ldap_uri = ldap_uri
self.auth_type = auth_type
self.service_name = gssapi.Name(
'HTTP@{}'.format(server), gssapi.NameType.hostbased_service
)
self.keystore = IPASecStore()
# use in-process MEMORY ccache. Handler process don't need a TGT.
self.ccache = 'MEMORY:Custodia_{}'.format(secrets.token_hex())
with ccache_env(self.ccache):
# Init creds immediately to make sure they are valid. Creds
# can also be re-inited by _auth_header to avoid expiry.
self.creds = self._init_creds()
self.ikk = IPAKEMKeys(
{'server_keys': keyfile, 'ldap_uri': ldap_uri}
)
self.kemcli = KEMClient(
self._server_keys(), self._client_keys()
)
def _client_keys(self):
return self.ikk.server_keys
def _server_keys(self, server, realm):
principal = 'host/%s@%s' % (server, realm)
def _server_keys(self):
principal = krb5_format_service_principal_name(
'host', self.server, self.realm
)
sk = JWK(**json_decode(self.ikk.find_key(principal, KEY_USAGE_SIG)))
ek = JWK(**json_decode(self.ikk.find_key(principal, KEY_USAGE_ENC)))
return (sk, ek)
return sk, ek
def _ldap_uri(self, realm):
dashrealm = '-'.join(realm.split('.'))
socketpath = paths.SLAPD_INSTANCE_SOCKET_TEMPLATE % (dashrealm,)
return 'ldapi://' + ldapurl.ldapUrlEscape(socketpath)
def _keystore(self, realm, ldap_uri, auth_type):
config = dict()
if ldap_uri is None:
config['ldap_uri'] = self._ldap_uri(realm)
else:
config['ldap_uri'] = ldap_uri
if auth_type is not None:
config['auth_type'] = auth_type
return iSecStore(config)
def __init__(
self, client_service, keyfile, keytab, server, realm,
ldap_uri=None, auth_type=None):
self.client_service = client_service
self.keytab = keytab
# Init creds immediately to make sure they are valid. Creds
# can also be re-inited by _auth_header to avoid expiry.
#
self.creds = self.init_creds()
self.service_name = gssapi.Name('HTTP@%s' % (server,),
gssapi.NameType.hostbased_service)
self.server = server
self.ikk = IPAKEMKeys({'server_keys': keyfile, 'ldap_uri': ldap_uri})
self.kemcli = KEMClient(self._server_keys(server, realm),
self._client_keys())
self.keystore = self._keystore(realm, ldap_uri, auth_type)
# FIXME: Remove warnings about missing subjAltName for the
# requests module
urllib3.disable_warnings()
def init_creds(self):
name = gssapi.Name(self.client_service,
gssapi.NameType.hostbased_service)
store = {'client_keytab': self.keytab,
'ccache': 'MEMORY:Custodia_%s' % b64encode(
os.urandom(8)).decode('ascii')}
def _init_creds(self):
name = gssapi.Name(
self.client_service, gssapi.NameType.hostbased_service
)
store = {
'client_keytab': self.keytab,
'ccache': self.ccache
}
return gssapi.Credentials(name=name, store=store, usage='initiate')
def _auth_header(self):
if not self.creds or self.creds.lifetime < 300:
self.creds = self.init_creds()
ctx = gssapi.SecurityContext(name=self.service_name, creds=self.creds)
if self.creds.lifetime < 300:
self.creds = self._init_creds()
ctx = gssapi.SecurityContext(
name=self.service_name,
creds=self.creds
)
authtok = ctx.step()
return {'Authorization': 'Negotiate %s' % b64encode(
authtok).decode('ascii')}
def fetch_key(self, keyname, store=True):
# Prepare URL
url = 'https://%s/ipa/keys/%s' % (self.server, keyname)
@@ -99,9 +112,11 @@ class CustodiaClient(object):
headers = self._auth_header()
# Perform request
r = requests.get(url, headers=headers,
verify=paths.IPA_CA_CRT,
params={'type': 'kem', 'value': request})
r = requests.get(
url, headers=headers,
verify=paths.IPA_CA_CRT,
params={'type': 'kem', 'value': request}
)
r.raise_for_status()
reply = r.json()

View File

@@ -7,7 +7,7 @@ import ldap.filter
from ipapython.ipaldap import ldap_initialize
class iSecLdap(object):
class iSecLdap:
def __init__(self, uri, auth_type=None):
self.uri = uri

View File

@@ -0,0 +1,2 @@
"""Export / import handlers
"""

View File

@@ -0,0 +1,75 @@
#
# Copyright (C) 2019 IPA Project Contributors, see COPYING for license
#
"""Common helpers for handlers
"""
import argparse
import base64
import json
import shutil
import tempfile
def default_json(obj):
"""JSON encoder default handler
"""
if isinstance(obj, (bytes, bytearray)):
return base64.b64encode(obj).decode('ascii')
raise TypeError(
"Object of type {} is not JSON serializable".format(type(obj))
)
def json_dump(data, exportfile):
"""Dump JSON to file
"""
json.dump(
data,
exportfile,
default=default_json,
separators=(',', ':'),
sort_keys=True
)
def mkparser(supports_import=True, **kwargs):
"""Create default parser for handler with export / import args
All commands support export to file or stdout. Most commands can also
import from a file or stdin. Export and import are mutually exclusive
options.
"""
parser = argparse.ArgumentParser(**kwargs)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--export',
help='JSON export file ("-" for stdout)',
dest='exportfile',
type=argparse.FileType('w')
)
if supports_import:
group.add_argument(
'--import',
help='JSON import file ("-" for stdin)',
dest='importfile',
type=argparse.FileType('r')
)
return parser
def main(parser, export_func, import_func=None, **kwargs):
"""Common main function for handlers
"""
args = parser.parse_args()
if args.exportfile is not None:
func = export_func
else:
func = import_func
tmpdir = tempfile.mkdtemp()
try:
func(args, tmpdir, **kwargs)
finally:
shutil.rmtree(tmpdir)

View File

@@ -0,0 +1,63 @@
#
# Copyright (C) 2019 IPA Project Contributors, see COPYING for license
#
"""Export / import Directory Manager password hash
"""
import json
import os
from ipalib import api
from ipalib import errors
from ipaplatform.paths import paths
from ipapython.dn import DN
from ipapython.ipaldap import LDAPClient, realm_to_ldapi_uri
from . import common
CN_CONFIG = DN(('cn', 'config'))
ROOTPW = 'nsslapd-rootpw'
def export_key(args, tmpdir, conn):
entry = conn.get_entry(CN_CONFIG, [ROOTPW])
data = {
'dmhash': entry.single_value[ROOTPW],
}
common.json_dump(data, args.exportfile)
def import_key(args, tmpdir, conn):
data = json.load(args.importfile)
dmhash = data['dmhash'].encode('ascii')
entry = conn.get_entry(CN_CONFIG, [ROOTPW])
entry.single_value[ROOTPW] = dmhash
try:
conn.update_entry(entry)
except errors.EmptyModlist:
pass
def main():
parser = common.mkparser(
description='ipa-custodia LDAP DM hash handler'
)
if os.getegid() != 0:
parser.error("Must be run as root user.\n")
# create LDAP connection using LDAPI and EXTERNAL bind as root
if not api.isdone('bootstrap'):
api.bootstrap(confdir=paths.ETC_IPA, log=None)
realm = api.env.realm
ldap_uri = realm_to_ldapi_uri(realm)
conn = LDAPClient(ldap_uri=ldap_uri, no_schema=True)
try:
conn.external_bind()
except Exception as e:
parser.error("Failed to connect to {}: {}\n".format(ldap_uri, e))
with conn:
common.main(parser, export_key, import_key, conn=conn)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,122 @@
#
# Copyright (C) 2019 IPA Project Contributors, see COPYING for license
#
"""Export / import cert and key from NSS DB as PKCS#12 data
"""
import base64
import json
import os
from ipaplatform.paths import paths
from ipapython import ipautil
from ipapython.certdb import NSSDatabase
from . import common
def export_key(args, tmpdir):
"""Export key and certificate from the NSS DB to a PKCS#12 file.
The PKCS#12 file is encrypted with a password.
"""
pk12file = os.path.join(tmpdir, 'export.p12')
password = ipautil.ipa_generate_password()
pk12pk12pwfile = os.path.join(tmpdir, 'passwd')
with open(pk12pk12pwfile, 'w') as f:
f.write(password)
nssdb = NSSDatabase(args.nssdb_path)
nssdb.run_pk12util([
"-o", pk12file,
"-n", args.nickname,
"-k", args.nssdb_pwdfile,
"-w", pk12pk12pwfile,
])
with open(pk12file, 'rb') as f:
p12data = f.read()
data = {
'export password': password,
'pkcs12 data': p12data,
}
common.json_dump(data, args.exportfile)
def import_key(args, tmpdir):
"""Import key and certificate from a PKCS#12 file to a NSS DB.
"""
data = json.load(args.importfile)
password = data['export password']
p12data = base64.b64decode(data['pkcs12 data'])
pk12pwfile = os.path.join(tmpdir, 'passwd')
with open(pk12pwfile, 'w') as f:
f.write(password)
pk12file = os.path.join(tmpdir, 'import.p12')
with open(pk12file, 'wb') as f:
f.write(p12data)
nssdb = NSSDatabase(args.nssdb_path)
nssdb.run_pk12util([
"-i", pk12file,
"-n", args.nickname,
"-k", args.nssdb_pwdfile,
"-w", pk12pwfile,
])
def default_parser():
"""Generic interface
"""
parser = common.mkparser(
description='ipa-custodia NSS cert handler'
)
parser.add_argument(
'--nssdb',
dest='nssdb_path',
help='path to NSS DB',
required=True
)
parser.add_argument(
'--pwdfile',
dest='nssdb_pwdfile',
help='path to password file for NSS DB',
required=True
)
parser.add_argument(
'--nickname',
help='nick name of certificate',
required=True
)
return parser
def pki_tomcat_parser():
"""Hard-code Dogtag's NSSDB and its password file
"""
parser = common.mkparser(
description='ipa-custodia pki-tomcat NSS cert handler'
)
parser.add_argument(
'--nickname',
help='nick name of certificate',
required=True
)
parser.set_defaults(
nssdb_path=paths.PKI_TOMCAT_ALIAS_DIR,
nssdb_pwdfile=paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT,
)
return parser
def main(parser=None):
if parser is None:
parser = default_parser()
common.main(parser, export_key, import_key)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,126 @@
#
# Copyright (C) 2019 IPA Project Contributors, see COPYING for license
#
"""Export and wrap key from NSS DB
"""
import os
from ipaplatform.paths import paths
from ipapython import ipautil
from ipapython.certdb import NSSDatabase
from . import common
def export_key(args, tmpdir):
"""Export key and certificate from the NSS DB
The private key is encrypted using key wrapping.
"""
wrapped_key_file = os.path.join(tmpdir, 'wrapped_key')
certificate_file = os.path.join(tmpdir, 'certificate')
ipautil.run([
paths.PKI,
'-d', args.nssdb_path,
'-C', args.nssdb_pwdfile,
'ca-authority-key-export',
'--wrap-nickname', args.wrap_nickname,
'--target-nickname', args.nickname,
'--algorithm', args.algorithm,
'-o', wrapped_key_file
])
nssdb = NSSDatabase(args.nssdb_path)
nssdb.run_certutil([
'-L',
'-n', args.nickname,
'-a',
'-o', certificate_file,
])
with open(wrapped_key_file, 'rb') as f:
wrapped_key = f.read()
with open(certificate_file, 'r') as f:
certificate = f.read()
data = {
'wrapped_key': wrapped_key,
'certificate': certificate
}
common.json_dump(data, args.exportfile)
def default_parser():
"""Generic interface
"""
parser = common.mkparser(
supports_import=False,
description='ipa-custodia NSS wrapped cert handler',
)
parser.add_argument(
'--nssdb',
dest='nssdb_path',
help='path to NSS DB',
required=True
)
parser.add_argument(
'--pwdfile',
dest='nssdb_pwdfile',
help='path to password file for NSS DB',
required=True
)
parser.add_argument(
'--wrap-nickname',
dest='wrap_nickname',
help='nick name of wrapping key',
required=True
)
parser.add_argument(
'--nickname',
dest='nickname',
help='nick name of target key',
required=True
)
return parser
def pki_tomcat_parser():
"""Hard-code Dogtag's NSS DB, its password file, and CA key for wrapping
"""
parser = common.mkparser(
supports_import=False,
description='ipa-custodia pki-tomcat NSS wrapped cert handler',
)
parser.add_argument(
'--nickname',
dest='nickname',
help='nick name of target key',
required=True
)
# Caller must specify a cipher. This gets passed on to
# the 'pki ca-authority-key-export' command (part of
# Dogtag) via its own --algorithm option.
parser.add_argument(
'--algorithm',
dest='algorithm',
help='OID of symmetric wrap algorithm',
required=True
)
parser.set_defaults(
nssdb_path=paths.PKI_TOMCAT_ALIAS_DIR,
nssdb_pwdfile=paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT,
wrap_nickname='caSigningCert cert-pki-ca',
)
return parser
def main(parser=None):
if parser is None:
parser = default_parser()
common.main(parser, export_key, None)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,118 @@
#
# Copyright (C) 2019 IPA Project Contributors, see COPYING for license
#
"""Export / import PEM cert and key file as PKCS#12 data
"""
import base64
import json
import os
from ipaplatform.paths import paths
from ipapython import ipautil
from . import common
def export_key(args, tmpdir):
"""Export cert and private from PEM files as PKCS#12 file.
The PKCS#12 file is encrypted with a password.
"""
pk12file = os.path.join(tmpdir, 'export.p12')
password = ipautil.ipa_generate_password()
pk12pwfile = os.path.join(tmpdir, 'passwd')
with open(pk12pwfile, 'w') as f:
f.write(password)
# OpenSSL does not support pkcs12 export of a cert without key
ipautil.run([
paths.OPENSSL, 'pkcs12', '-export',
'-in', args.certfile,
'-out', pk12file,
'-inkey', args.keyfile,
'-password', 'file:{pk12pwfile}'.format(pk12pwfile=pk12pwfile),
])
with open(pk12file, 'rb') as f:
p12data = f.read()
data = {
'export password': password,
'pkcs12 data': p12data,
}
common.json_dump(data, args.exportfile)
def import_key(args, tmpdir):
"""Export key and certificate from a PKCS#12 file to key and cert files.
"""
data = json.load(args.importfile)
password = data['export password']
p12data = base64.b64decode(data['pkcs12 data'])
pk12pwfile = os.path.join(tmpdir, 'passwd')
with open(pk12pwfile, 'w') as f:
f.write(password)
pk12file = os.path.join(tmpdir, 'import.p12')
with open(pk12file, 'wb') as f:
f.write(p12data)
# get the certificate from the file
cmd = [
paths.OPENSSL, 'pkcs12',
'-in', pk12file,
'-clcerts', '-nokeys',
'-out', args.certfile,
'-password', 'file:{pk12pwfile}'.format(pk12pwfile=pk12pwfile),
]
ipautil.run(cmd, umask=0o027)
# get the private key from the file
cmd = [
paths.OPENSSL, 'pkcs12',
'-in', pk12file,
'-nocerts', '-nodes',
'-out', args.keyfile,
'-password', 'file:{pk12pwfile}'.format(pk12pwfile=pk12pwfile),
]
ipautil.run(cmd, umask=0o027)
def default_parser():
parser = common.mkparser(
description='ipa-custodia PEM file handler'
)
parser.add_argument(
'--certfile',
help='path to PEM encoded cert file',
required=True
)
parser.add_argument(
'keyfile',
help='path to PEM encoded key file',
required=True
)
return parser
def ra_agent_parser():
parser = common.mkparser(
description='ipa-custodia RA agent cert handler'
)
parser.set_defaults(
certfile=paths.RA_AGENT_PEM,
keyfile=paths.RA_AGENT_KEY
)
return parser
def main(parser=None):
if parser is None:
parser = default_parser()
common.main(parser, export_key, import_key)
if __name__ == '__main__':
main()

View File

@@ -5,9 +5,7 @@ from __future__ import print_function, absolute_import
import errno
import os
# pylint: disable=import-error
from six.moves.configparser import ConfigParser
# pylint: enable=import-error
from configparser import ConfigParser
from ipaplatform.paths import paths
from ipapython.dn import DN

View File

@@ -1,28 +1,39 @@
# Copyright (C) 2015 IPA Project Contributors, see COPYING for license
from __future__ import print_function, absolute_import
from base64 import b64encode, b64decode
from custodia.store.interface import CSStore # pylint: disable=relative-import
from jwcrypto.common import json_decode, json_encode
from ipaplatform.paths import paths
from ipapython import ipautil
from ipapython.certdb import NSSDatabase
from ipaserver.secrets.common import iSecLdap
import ldap
import os
import shutil
import sys
import tempfile
from custodia.plugin import CSStore
from ipaplatform.paths import paths
from ipaplatform.constants import constants
from ipapython import ipautil
class UnknownKeyName(Exception):
pass
class DBMAPHandler(object):
class InvalidKeyArguments(Exception):
pass
class DBMAPHandler:
dbtype = None
supports_extra_args = False
def __init__(self, config, dbmap, nickname):
raise NotImplementedError
dbtype = dbmap.get('type')
if dbtype is None or dbtype != self.dbtype:
raise ValueError(
"Invalid type '{}', expected '{}'".format(
dbtype, self.dbtype
)
)
self.config = config
self.dbmap = dbmap
self.nickname = nickname
def export_key(self):
raise NotImplementedError
@@ -31,229 +42,146 @@ class DBMAPHandler(object):
raise NotImplementedError
class DBMAPCommandHandler(DBMAPHandler):
def __init__(self, config, dbmap, nickname):
super().__init__(config, dbmap, nickname)
self.runas = dbmap.get('runas')
self.command = os.path.join(
paths.IPA_CUSTODIA_HANDLER,
dbmap['command']
)
def run_handler(self, extra_args=(), stdin=None):
"""Run handler script to export / import key material
"""
args = [self.command]
args.extend(extra_args)
kwargs = dict(
runas=self.runas,
encoding='utf-8',
)
if stdin:
args.extend(['--import', '-'])
kwargs.update(stdin=stdin)
else:
args.extend(['--export', '-'])
kwargs.update(capture_output=True)
result = ipautil.run(args, **kwargs)
if stdin is None:
return result.output
else:
return None
def log_error(error):
print(error, file=sys.stderr)
class NSSWrappedCertDB(DBMAPHandler):
'''
class NSSWrappedCertDB(DBMAPCommandHandler):
"""
Store that extracts private keys from an NSSDB, wrapped with the
private key of the primary CA.
'''
"""
dbtype = 'NSSDB'
supports_extra_args = True
def __init__(self, config, dbmap, nickname):
if 'path' not in dbmap:
raise ValueError(
'Configuration does not provide NSSDB path')
if 'pwdfile' not in dbmap:
raise ValueError('Configuration does not provide password file')
if 'wrap_nick' not in dbmap:
raise ValueError(
'Configuration does not provide nickname of wrapping key')
self.nssdb_path = dbmap['path']
self.nssdb_pwdfile = dbmap['pwdfile']
self.wrap_nick = dbmap['wrap_nick']
self.target_nick = nickname
OID_DES_EDE3_CBC = '1.2.840.113549.3.7'
def __init__(self, config, dbmap, nickname, *extra_args):
super().__init__(config, dbmap, nickname)
# Extra args is either a single OID specifying desired wrap
# algorithm, or empty. If empty, we must assume that the
# client is an old version that only supports DES-EDE3-CBC.
#
# Using either the client's requested algorithm or the
# default of DES-EDE3-CBC, we pass it along to the handler
# via the --algorithm option. The handler, in turn, passes
# it along to the 'pki ca-authority-key-export' program
# (which is part of Dogtag).
#
if len(extra_args) > 1:
raise InvalidKeyArguments("Too many arguments")
if len(extra_args) == 1:
self.alg = extra_args[0]
else:
self.alg = self.OID_DES_EDE3_CBC
def export_key(self):
tdir = tempfile.mkdtemp(dir=paths.TMP)
try:
wrapped_key_file = os.path.join(tdir, 'wrapped_key')
certificate_file = os.path.join(tdir, 'certificate')
ipautil.run([
paths.PKI, '-d', self.nssdb_path, '-C', self.nssdb_pwdfile,
'ca-authority-key-export',
'--wrap-nickname', self.wrap_nick,
'--target-nickname', self.target_nick,
'-o', wrapped_key_file])
nssdb = NSSDatabase(self.nssdb_path)
nssdb.run_certutil([
'-L', '-n', self.target_nick,
'-a', '-o', certificate_file,
])
with open(wrapped_key_file, 'rb') as f:
wrapped_key = f.read()
with open(certificate_file, 'r') as f:
certificate = f.read()
finally:
shutil.rmtree(tdir)
return json_encode({
'wrapped_key': b64encode(wrapped_key).decode('ascii'),
'certificate': certificate})
return self.run_handler([
'--nickname', self.nickname,
'--algorithm', self.alg,
])
class NSSCertDB(DBMAPHandler):
def __init__(self, config, dbmap, nickname):
if 'type' not in dbmap or dbmap['type'] != 'NSSDB':
raise ValueError('Invalid type "%s",'
' expected "NSSDB"' % (dbmap['type'],))
if 'path' not in dbmap:
raise ValueError('Configuration does not provide NSSDB path')
if 'pwdfile' not in dbmap:
raise ValueError('Configuration does not provide password file')
self.nssdb_path = dbmap['path']
self.nssdb_pwdfile = dbmap['pwdfile']
self.nickname = nickname
class NSSCertDB(DBMAPCommandHandler):
dbtype = 'NSSDB'
def export_key(self):
tdir = tempfile.mkdtemp(dir=paths.TMP)
try:
pk12pwfile = os.path.join(tdir, 'pk12pwfile')
password = ipautil.ipa_generate_password()
with open(pk12pwfile, 'w') as f:
f.write(password)
pk12file = os.path.join(tdir, 'pk12file')
nssdb = NSSDatabase(self.nssdb_path)
nssdb.run_pk12util([
"-o", pk12file,
"-n", self.nickname,
"-k", self.nssdb_pwdfile,
"-w", pk12pwfile,
])
with open(pk12file, 'rb') as f:
data = f.read()
finally:
shutil.rmtree(tdir)
return json_encode({'export password': password,
'pkcs12 data': b64encode(data).decode('ascii')})
return self.run_handler(['--nickname', self.nickname])
def import_key(self, value):
v = json_decode(value)
tdir = tempfile.mkdtemp(dir=paths.TMP)
try:
pk12pwfile = os.path.join(tdir, 'pk12pwfile')
with open(pk12pwfile, 'w') as f:
f.write(v['export password'])
pk12file = os.path.join(tdir, 'pk12file')
with open(pk12file, 'wb') as f:
f.write(b64decode(v['pkcs12 data']))
nssdb = NSSDatabase(self.nssdb_path)
nssdb.run_pk12util([
"-i", pk12file,
"-n", self.nickname,
"-k", self.nssdb_pwdfile,
"-w", pk12pwfile,
])
finally:
shutil.rmtree(tdir)
return self.run_handler(
['--nickname', self.nickname],
stdin=value
)
# Exfiltrate the DM password Hash so it can be set in replica's and this
# way let a replica be install without knowing the DM password and yet
# still keep the DM password synchronized across replicas
class DMLDAP(DBMAPHandler):
class DMLDAP(DBMAPCommandHandler):
dbtype = 'DMLDAP'
def __init__(self, config, dbmap, nickname):
if 'type' not in dbmap or dbmap['type'] != 'DMLDAP':
raise ValueError('Invalid type "%s",'
' expected "DMLDAP"' % (dbmap['type'],))
super().__init__(config, dbmap, nickname)
if nickname != 'DMHash':
raise UnknownKeyName("Unknown Key Named '%s'" % nickname)
self.ldap = iSecLdap(config['ldap_uri'],
config.get('auth_type', None))
def export_key(self):
conn = self.ldap.connect()
r = conn.search_s('cn=config', ldap.SCOPE_BASE,
attrlist=['nsslapd-rootpw'])
if len(r) != 1:
raise RuntimeError('DM Hash not found!')
rootpw = r[0][1]['nsslapd-rootpw'][0]
return json_encode({'dmhash': rootpw.decode('ascii')})
return self.run_handler()
def import_key(self, value):
v = json_decode(value)
rootpw = v['dmhash'].encode('ascii')
conn = self.ldap.connect()
mods = [(ldap.MOD_REPLACE, 'nsslapd-rootpw', rootpw)]
conn.modify_s('cn=config', mods)
self.run_handler(stdin=value)
class PEMFileHandler(DBMAPHandler):
def __init__(self, config, dbmap, nickname=None):
if 'type' not in dbmap or dbmap['type'] != 'PEM':
raise ValueError('Invalid type "{t}", expected PEM'
.format(t=dbmap['type']))
self.certfile = dbmap['certfile']
self.keyfile = dbmap.get('keyfile')
class PEMFileHandler(DBMAPCommandHandler):
dbtype = 'PEM'
def export_key(self):
_fd, tmpfile = tempfile.mkstemp(dir=paths.TMP)
password = ipautil.ipa_generate_password()
args = [
paths.OPENSSL,
"pkcs12", "-export",
"-in", self.certfile,
"-out", tmpfile,
"-password", "pass:{pwd}".format(pwd=password)
]
if self.keyfile is not None:
args.extend(["-inkey", self.keyfile])
try:
ipautil.run(args, nolog=(password, ))
with open(tmpfile, 'rb') as f:
data = f.read()
finally:
os.remove(tmpfile)
return json_encode({'export password': password,
'pkcs12 data': b64encode(data).decode('ascii')})
return self.run_handler()
def import_key(self, value):
v = json_decode(value)
data = b64decode(v['pkcs12 data'])
password = v['export password']
fd, tmpdata = tempfile.mkstemp(dir=paths.TMP)
os.close(fd)
try:
with open(tmpdata, 'wb') as f:
f.write(data)
# get the certificate from the file
ipautil.run([paths.OPENSSL,
"pkcs12",
"-in", tmpdata,
"-clcerts", "-nokeys",
"-out", self.certfile,
"-passin", "pass:{pwd}".format(pwd=password)],
nolog=(password, ))
if self.keyfile is not None:
# get the private key from the file
ipautil.run([paths.OPENSSL,
"pkcs12",
"-in", tmpdata,
"-nocerts", "-nodes",
"-out", self.keyfile,
"-passin", "pass:{pwd}".format(pwd=password)],
nolog=(password, ))
finally:
os.remove(tmpdata)
return self.run_handler(stdin=value)
NAME_DB_MAP = {
'ca': {
'type': 'NSSDB',
'path': paths.PKI_TOMCAT_ALIAS_DIR,
'handler': NSSCertDB,
'pwdfile': paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT,
'command': 'ipa-custodia-pki-tomcat',
'runas': constants.PKI_USER,
},
'ca_wrapped': {
'type': 'NSSDB',
'handler': NSSWrappedCertDB,
'path': paths.PKI_TOMCAT_ALIAS_DIR,
'pwdfile': paths.PKI_TOMCAT_ALIAS_PWDFILE_TXT,
'wrap_nick': 'caSigningCert cert-pki-ca',
'command': 'ipa-custodia-pki-tomcat-wrapped',
'runas': constants.PKI_USER,
},
'ra': {
'type': 'PEM',
'handler': PEMFileHandler,
'certfile': paths.RA_AGENT_PEM,
'keyfile': paths.RA_AGENT_KEY,
'command': 'ipa-custodia-ra-agent',
'runas': None, # import needs root permission to write to directory
},
'dm': {
'type': 'DMLDAP',
'handler': DMLDAP,
'command': 'ipa-custodia-dmldap',
'runas': None, # root
}
}
@@ -265,12 +193,15 @@ class IPASecStore(CSStore):
def _get_handler(self, key):
path = key.split('/', 3)
if len(path) != 3 or path[0] != 'keys':
if len(path) < 3 or path[0] != 'keys':
raise ValueError('Invalid name')
if path[1] not in NAME_DB_MAP:
raise UnknownKeyName("Unknown DB named '%s'" % path[1])
dbmap = NAME_DB_MAP[path[1]]
return dbmap['handler'](self.config, dbmap, path[2])
handler = dbmap['handler']
if len(path) > 3 and not handler.supports_extra_args:
raise InvalidKeyArguments('Handler does not support extra args')
return handler(self.config, dbmap, path[2], *path[3:])
def get(self, key):
try: