# # Copyright (C) 2023 FreeIPA Contributors see COPYING for license # import os.path import pytest import random import re import string import time from ipalib.constants import KRA_TRACKING_REQS from ipapython.ipaldap import realm_to_serverid from ipatests.test_integration.base import IntegrationTest from ipatests.test_integration.test_acme import ( prepare_acme_client, certbot_register, certbot_standalone_cert, get_selinux_status, skip_certbot_tests, skip_mod_md_tests, ) from ipatests.test_integration.test_caless import CALessBase from ipatests.test_integration.test_cert import get_certmonger_fs_id from ipatests.test_integration.test_external_ca import ( install_server_external_ca_step1, install_server_external_ca_step2, check_CA_flag, verify_caentry ) from ipatests.test_integration.test_ipa_cert_fix import ( check_status, needs_resubmit, get_cert_expiry ) from ipatests.test_integration.test_ipahealthcheck import ( run_healthcheck, set_excludes ) from ipatests.pytest_ipa.integration import tasks from ipatests.pytest_ipa.integration.env_config import get_global_config from ipalib import x509 as ipa_x509 from ipaplatform.paths import paths config = get_global_config() hsm_lib_path = '' if config.token_library: hsm_lib_path = config.token_library else: hsm_lib_path = '/usr/lib64/pkcs11/libsofthsm2.so' def get_hsm_token(host): """Helper method to get an hsm token This method creates a softhsm token if the hsm hardware token is not found. """ if host.config.token_name: return (host.config.token_name, host.config.token_password) token_name = ''.join( random.choice(string.ascii_letters) for i in range(10) ) token_passwd = ''.join( random.choice(string.ascii_letters) for i in range(10) ) # remove the token if already exist host.run_command( ['softhsm2-util', '--delete-token', '--token', token_name], raiseonerr=False ) host.run_command( ['runuser', '-u', 'pkiuser', '--', 'softhsm2-util', '--init-token', '--free', '--pin', token_passwd, '--so-pin', token_passwd, '--label', token_name] ) return (token_name, token_passwd) def delete_hsm_token(hosts, token_name): for host in hosts: if host.config.token_name: # assumption: for time being /root/cleantoken.sh is copied # host manually. This should be removed in final iteration. host.run_command(['sh', '/root/cleantoken.sh']) else: host.run_command( ['softhsm2-util', '--delete-token', '--token', token_name], raiseonerr=False ) def find_softhsm_token_files(host, token): if not host.transport.file_exists(paths.PKI_TOMCAT_ALIAS_DIR): return None, [] result = host.run_command([ paths.MODUTIL, '-list', 'libsofthsm2', '-dbdir', paths.PKI_TOMCAT_ALIAS_DIR ]) serial = None state = 'token_name' for line in result.stdout_text.split('\n'): if state == 'token_name' and 'Token Name:' in line.strip(): (_label, tokenname) = line.split(':', 1) if tokenname.strip() == token: state = 'serial' elif state == 'serial' and 'Token Serial Number' in line.strip(): (_label, serial) = line.split(':', 1) serial = serial.strip() serial = "{}-{}".format(serial[0:4], serial[4:]) break if serial is None: raise RuntimeError("can't find softhsm token serial for %s" % token) result = host.run_command( ['ls', '-l', '/var/lib/softhsm/tokens/']) serialdir = None for r in result.stdout_text.split('\n'): if serial in r: dirname = r.split()[-1:][0] serialdir = f'/var/lib/softhsm/tokens/{dirname}' break if serialdir is None: raise RuntimeError("can't find softhsm token directory for %s" % serial) result = host.run_command(['ls', '-1', serialdir]) return serialdir, [ os.path.join(serialdir, file) for file in result.stdout_text.strip().split('\n') ] def copy_token_files(src_host, dest_host, token_name): """Helper method to copy the token files to replica""" # copy the token files to replicas if not src_host.config.token_name: serialdir, token_files = find_softhsm_token_files( src_host, token_name ) if serialdir: for host in dest_host: tasks.copy_files(src_host, host, token_files) host.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) host.run_command( ['chown', '-R', 'pkiuser:pkiuser', serialdir] ) def check_version(host): if tasks.get_pki_version(host) < tasks.parse_version('11.5.0'): raise pytest.skip("PKI HSM support is not available") class BaseHSMTest(IntegrationTest): master_with_dns = True master_with_kra = False master_with_ad = False master_extra_args = [] token_password = None token_name = None token_password_file = '/tmp/token_password' random_serial = False @classmethod def install(cls, mh): check_version(cls.master) # Enable pkiuser to read softhsm tokens cls.master.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) cls.token_name, cls.token_password = get_hsm_token(cls.master) cls.master.put_file_contents( cls.token_password_file, cls.token_password ) tasks.install_master( cls.master, setup_dns=cls.master_with_dns, setup_kra=cls.master_with_kra, setup_adtrust=cls.master_with_ad, extra_args=( '--token-name', cls.token_name, '--token-library-path', hsm_lib_path, '--token-password', cls.token_password ) ) cls.sync_tokens(cls.master) @classmethod def uninstall(cls, mh): check_version(cls.master) super(BaseHSMTest, cls).uninstall(mh) delete_hsm_token([cls.master] + cls.replicas, cls.token_name) @classmethod def sync_tokens(cls, source, token_name=None): """Synchronize non-networked HSM tokens between machines source: source host for the token data """ if ( hsm_lib_path and 'nfast' in hsm_lib_path ): for host in [cls.master] + cls.replicas: if host == source: continue tasks.copy_nfast_data(source, host) for host in [cls.master] + cls.replicas: if host == source: continue copy_token_files(source, [host], token_name if token_name else cls.token_name) class TestHSMInstall(BaseHSMTest): num_replicas = 3 num_clients = 1 topology = 'star' def test_hsm_install_replica0_ca_less_install(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[0], setup_ca=False, setup_dns=True, ) def test_hsm_install_replica0_ipa_ca_install(self): check_version(self.master) self.sync_tokens(self.master) tasks.install_ca( self.replicas[0], extra_args=('--token-password', self.token_password,), ) def test_hsm_install_replica0_ipa_kra_install(self): check_version(self.master) tasks.install_kra( self.replicas[0], first_instance=True, extra_args=('--token-password', self.token_password,) ) self.sync_tokens(self.replicas[0]) def test_hsm_install_replica0_ipa_dns_install(self): tasks.install_dns(self.replicas[0]) def test_hsm_install_replica1_with_ca_install(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[1], setup_ca=True, extra_args=('--token-password', self.token_password,) ) def test_hsm_install_replica1_ipa_kra_install(self): check_version(self.master) tasks.install_kra( self.replicas[1], extra_args=('--token-password', self.token_password,) ) def test_hsm_install_replica1_ipa_dns_install(self): check_version(self.master) tasks.install_dns(self.replicas[1]) def test_hsm_install_replica2_with_ca_kra_dns_install(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[2], setup_ca=True, setup_kra=True, setup_dns=True, extra_args=('--token-password', self.token_password,) ) def test_hsm_install_master_ipa_kra_install(self): check_version(self.master) tasks.install_kra( self.master, extra_args=('--token-password', self.token_password,) ) def test_hsm_install_client(self): check_version(self.master) tasks.install_client(self.master, self.clients[0]) def test_hsm_install_issue_user_cert(self): check_version(self.master) user = 'testuser1' csr_file = f'{user}.csr' key_file = f'{user}.key' cert_file = f'{user}.crt' tasks.kinit_admin(self.master) tasks.user_add(self.master, user) openssl_cmd = [ 'openssl', 'req', '-newkey', 'rsa:2048', '-keyout', key_file, '-nodes', '-out', csr_file, '-subj', '/CN=' + user] self.master.run_command(openssl_cmd) cmd_args = ['ipa', 'cert-request', '--principal', user, '--certificate-out', cert_file, csr_file] self.master.run_command(cmd_args) def test_hsm_install_healthcheck(self): check_version(self.master) set_excludes(self.master, "key", "DSCLE0004") tasks.install_packages(self.master, ['*ipa-healthcheck']) returncode, output = run_healthcheck( self.master, output_type="human", failures_only=True ) assert returncode == 0 assert output == "No issues found." def test_hsm_install_server_password_file(self): check_version(self.master) # cleanup before fresh install with password file for client in self.clients: tasks.uninstall_client(client) for replica in self.replicas: tasks.uninstall_master(replica) tasks.uninstall_master(self.master) delete_hsm_token([self.master] + self.replicas, self.token_name) self.token_name, self.token_password = get_hsm_token(self.master) self.master.put_file_contents(self.token_password_file, self.token_password) self.replicas[0].put_file_contents(self.token_password_file, self.token_password) tasks.install_master( self.master, setup_dns=self.master_with_dns, setup_kra=self.master_with_kra, setup_adtrust=self.master_with_ad, extra_args=( '--token-name', self.token_name, '--token-library-path', hsm_lib_path, '--token-password-file', self.token_password_file ) ) self.sync_tokens(self.master, token_name=self.token_name) def test_hsm_install_replica0_password_file(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[0], setup_ca=True, extra_args=('--token-password-file', self.token_password_file,) ) def test_hsm_install_replica0_kra_password_file(self): check_version(self.master) tasks.install_kra( self.replicas[0], extra_args=('--token-password-file', self.token_password_file,) ) class TestHSMInstallADTrustBase(BaseHSMTest): """ Base test for builtin AD trust installation in combination with other components with HSM support """ num_replicas = 1 master_with_dns = False master_with_kra = True def test_hsm_adtrust_replica0_all_components(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[0], setup_ca=True, setup_adtrust=False, setup_kra=True, setup_dns=True, nameservers='master' if self.master_with_dns else None, extra_args=('--token-password', self.token_password,) ) class TestADTrustInstallWithDNS_KRA_ADTrust(BaseHSMTest): num_replicas = 1 master_with_dns = True master_with_kra = True master_with_ad = True def test_hsm_adtrust_replica0(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[0], setup_ca=True, setup_kra=True, extra_args=('--token-password', self.token_password,) ) class TestHSMcertRenewal(BaseHSMTest): master_with_kra = True def test_certs_renewal(self): """ Test that the KRA subsystem certificates renew properly """ check_version(self.master) CA_TRACKING_REQS = { 'ocspSigningCert cert-pki-ca': 'caocspSigningCert', 'subsystemCert cert-pki-ca': 'casubsystemCert', 'auditSigningCert cert-pki-ca': 'caauditSigningCert' } CA_TRACKING_REQS.update(KRA_TRACKING_REQS) self.master.put_file_contents(self.token_password_file, self.token_password) for nickname in CA_TRACKING_REQS: cert = tasks.certutil_fetch_cert( self.master, paths.PKI_TOMCAT_ALIAS_DIR, self.token_password_file, nickname, token_name=self.token_name, ) starting_serial = int(cert.serial_number) cmd_arg = [ 'ipa-getcert', 'resubmit', '-v', '-w', '-d', paths.PKI_TOMCAT_ALIAS_DIR, '-n', nickname, ] result = self.master.run_command(cmd_arg) request_id = re.findall(r'\d+', result.stdout_text) status = tasks.wait_for_request(self.master, request_id[0], 120) assert status == "MONITORING" args = ['-L', '-h', self.token_name, '-f', self.token_password_file,] tasks.run_certutil(self.master, args, paths.PKI_TOMCAT_ALIAS_DIR) cert = tasks.certutil_fetch_cert( self.master, paths.PKI_TOMCAT_ALIAS_DIR, self.token_password_file, nickname, token_name=self.token_name, ) assert starting_serial != int(cert.serial_number) class TestHSMCALessToExternalToSelfSignedCA(CALessBase, BaseHSMTest): """Test server caless to external CA to self signed scenario""" num_replicas = 1 @classmethod def install(cls, mh): check_version(cls.master) super(TestHSMCALessToExternalToSelfSignedCA, cls).install(mh) # Enable pkiuser to read softhsm tokens cls.master.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) cls.token_name, cls.token_password = get_hsm_token(cls.master) @classmethod def uninstall(cls, mh): check_version(cls.master) super(TestHSMCALessToExternalToSelfSignedCA, cls).uninstall(mh) delete_hsm_token([cls.master] + cls.replicas, cls.token_name) def test_hsm_caless_server(self): """Install CA-less master""" check_version(self.master) self.create_pkcs12('ca1/server') self.prepare_cacert('ca1') master = self.install_server() assert master.returncode == 0 def test_hsm_caless_to_ca_full(self): check_version(self.master) tasks.install_ca( self.master, extra_args=( '--token-name', self.token_name, '--token-library-path', hsm_lib_path, '--token-password', self.token_password ), ) self.sync_tokens(self.master) ca_show = self.master.run_command(['ipa', 'ca-show', 'ipa']) assert 'Subject DN: CN=Certificate Authority,O={}'.format( self.master.domain.realm) in ca_show.stdout_text def test_hsm_caless_selfsigned_to_external_ca_install(self): # Install external CA on master result = self.master.run_command([paths.IPA_CACERT_MANAGE, 'renew', '--external-ca']) assert result.returncode == 0 # Sign CA, transport it to the host and get ipa a root ca paths. root_ca_fname, ipa_ca_fname = tasks.sign_ca_and_transport( self.master, paths.IPA_CA_CSR, ROOT_CA, IPA_CA) # renew CA with externally signed one result = self.master.run_command([paths.IPA_CACERT_MANAGE, 'renew', '--external-cert-file={}'. format(ipa_ca_fname), '--external-cert-file={}'. format(root_ca_fname)]) assert result.returncode == 0 # update IPA certificate databases result = self.master.run_command([paths.IPA_CERTUPDATE]) assert result.returncode == 0 # Check if external CA have "C" flag after the switch result = check_CA_flag(self.master) assert bool(result), ('External CA does not have "C" flag') # Check that ldap entries for the CA have been updated remote_cacrt = self.master.get_file_contents(ipa_ca_fname) cacrt = ipa_x509.load_pem_x509_certificate(remote_cacrt) verify_caentry(self.master, cacrt) def test_hsm_caless_external_to_self_signed_ca(self): check_version(self.master) self.master.run_command([paths.IPA_CACERT_MANAGE, 'renew', '--self-signed']) self.master.run_command([paths.IPA_CERTUPDATE]) def test_hsm_caless_replica0_with_ca_install(self): check_version(self.master) self.sync_tokens(self.master) tasks.install_replica( self.master, self.replicas[0], setup_ca=True, extra_args=('--token-password', self.token_password,) ) IPA_CA = "ipa_ca.crt" ROOT_CA = "root_ca.crt" class TestHSMExternalToSelfSignedCA(BaseHSMTest): """ Test of FreeIPA server installation with external CA then renew it to self-signed """ num_replicas = 1 @classmethod def install(cls, mh): check_version(cls.master) # Enable pkiuser to read softhsm tokens cls.master.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) cls.token_name, cls.token_password = get_hsm_token(cls.master) @classmethod def uninstall(cls, mh): check_version(cls.master) super(TestHSMExternalToSelfSignedCA, cls).uninstall(mh) delete_hsm_token([cls.master] + cls.replicas, cls.token_name) def test_hsm_external_ca_install(self): check_version(self.master) # Step 1 of ipa-server-install. result = install_server_external_ca_step1( self.master, extra_args=[ '--external-ca-type=ms-cs', '--token-name', self.token_name, '--token-library-path', hsm_lib_path, '--token-password', self.token_password ] ) assert result.returncode == 0 root_ca_fname, ipa_ca_fname = tasks.sign_ca_and_transport( self.master, paths.ROOT_IPA_CSR, ROOT_CA, IPA_CA ) # Step 2 of ipa-server-install. result = install_server_external_ca_step2( self.master, ipa_ca_fname, root_ca_fname, extra_args=[ '--token-name', self.token_name, '--token-library-path', hsm_lib_path, '--token-password', self.token_password ] ) assert result.returncode == 0 self.sync_tokens(self.master) def test_hsm_external_kra_install(self): check_version(self.master) tasks.install_kra( self.master, first_instance=True, extra_args=('--token-password', self.token_password,) ) self.sync_tokens(self.master) def test_hsm_external_to_self_signed_ca(self): check_version(self.master) self.master.run_command([paths.IPA_CACERT_MANAGE, 'renew', '--self-signed']) self.master.run_command([paths.IPA_CERTUPDATE]) def test_hsm_external_ca_replica0_install(self): check_version(self.master) tasks.install_replica( self.master, self.replicas[0], setup_kra=True, extra_args=('--token-password', self.token_password,) ) @pytest.fixture def expire_cert_critical(): """ Fixture to expire the certs by moving the system date using date -s command and revert it back """ hosts = dict() def _expire_cert_critical(host): hosts['host'] = host # move date to expire certs tasks.move_date(host, 'stop', '+3Years+1day') host.run_command( ['ipactl', 'restart', '--ignore-service-failures'] ) yield _expire_cert_critical host = hosts.pop('host') # Prior to uninstall remove all the cert tracking to prevent # errors from certmonger trying to check the status of certs # that don't matter because we are uninstalling. host.run_command(['systemctl', 'stop', 'certmonger']) # Important: run_command with a str argument is able to # perform shell expansion but run_command with a list of # arguments is not host.run_command('rm -fv ' + paths.CERTMONGER_REQUESTS_DIR + '*') tasks.uninstall_master(host) tasks.move_date(host, 'start', '-3Years-1day') class TestHSMcertFix(BaseHSMTest): master_with_dns = False def test_hsm_renew_expired_cert_on_master(self, expire_cert_critical): check_version(self.master) expire_cert_critical(self.master) # wait for cert expiry check_status(self.master, 8, "CA_UNREACHABLE") self.master.run_command(['ipa-cert-fix', '-v'], stdin_text='yes\n') check_status(self.master, 9, "MONITORING", timeout=1000) # second iteration of ipa-cert-fix result = self.master.run_command( ['ipa-cert-fix', '-v'], stdin_text='yes\n' ) assert "Nothing to do" in result.stdout_text check_status(self.master, 9, "MONITORING") class TestHSMcertFixKRA(BaseHSMTest): master_with_dns = False master_with_kra = True def test_hsm_renew_expired_cert_with_kra(self, expire_cert_critical): check_version(self.master) expire_cert_critical(self.master) # check if all subsystem cert expired check_status(self.master, 11, "CA_UNREACHABLE") self.master.run_command(['ipa-cert-fix', '-v'], stdin_text='yes\n') self.master.run_command(['systemctl', 'restart', 'certmonger']) check_status(self.master, 12, "MONITORING") class TestHSMcertFixReplica(BaseHSMTest): num_replicas = 1 master_with_dns = False @classmethod def install(cls, mh): super(TestHSMcertFixReplica, cls).install(mh) tasks.install_replica( cls.master, cls.replicas[0], setup_ca=True, nameservers='master' if cls.master_with_dns else None, extra_args=('--token-password', cls.token_password,) ) @pytest.fixture def expire_certs(self): # move system date to expire certs for host in self.master, self.replicas[0]: tasks.move_date(host, 'stop', '+3years+1days') host.run_command( ['ipactl', 'restart', '--ignore-service-failures'] ) yield # move date back on replica and master for host in self.replicas[0], self.master: tasks.uninstall_master(host) tasks.move_date(host, 'start', '-3years-1days') def test_hsm_renew_expired_cert_replica(self, expire_certs): check_version(self.master) # wait for cert expiry self.master.run_command(['systemctl', 'restart', 'certmonger']) check_status(self.master, 8, "CA_UNREACHABLE") self.master.run_command(['ipa-cert-fix', '-v'], stdin_text='yes\n') check_status(self.master, 9, "MONITORING") # replica operations # 'Server-Cert cert-pki-ca' cert will be in CA_UNREACHABLE state cmd = self.replicas[0].run_command( ['getcert', 'list', '-d', paths.PKI_TOMCAT_ALIAS_DIR, '-n', 'Server-Cert cert-pki-ca'] ) req_id = get_certmonger_fs_id(cmd.stdout_text) tasks.wait_for_certmonger_status( self.replicas[0], ('CA_UNREACHABLE'), req_id, timeout=600 ) # get initial expiry date to compare later with renewed cert initial_expiry = get_cert_expiry( self.replicas[0], paths.PKI_TOMCAT_ALIAS_DIR, 'Server-Cert cert-pki-ca' ) # check that HTTP,LDAP,PKINIT are renewed and in MONITORING state instance = realm_to_serverid(self.master.domain.realm) dirsrv_cert = paths.ETC_DIRSRV_SLAPD_INSTANCE_TEMPLATE % instance for cert in (paths.KDC_CERT, paths.HTTPD_CERT_FILE): cmd = self.replicas[0].run_command( ['getcert', 'list', '-f', cert] ) req_id = get_certmonger_fs_id(cmd.stdout_text) tasks.wait_for_certmonger_status( self.replicas[0], ('MONITORING'), req_id, timeout=600 ) cmd = self.replicas[0].run_command( ['getcert', 'list', '-d', dirsrv_cert] ) req_id = get_certmonger_fs_id(cmd.stdout_text) tasks.wait_for_certmonger_status( self.replicas[0], ('MONITORING'), req_id, timeout=600 ) # check if replication working fine testuser = 'testuser1' password = 'Secret@123' stdin = (f"{self.master.config.admin_password}\n" f"{self.master.config.admin_password}\n" f"{self.master.config.admin_password}\n") self.master.run_command(['kinit', 'admin'], stdin_text=stdin) tasks.user_add(self.master, testuser, password=password) self.replicas[0].run_command(['kinit', 'admin'], stdin_text=stdin) self.replicas[0].run_command(['ipa', 'user-show', testuser]) # renew shared certificates by resubmitting to certmonger cmd = self.replicas[0].run_command( ['getcert', 'list', '-f', paths.RA_AGENT_PEM] ) req_id = get_certmonger_fs_id(cmd.stdout_text) if needs_resubmit(self.replicas[0], req_id): self.replicas[0].run_command( ['getcert', 'resubmit', '-i', req_id] ) tasks.wait_for_certmonger_status( self.replicas[0], ('MONITORING'), req_id, timeout=600 ) for cert_nick in ('auditSigningCert cert-pki-ca', 'ocspSigningCert cert-pki-ca', 'subsystemCert cert-pki-ca'): cmd = self.replicas[0].run_command( ['getcert', 'list', '-d', paths.PKI_TOMCAT_ALIAS_DIR, '-n', cert_nick] ) req_id = get_certmonger_fs_id(cmd.stdout_text) if needs_resubmit(self.replicas[0], req_id): self.replicas[0].run_command( ['getcert', 'resubmit', '-i', req_id] ) tasks.wait_for_certmonger_status( self.replicas[0], ('MONITORING'), req_id, timeout=600 ) self.replicas[0].run_command( ['ipa-cert-fix', '-v'], stdin_text='yes\n' ) check_status(self.replicas[0], 9, "MONITORING") # Sometimes certmonger takes time to update the cert status # So check in nssdb instead of relying on getcert command renewed_expiry = get_cert_expiry( self.replicas[0], paths.PKI_TOMCAT_ALIAS_DIR, 'Server-Cert cert-pki-ca' ) assert renewed_expiry > initial_expiry class TestHSMNegative(IntegrationTest): master_with_dns = False token_password_file = '/tmp/token_password' @classmethod def install(cls, mh): check_version(cls.master) # Enable pkiuser to read softhsm tokens cls.master.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) cls.token_name, cls.token_password = get_hsm_token(cls.master) @classmethod def uninstall(cls, mh): check_version(cls.master) cls.master.run_command( ['softhsm2-util', '--delete-token', '--token', cls.token_name], raiseonerr=False ) def test_hsm_negative_wrong_token_details(self): check_version(self.master) # wrong token name result = tasks.install_master( self.master, raiseonerr=False, extra_args=( '--token-name', 'random_token', '--token-library-path', hsm_lib_path, '--token-password', self.token_password ) ) assert result.returncode != 0 # wrong token password result = tasks.install_master( self.master, raiseonerr=False, extra_args=( '--token-name', self.token_name, '--token-library-path', hsm_lib_path, '--token-password', 'token_passwd' ) ) assert result.returncode != 0 # wrong token lib result = tasks.install_master( self.master, raiseonerr=False, extra_args=( '--token-name', self.token_name, '--token-library-path', '/tmp/non_existing_hsm_lib_path', '--token-password', self.token_password ) ) assert result.returncode != 0 def test_hsm_negative_bad_token_dir_permissions(self): """Create an unreadable softhsm2 token and install should fail. This is most often seen on replicas where the pkiuser is not a member of the ods group. """ check_version(self.master) token_name = 'bad_perms' token_passwd = 'Secret123' self.master.run_command( ['softhsm2-util', '--delete-token', '--token', token_name], raiseonerr=False ) self.master.run_command( ['usermod', 'pkiuser', '-a', '-G', 'ods'] ) self.master.run_command( ['softhsm2-util', '--init-token', '--free', '--pin', token_passwd, '--so-pin', token_passwd, '--label', token_name] ) self.master.run_command( ['gpasswd', '-d', 'pkiuser', 'ods'] ) result = tasks.install_master( self.master, raiseonerr=False, extra_args=( '--token-name', token_name, '--token-library-path', hsm_lib_path, '--token-password', token_passwd ) ) self.master.run_command( ['usermod', 'pkiuser', '-a', '-G', 'ods'] ) self.master.run_command( ['softhsm2-util', '--delete-token', '--token', token_name], raiseonerr=False ) assert result.returncode != 0 assert ( f"Token named '{token_name}' was not found" in result.stderr_text ) def test_hsm_negative_special_char_token_name(self): check_version(self.master) token_name = 'hsm:token' token_passwd = 'Secret123' self.master.run_command( ['softhsm2-util', '--delete-token', '--token', token_name], raiseonerr=False ) self.master.run_command( ['runuser', '-u', 'pkiuser', '--', 'softhsm2-util', '--init-token', '--free', '--pin', token_passwd, '--so-pin', token_passwd, '--label', token_name] ) # special character in token name result = tasks.install_master( self.master, raiseonerr=False, extra_args=( '--token-name', token_name, '--token-library-path', hsm_lib_path, '--token-password', token_passwd ) ) assert result.returncode != 0 def test_hsm_negative_token_password_and_file(self): """Test token-password and token-password-file at same time Test if command fails when --token-password and --token-password-file provided at the same time results into command failure. """ check_version(self.master) self.master.put_file_contents( self.token_password_file, self.token_password ) result = tasks.install_master( self.master, raiseonerr=False, extra_args=( '--token-name', self.token_name, '--token-library-path', hsm_lib_path, '--token-password', self.token_password, '--token-password-file', self.token_password_file ) ) self.master.run_command( ['softhsm2-util', '--delete-token', '--token', self.token_name], raiseonerr=False ) # assert 'error message non existing token lib' in result.stderr_text assert result.returncode != 0 class TestHSMACME(CALessBase): num_clients = 1 @classmethod def install(cls, mh): check_version(cls.master) super(TestHSMACME, cls).install(mh) # install packages before client install in case of IPA DNS problems cls.acme_server = prepare_acme_client(cls.master, cls.clients[0]) # Enable pkiuser to read softhsm tokens cls.master.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) cls.token_name, cls.token_password = get_hsm_token(cls.master) tasks.install_master( cls.master, setup_dns=True, extra_args=( '--token-name', cls.token_name, '--token-library-path', hsm_lib_path, '--token-password', cls.token_password ) ) tasks.install_client(cls.master, cls.clients[0]) @classmethod def uninstall(cls, mh): check_version(cls.master) super(TestHSMACME, cls).uninstall(mh) delete_hsm_token([cls.master], cls.token_name) @pytest.mark.skipif(skip_certbot_tests, reason='certbot not available') def test_certbot_certonly_standalone(self): check_version(self.master) # enable ACME on server tasks.kinit_admin(self.master) self.master.run_command(['ipa-acme-manage', 'enable']) # register account to certbot certbot_register(self.clients[0], self.acme_server) # request ACME cert with certbot certbot_standalone_cert(self.clients[0], self.acme_server) @pytest.mark.skipif(skip_mod_md_tests, reason='mod_md not available') def test_mod_md(self): check_version(self.master) if get_selinux_status(self.clients[0]): # mod_md requires its own SELinux policy to grant perms to # maintaining ACME registration and cert state. raise pytest.skip("SELinux is enabled, this will fail") # write config self.clients[0].run_command(['mkdir', '-p', '/etc/httpd/conf.d']) self.clients[0].run_command(['mkdir', '-p', '/etc/httpd/md']) self.clients[0].put_file_contents( '/etc/httpd/conf.d/md.conf', '\n'.join([ f'MDCertificateAuthority {self.acme_server}', 'MDCertificateAgreement accepted', 'MDStoreDir /etc/httpd/md', f'MDomain {self.clients[0].hostname}', '', f' ServerName {self.clients[0].hostname}', ' SSLEngine on', '\n', ]), ) # To check for successful cert issuance means knowing how mod_md # stores certificates, or looking for specific log messages. # If the thing we are inspecting changes, the test will break. # So I prefer a conservative sleep. # self.clients[0].run_command(['systemctl', 'restart', 'httpd']) time.sleep(15) # We expect mod_md has acquired the certificate by now. # Perform a graceful restart to begin using the cert. # (If mod_md ever learns to start using newly acquired # certificates /without/ the second restart, then both # of these sleeps can be replaced by "loop until good".) # self.clients[0].run_command(['systemctl', 'reload', 'httpd']) time.sleep(3) # HTTPS request from server to client (should succeed) self.master.run_command( ['curl', f'https://{self.clients[0].hostname}']) # clean-up self.clients[0].run_command(['rm', '-rf', '/etc/httpd/md']) self.clients[0].run_command(['rm', '-f', '/etc/httpd/conf.d/md.conf']) class TestHSMBackupRestore(BaseHSMTest): def test_hsm_backup_restore(self): check_version(self.master) backup_path = tasks.get_backup_dir(self.master) self.master.run_command(['ipa-server-install', '--uninstall', '-U']) assert not self.master.transport.file_exists( paths.IPA_CUSTODIA_KEYS) assert not self.master.transport.file_exists( paths.IPA_CUSTODIA_CONF) self.master.run_command( ['ipa-restore', backup_path], stdin_text=f'{self.master.config.dirman_password}\nyes' ) @pytest.fixture def issue_and_expire_acme_cert(): """Fixture to expire cert by moving date past expiry of acme cert""" hosts = [] def _issue_and_expire_acme_cert( master, client, acme_server_url, no_of_cert=1 ): hosts.append(master) hosts.append(client) # enable the ACME service on master master.run_command(['ipa-acme-manage', 'enable']) # register the account with certbot certbot_register(client, acme_server_url) # request a standalone acme cert certbot_standalone_cert(client, acme_server_url, no_of_cert) # move system date to expire acme cert for host in hosts: tasks.kdestroy_all(host) tasks.move_date(host, 'stop', '+90days+2hours') # restart ipa services as date moved and wait to get things settle time.sleep(10) master.run_command(['ipactl', 'restart']) time.sleep(10) tasks.get_kdcinfo(master) # Note raiseonerr=False: # the assert is located after kdcinfo retrieval. # run kinit command repeatedly until sssd gets settle # after date change tasks.run_repeatedly( master, "KRB5_TRACE=/dev/stdout kinit admin", stdin_text='{0}\n{0}\n{0}\n'.format( master.config.admin_password ) ) # Retrieve kdc.$REALM after the password change, just in case SSSD # domain status flipped to online during the password change. tasks.get_kdcinfo(master) yield _issue_and_expire_acme_cert # move back date for host in hosts: tasks.move_date(host, 'start', '-90days-2hours') # restart ipa services as date moved and wait to get things settle # if the internal fixture was not called (for instance because the test # was skipped), hosts = [] and hosts[0] would produce an IndexError # exception. if hosts: time.sleep(10) hosts[0].run_command(['ipactl', 'restart']) time.sleep(10) class TestHSMACMEPrune(IntegrationTest): """Validate that ipa-acme-manage configures dogtag for pruning""" num_clients = 1 @classmethod def install(cls, mh): check_version(cls.master) super(TestHSMACMEPrune, cls).install(mh) # install packages before client install in case of IPA DNS problems cls.acme_server = prepare_acme_client(cls.master, cls.clients[0]) # Enable pkiuser to read softhsm tokens cls.master.run_command(['usermod', 'pkiuser', '-a', '-G', 'ods']) cls.token_name, cls.token_password = get_hsm_token(cls.master) tasks.install_master( cls.master, setup_dns=True, random_serial=True, extra_args=( '--token-name', cls.token_name, '--token-library-path', hsm_lib_path, '--token-password', cls.token_password ) ) tasks.install_client(cls.master, cls.clients[0]) @classmethod def uninstall(cls, mh): check_version(cls.master) super(TestHSMACMEPrune, cls).uninstall(mh) delete_hsm_token([cls.master], cls.token_name) def test_hsm_prune_cert_manual(self, issue_and_expire_acme_cert): """Test to prune expired certificate by manual run""" if (tasks.get_pki_version(self.master) < tasks.parse_version('11.3.0')): raise pytest.skip("Certificate pruning is not available") issue_and_expire_acme_cert( self.master, self.clients[0], self.acme_server) # check that the certificate issued for the client result = self.master.run_command( ['ipa', 'cert-find', '--subject', self.clients[0].hostname] ) assert f'CN={self.clients[0].hostname}' in result.stdout_text # We moved time forward 90 days + 2 hours. Configure it to # prune after an hour then run it. self.master.run_command( ['ipa-acme-manage', 'pruning', '--enable', '--certretention=60', '--certretentionunit=minute',] ) self.master.run_command(['ipactl', 'restart']) self.master.run_command(['ipa-acme-manage', 'pruning', '--run']) # wait for cert to get prune time.sleep(50) # check if client cert is removed result = self.master.run_command( ['ipa', 'cert-find', '--subject', self.clients[0].hostname], raiseonerr=False ) assert f'CN={self.clients[0].hostname}' not in result.stdout_text def test_hsm_prune_cert_cron(self, issue_and_expire_acme_cert): """Test to prune expired certificate by cron job""" if (tasks.get_pki_version(self.master) < tasks.parse_version('11.3.0')): raise pytest.skip("Certificate pruning is not available") issue_and_expire_acme_cert( self.master, self.clients[0], self.acme_server) # check that the certificate issued for the client result = self.master.run_command( ['ipa', 'cert-find', '--subject', self.clients[0].hostname] ) assert f'CN={self.clients[0].hostname}' in result.stdout_text # enable pruning self.master.run_command(['ipa-acme-manage', 'pruning', '--enable']) # cron would be set to run the next minute cron_minute = self.master.run_command( [ "python3", "-c", ( "from datetime import datetime, timedelta; " "print(int((datetime.now() + " "timedelta(minutes=5)).strftime('%M')))" ), ] ).stdout_text.strip() self.master.run_command( ['ipa-acme-manage', 'pruning', f'--cron={cron_minute} * * * *'] ) self.master.run_command(['ipactl', 'restart']) # wait for 5 minutes to cron to execute and 20 sec for just in case time.sleep(320) # check if client cert is removed result = self.master.run_command( ['ipa', 'cert-find', '--subject', self.clients[0].hostname], raiseonerr=False ) assert f'CN={self.clients[0].hostname}' not in result.stdout_text class TestHSMVault(BaseHSMTest): """Validate that vault works properly""" num_clients = 1 master_with_kra = True @classmethod def install(cls, mh): super(TestHSMVault, cls).install(mh) tasks.install_client(cls.master, cls.clients[0]) def test_hsm_vault_create_and_retrieve_master(self): vault_name = "testvault" vault_password = "password" vault_data = "SSBsb3ZlIENJIHRlc3RzCg==" # create vault on master tasks.kinit_admin(self.master) self.master.run_command([ "ipa", "vault-add", vault_name, "--password", vault_password, "--type", "symmetric", ]) # archive vault self.master.run_command([ "ipa", "vault-archive", vault_name, "--password", vault_password, "--data", vault_data, ]) # wait after archival time.sleep(45) # retrieve vault on master self.master.run_command([ "ipa", "vault-retrieve", vault_name, "--password", vault_password, ]) # retrieve on client tasks.kinit_admin(self.clients[0]) self.clients[0].run_command([ "ipa", "vault-retrieve", vault_name, "--password", vault_password, ])