Imported Debian patch 4.7.2-3
This commit is contained in:
committed by
Mario Fetka
parent
27edeba051
commit
8bc559c5a1
@@ -21,8 +21,6 @@
|
||||
|
||||
from __future__ import print_function, absolute_import
|
||||
|
||||
from pprint import pformat
|
||||
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
@@ -34,57 +32,13 @@ from pytest_multihost import make_multihost_fixture
|
||||
|
||||
from ipapython import ipautil
|
||||
from ipaplatform.paths import paths
|
||||
from ipatests.test_util import yield_fixture
|
||||
from .config import Config
|
||||
from .env_config import get_global_config
|
||||
from . import tasks
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CLASS_LOGFILES = [
|
||||
# dirsrv logs
|
||||
paths.VAR_LOG_DIRSRV,
|
||||
# IPA install logs
|
||||
paths.IPASERVER_INSTALL_LOG,
|
||||
paths.IPACLIENT_INSTALL_LOG,
|
||||
paths.IPAREPLICA_INSTALL_LOG,
|
||||
paths.IPAREPLICA_CONNCHECK_LOG,
|
||||
paths.IPAREPLICA_CA_INSTALL_LOG,
|
||||
paths.IPASERVER_KRA_INSTALL_LOG,
|
||||
paths.IPA_CUSTODIA_AUDIT_LOG,
|
||||
paths.IPACLIENTSAMBA_INSTALL_LOG,
|
||||
paths.IPACLIENTSAMBA_UNINSTALL_LOG,
|
||||
paths.IPATRUSTENABLEAGENT_LOG,
|
||||
# IPA uninstall logs
|
||||
paths.IPASERVER_UNINSTALL_LOG,
|
||||
paths.IPACLIENT_UNINSTALL_LOG,
|
||||
# IPA upgrade logs
|
||||
paths.IPAUPGRADE_LOG,
|
||||
# IPA backup and restore logs
|
||||
paths.IPARESTORE_LOG,
|
||||
paths.IPABACKUP_LOG,
|
||||
# kerberos related logs
|
||||
paths.KADMIND_LOG,
|
||||
paths.KRB5KDC_LOG,
|
||||
# httpd logs
|
||||
paths.VAR_LOG_HTTPD_DIR,
|
||||
# dogtag logs
|
||||
paths.VAR_LOG_PKI_DIR,
|
||||
# selinux logs
|
||||
paths.VAR_LOG_AUDIT,
|
||||
# sssd
|
||||
paths.VAR_LOG_SSSD_DIR,
|
||||
# system
|
||||
paths.RESOLV_CONF,
|
||||
paths.HOSTS,
|
||||
]
|
||||
|
||||
|
||||
def make_class_logs(host):
|
||||
logs = list(CLASS_LOGFILES)
|
||||
env_filename = os.path.join(host.config.test_dir, 'env.sh')
|
||||
logs.append(env_filename)
|
||||
return logs
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
group = parser.getgroup("IPA integration tests")
|
||||
@@ -96,45 +50,39 @@ def pytest_addoption(parser):
|
||||
|
||||
def _get_logname_from_node(node):
|
||||
name = node.nodeid
|
||||
name = re.sub(r'\(\)/', '', name) # remove ()/
|
||||
name = re.sub(r'[()]', '', name) # and standalone brackets
|
||||
name = re.sub(r'(/|::)', '-', name)
|
||||
name = re.sub('\(\)/', '', name) # remove ()/
|
||||
name = re.sub('[()]', '', name) # and standalone brackets
|
||||
name = re.sub('(/|::)', '-', name)
|
||||
return name
|
||||
|
||||
|
||||
def collect_test_logs(node, logs_dict, test_config, suffix=''):
|
||||
def collect_test_logs(node, logs_dict, test_config):
|
||||
"""Collect logs from a test
|
||||
|
||||
Calls collect_logs and collect_systemd_journal
|
||||
Calls collect_logs
|
||||
|
||||
:param node: The pytest collection node (request.node)
|
||||
:param logs_dict: Mapping of host to list of log filnames to collect
|
||||
:param test_config: Pytest configuration
|
||||
:param suffix: The custom suffix of the name of logfiles' directory
|
||||
"""
|
||||
name = '{node}{suffix}'.format(
|
||||
node=_get_logname_from_node(node),
|
||||
suffix=suffix,
|
||||
)
|
||||
logfile_dir = test_config.getoption('logfile_dir')
|
||||
collect_logs(
|
||||
name=name,
|
||||
name=_get_logname_from_node(node),
|
||||
logs_dict=logs_dict,
|
||||
logfile_dir=logfile_dir,
|
||||
logfile_dir=test_config.getoption('logfile_dir'),
|
||||
beakerlib_plugin=test_config.pluginmanager.getplugin('BeakerLibPlugin'),
|
||||
)
|
||||
|
||||
hosts = logs_dict.keys() # pylint: disable=dict-keys-not-iterating
|
||||
collect_systemd_journal(name, hosts, logfile_dir)
|
||||
|
||||
|
||||
def collect_systemd_journal(name, hosts, logfile_dir=None):
|
||||
def collect_systemd_journal(node, hosts, test_config):
|
||||
"""Collect systemd journal from remote hosts
|
||||
|
||||
:param name: Name under which logs are collected, e.g. name of the test
|
||||
:param node: The pytest collection node (request.node)
|
||||
:param hosts: List of hosts from which to collect journal
|
||||
:param logfile_dir: Directory to log to
|
||||
:param test_config: Pytest configuration
|
||||
"""
|
||||
name = _get_logname_from_node(node)
|
||||
logfile_dir = test_config.getoption('logfile_dir')
|
||||
|
||||
if logfile_dir is None:
|
||||
return
|
||||
|
||||
@@ -184,8 +132,6 @@ def collect_logs(name, logs_dict, logfile_dir=None, beakerlib_plugin=None):
|
||||
|
||||
for host, logs in logs_dict.items():
|
||||
logger.info('Collecting logs from: %s', host.hostname)
|
||||
# make list of unique log filenames
|
||||
logs = list(set(logs))
|
||||
dirname = os.path.join(topdirname, host.hostname)
|
||||
if not os.path.isdir(dirname):
|
||||
os.makedirs(dirname)
|
||||
@@ -235,119 +181,23 @@ def collect_logs(name, logs_dict, logfile_dir=None, beakerlib_plugin=None):
|
||||
shutil.rmtree(topdirname)
|
||||
|
||||
|
||||
class IntegrationLogs:
|
||||
"""Represent logfile collections
|
||||
Collection is a mapping of IPA hosts and a list of logfiles to be
|
||||
collected. There are two types of collections: class and method.
|
||||
The former contains a list of logfiles which will be collected on
|
||||
each test (within class) completion, while the latter contains
|
||||
a list of logfiles which will be collected on only certain test
|
||||
completion (once).
|
||||
"""
|
||||
def __init__(self):
|
||||
self._class_logs = {}
|
||||
self._method_logs = {}
|
||||
|
||||
def set_logs(self, host, logs):
|
||||
self._class_logs[host] = list(logs)
|
||||
|
||||
@property
|
||||
def method_logs(self):
|
||||
return self._method_logs
|
||||
|
||||
@property
|
||||
def class_logs(self):
|
||||
return self._class_logs
|
||||
|
||||
def init_method_logs(self):
|
||||
"""Initilize method logs with the class ones"""
|
||||
self._method_logs = {}
|
||||
for k in self._class_logs:
|
||||
self._method_logs[k] = list(self._class_logs[k])
|
||||
|
||||
def collect_class_log(self, host, filename):
|
||||
"""Add class scope log
|
||||
The file with the given filename will be collected from the
|
||||
host on an each test completion(within a test class).
|
||||
"""
|
||||
logger.info('Adding %s:%s to list of class logs to collect',
|
||||
host.external_hostname, filename)
|
||||
self._class_logs.setdefault(host, []).append(filename)
|
||||
self._method_logs.setdefault(host, []).append(filename)
|
||||
|
||||
def collect_method_log(self, host, filename):
|
||||
"""Add method scope log
|
||||
The file with the given filename will be collected from the
|
||||
host on a test completion.
|
||||
"""
|
||||
logger.info('Adding %s:%s to list of method logs to collect',
|
||||
host.external_hostname, filename)
|
||||
self._method_logs.setdefault(host, []).append(filename)
|
||||
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def class_integration_logs(request):
|
||||
"""Internal fixture providing class-level logs_dict
|
||||
For adjusting collection of logs, please, use 'integration_logs'
|
||||
fixture.
|
||||
"""
|
||||
integration_logs = IntegrationLogs()
|
||||
yield integration_logs
|
||||
# since the main fixture of integration tests('mh') depends on
|
||||
# this one the class logs collecting happens *after* the teardown
|
||||
# of that fixture. The 'uninstall' is among the finalizers of 'mh'.
|
||||
# This means that the logs collected here are the IPA *uninstall*
|
||||
# logs.
|
||||
class_logs = integration_logs.class_logs
|
||||
collect_test_logs(request.node, class_logs, request.config,
|
||||
suffix='-uninstall')
|
||||
def class_integration_logs():
|
||||
"""Internal fixture providing class-level logs_dict"""
|
||||
return {}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@yield_fixture
|
||||
def integration_logs(class_integration_logs, request):
|
||||
"""Provides access to test integration logs, and collects after each test
|
||||
To collect a logfile on a test completion one should add the dependency on
|
||||
this fixture and call its 'collect_method_log' method.
|
||||
For example, run TestFoo.
|
||||
```
|
||||
class TestFoo(IntegrationTest):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def test_bar(self, integration_logs):
|
||||
integration_logs.collect_method_log(self.master, '/logfile')
|
||||
```
|
||||
'/logfile' will be collected only for 'test_bar' test.
|
||||
|
||||
To collect a logfile on a test class completion one should add the
|
||||
dependency on this fixture and call its 'collect_class_log' method.
|
||||
For example, run TestFoo.
|
||||
```
|
||||
class TestFoo(IntegrationTest):
|
||||
def test_foo(self, integration_logs):
|
||||
integration_logs.collect_class_log(self.master, '/logfile')
|
||||
|
||||
def test_bar(self):
|
||||
pass
|
||||
```
|
||||
'/logfile' will be collected 3 times:
|
||||
1) on 'test_foo' completion
|
||||
2) on 'test_bar' completion
|
||||
3) on 'TestFoo' completion
|
||||
|
||||
Note, the registration of a collection works at the runtime. This means
|
||||
that if the '/logfile' will be registered in 'test_bar' then
|
||||
it will not be collected on 'test_foo' completion:
|
||||
1) on 'test_bar' completion
|
||||
2) on 'TestFoo' completion
|
||||
"""
|
||||
class_integration_logs.init_method_logs()
|
||||
yield class_integration_logs
|
||||
method_logs = class_integration_logs.method_logs
|
||||
collect_test_logs(request.node, method_logs, request.config)
|
||||
hosts = class_integration_logs.keys()
|
||||
collect_test_logs(request.node, class_integration_logs, request.config)
|
||||
collect_systemd_journal(request.node, hosts, request.config)
|
||||
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
@yield_fixture(scope='class')
|
||||
def mh(request, class_integration_logs):
|
||||
"""IPA's multihost fixture object
|
||||
"""
|
||||
@@ -368,17 +218,11 @@ def mh(request, class_integration_logs):
|
||||
for _i in range(cls.num_ad_domains):
|
||||
domain_descriptions.append({
|
||||
'type': 'AD',
|
||||
'hosts': {'ad': 1}
|
||||
})
|
||||
for _i in range(cls.num_ad_subdomains):
|
||||
domain_descriptions.append({
|
||||
'type': 'AD_SUBDOMAIN',
|
||||
'hosts': {'ad_subdomain': 1}
|
||||
})
|
||||
for _i in range(cls.num_ad_treedomains):
|
||||
domain_descriptions.append({
|
||||
'type': 'AD_TREEDOMAIN',
|
||||
'hosts': {'ad_treedomain': 1}
|
||||
'hosts': {
|
||||
'ad': 1,
|
||||
'ad_subdomain': cls.num_ad_domains,
|
||||
'ad_treedomain': cls.num_ad_domains,
|
||||
}
|
||||
})
|
||||
|
||||
mh = make_multihost_fixture(
|
||||
@@ -392,53 +236,32 @@ def mh(request, class_integration_logs):
|
||||
[mh.master] = mh.domain.hosts_by_role('master')
|
||||
mh.replicas = mh.domain.hosts_by_role('replica')
|
||||
mh.clients = mh.domain.hosts_by_role('client')
|
||||
ad_domains = mh.config.ad_domains
|
||||
if ad_domains:
|
||||
mh.ads = []
|
||||
for domain in ad_domains:
|
||||
mh.ads.extend(domain.hosts_by_role('ad'))
|
||||
mh.ad_subdomains = []
|
||||
for domain in ad_domains:
|
||||
mh.ad_subdomains.extend(domain.hosts_by_role('ad_subdomain'))
|
||||
mh.ad_treedomains = []
|
||||
for domain in ad_domains:
|
||||
mh.ad_treedomains.extend(domain.hosts_by_role('ad_treedomain'))
|
||||
|
||||
cls.logs_to_collect = class_integration_logs.class_logs
|
||||
cls.logs_to_collect = class_integration_logs
|
||||
|
||||
if logger.isEnabledFor(logging.INFO):
|
||||
logger.info(pformat(mh.config.to_dict()))
|
||||
|
||||
for ipa_host in mh.config.get_all_ipa_hosts():
|
||||
class_integration_logs.set_logs(ipa_host, make_class_logs(ipa_host))
|
||||
def collect_log(host, filename):
|
||||
logger.info('Adding %s:%s to list of logs to collect',
|
||||
host.external_hostname, filename)
|
||||
class_integration_logs.setdefault(host, []).append(filename)
|
||||
|
||||
print(mh.config)
|
||||
for host in mh.config.get_all_hosts():
|
||||
host.add_log_collector(collect_log)
|
||||
logger.info('Preparing host %s', host.hostname)
|
||||
tasks.prepare_host(host)
|
||||
|
||||
add_compat_attrs(cls, mh)
|
||||
setup_class(cls, mh)
|
||||
mh._pytestmh_request.addfinalizer(lambda: teardown_class(cls))
|
||||
|
||||
def fin():
|
||||
del_compat_attrs(cls)
|
||||
mh._pytestmh_request.addfinalizer(fin)
|
||||
yield mh.install()
|
||||
|
||||
try:
|
||||
yield mh.install()
|
||||
finally:
|
||||
# the 'mh' fixture depends on 'class_integration_logs' one,
|
||||
# thus, the class logs collecting happens *after* the teardown
|
||||
# of 'mh' fixture. The 'uninstall' is among the finalizers of 'mh'.
|
||||
# This means that the logs collected here are the IPA *uninstall*
|
||||
# logs and the 'install' ones can be removed during the IPA
|
||||
# uninstall phase. To address this problem(e.g. installation error)
|
||||
# the install logs will be collected into '{nodeid}-install' directory
|
||||
# while the uninstall ones into '{nodeid}-uninstall'.
|
||||
class_logs = class_integration_logs.class_logs
|
||||
collect_test_logs(request.node, class_logs, request.config,
|
||||
suffix='-install')
|
||||
for host in cls.get_all_hosts():
|
||||
host.remove_log_collector(collect_log)
|
||||
|
||||
collect_test_logs(request.node, class_integration_logs, request.config)
|
||||
|
||||
|
||||
def add_compat_attrs(cls, mh):
|
||||
def setup_class(cls, mh):
|
||||
"""Add convenience attributes to the test class
|
||||
|
||||
This is deprecated in favor of the mh fixture.
|
||||
@@ -449,13 +272,9 @@ def add_compat_attrs(cls, mh):
|
||||
cls.replicas = mh.replicas
|
||||
cls.clients = mh.clients
|
||||
cls.ad_domains = mh.config.ad_domains
|
||||
if cls.ad_domains:
|
||||
cls.ads = mh.ads
|
||||
cls.ad_subdomains = mh.ad_subdomains
|
||||
cls.ad_treedomains = mh.ad_treedomains
|
||||
|
||||
|
||||
def del_compat_attrs(cls):
|
||||
def teardown_class(cls):
|
||||
"""Remove convenience attributes from the test class
|
||||
|
||||
This is deprecated in favor of the mh fixture.
|
||||
@@ -464,9 +283,5 @@ def del_compat_attrs(cls):
|
||||
del cls.master
|
||||
del cls.replicas
|
||||
del cls.clients
|
||||
del cls.domain
|
||||
if cls.ad_domains:
|
||||
del cls.ads
|
||||
del cls.ad_subdomains
|
||||
del cls.ad_treedomains
|
||||
del cls.ad_domains
|
||||
del cls.domain
|
||||
|
||||
@@ -43,7 +43,6 @@ class Config(pytest_multihost.config.Config):
|
||||
'dns_forwarder',
|
||||
'domain_level',
|
||||
'log_journal_since',
|
||||
'fips_mode',
|
||||
}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
@@ -68,7 +67,6 @@ class Config(pytest_multihost.config.Config):
|
||||
self.log_journal_since = kwargs.get('log_journal_since') or '-1h'
|
||||
if self.domain_level is None:
|
||||
self.domain_level = MAX_DOMAIN_LEVEL
|
||||
self.fips_mode = kwargs.get('fips_mode', False)
|
||||
|
||||
def get_domain_class(self):
|
||||
return Domain
|
||||
@@ -80,18 +78,13 @@ class Config(pytest_multihost.config.Config):
|
||||
|
||||
@property
|
||||
def ad_domains(self):
|
||||
return [d for d in self.domains if d.is_ad_type]
|
||||
return [d for d in self.domains if d.type == 'AD']
|
||||
|
||||
def get_all_hosts(self):
|
||||
for domain in self.domains:
|
||||
for host in domain.hosts:
|
||||
yield host
|
||||
|
||||
def get_all_ipa_hosts(self):
|
||||
for ipa_domain in (d for d in self.domains if d.is_ipa_type):
|
||||
for ipa_host in ipa_domain.hosts:
|
||||
yield ipa_host
|
||||
|
||||
def to_dict(self):
|
||||
extra_args = self.extra_init_args - {'dirman_dn'}
|
||||
result = super(Config, self).to_dict(extra_args)
|
||||
@@ -129,18 +122,10 @@ class Domain(pytest_multihost.config.Domain):
|
||||
self.name = str(name)
|
||||
self.hosts = []
|
||||
|
||||
assert self.is_ipa_type or self.is_ad_type
|
||||
assert domain_type in ('IPA', 'AD')
|
||||
self.realm = self.name.upper()
|
||||
self.basedn = DN(*(('dc', p) for p in name.split('.')))
|
||||
|
||||
@property
|
||||
def is_ipa_type(self):
|
||||
return self.type == 'IPA'
|
||||
|
||||
@property
|
||||
def is_ad_type(self):
|
||||
return self.type == 'AD' or self.type.startswith('AD_')
|
||||
|
||||
@property
|
||||
def static_roles(self):
|
||||
# Specific roles for each domain type are hardcoded
|
||||
@@ -148,19 +133,15 @@ class Domain(pytest_multihost.config.Domain):
|
||||
return ('master', 'replica', 'client', 'other')
|
||||
elif self.type == 'AD':
|
||||
return ('ad',)
|
||||
elif self.type == 'AD_SUBDOMAIN':
|
||||
return ('ad_subdomain',)
|
||||
elif self.type == 'AD_TREEDOMAIN':
|
||||
return ('ad_treedomain',)
|
||||
else:
|
||||
raise LookupError(self.type)
|
||||
|
||||
def get_host_class(self, host_dict):
|
||||
from ipatests.pytest_ipa.integration.host import Host, WinHost
|
||||
|
||||
if self.is_ipa_type:
|
||||
if self.type == 'IPA':
|
||||
return Host
|
||||
elif self.is_ad_type:
|
||||
elif self.type == 'AD':
|
||||
return WinHost
|
||||
else:
|
||||
raise LookupError(self.type)
|
||||
|
||||
@@ -20,13 +20,11 @@ import os
|
||||
import os.path
|
||||
import six
|
||||
|
||||
from cryptography import __version__ as cryptography_version
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.x509.oid import NameOID
|
||||
from pkg_resources import parse_version
|
||||
from pyasn1.type import univ, char, namedtype, tag
|
||||
from pyasn1.codec.der import encoder as der_encoder
|
||||
from pyasn1.codec.native import decoder as native_decoder
|
||||
@@ -152,22 +150,13 @@ def profile_ca(builder, ca_nick, ca):
|
||||
critical=False,
|
||||
)
|
||||
else:
|
||||
ski_ext = ca.cert.extensions.get_extension_for_class(
|
||||
x509.SubjectKeyIdentifier
|
||||
ski = ca.cert.extensions.get_extension_for_class(
|
||||
x509.SubjectKeyIdentifier)
|
||||
builder = builder.add_extension(
|
||||
x509.AuthorityKeyIdentifier
|
||||
.from_issuer_subject_key_identifier(ski),
|
||||
critical=False,
|
||||
)
|
||||
auth_keyidentifier = (x509.AuthorityKeyIdentifier
|
||||
.from_issuer_subject_key_identifier)
|
||||
'''
|
||||
cryptography < 2.7 accepts only Extension object.
|
||||
Remove this workaround when all supported platforms update
|
||||
python-cryptography.
|
||||
'''
|
||||
if (parse_version(cryptography_version) >= parse_version('2.7')):
|
||||
extension = auth_keyidentifier(ski_ext.value)
|
||||
else:
|
||||
extension = auth_keyidentifier(ski_ext)
|
||||
|
||||
builder = builder.add_extension(extension, critical=False)
|
||||
return builder
|
||||
|
||||
|
||||
@@ -420,15 +409,6 @@ def gen_server_certs(nick_base, hostname, org, ca=None):
|
||||
]),
|
||||
ca, dns_name=hostname, warp=-2 * YEAR
|
||||
)
|
||||
gen_cert(
|
||||
profile_server, nick_base + u'-not-yet-valid',
|
||||
x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, org),
|
||||
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Future'),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
|
||||
]),
|
||||
ca, dns_name=hostname, warp=1 * DAY,
|
||||
)
|
||||
gen_cert(profile_server, nick_base + u'-badusage',
|
||||
x509.Name([
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, org),
|
||||
|
||||
@@ -28,6 +28,8 @@ import os
|
||||
import json
|
||||
import collections
|
||||
|
||||
import six
|
||||
|
||||
from ipapython import ipautil
|
||||
from ipatests.pytest_ipa.integration.config import Config, Domain
|
||||
from ipalib.constants import MAX_DOMAIN_LEVEL
|
||||
@@ -63,8 +65,6 @@ _setting_infos = (
|
||||
_SettingInfo('domain_level', 'DOMAINLVL', MAX_DOMAIN_LEVEL),
|
||||
|
||||
_SettingInfo('log_journal_since', 'LOG_JOURNAL_SINCE', '-1h'),
|
||||
# userspace FIPS mode
|
||||
_SettingInfo('fips_mode', 'IPA_FIPS_MODE', False),
|
||||
)
|
||||
|
||||
|
||||
@@ -134,7 +134,7 @@ def config_from_env(env):
|
||||
kwargs['domains'] = []
|
||||
|
||||
# $IPv6SETUP needs to be 'TRUE' to enable ipv6
|
||||
if isinstance(kwargs['ipv6'], str):
|
||||
if isinstance(kwargs['ipv6'], six.string_types):
|
||||
kwargs['ipv6'] = (kwargs['ipv6'].upper() == 'TRUE')
|
||||
|
||||
config = Config(**kwargs)
|
||||
@@ -143,21 +143,15 @@ def config_from_env(env):
|
||||
|
||||
domain_index = 1
|
||||
while (env.get('MASTER_env%s' % domain_index) or
|
||||
env.get('AD_env%s' % domain_index) or
|
||||
env.get('AD_SUBDOMAIN_env%s' % domain_index) or
|
||||
env.get('AD_TREEDOMAIN_env%s' % domain_index)):
|
||||
env.get('AD_env%s' % domain_index)):
|
||||
|
||||
if env.get('MASTER_env%s' % domain_index):
|
||||
# IPA domain takes precedence to AD domain in case of conflict
|
||||
config.domains.append(domain_from_env(env, config, domain_index,
|
||||
domain_type='IPA'))
|
||||
else:
|
||||
for domain_type in ('AD', 'AD_SUBDOMAIN', 'AD_TREEDOMAIN'):
|
||||
if env.get('%s_env%s' % (domain_type, domain_index)):
|
||||
config.domains.append(
|
||||
domain_from_env(env, config, domain_index,
|
||||
domain_type=domain_type))
|
||||
break
|
||||
config.domains.append(domain_from_env(env, config, domain_index,
|
||||
domain_type='AD'))
|
||||
domain_index += 1
|
||||
|
||||
return config
|
||||
@@ -282,7 +276,7 @@ def domain_from_env(env, config, index, domain_type):
|
||||
if domain_type == 'IPA':
|
||||
master_role = 'MASTER'
|
||||
else:
|
||||
master_role = domain_type
|
||||
master_role = 'AD'
|
||||
|
||||
env_suffix = '_env%s' % index
|
||||
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2019 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
"""FIPS testing helpers
|
||||
|
||||
Based on userspace FIPS mode by Ondrej Moris.
|
||||
|
||||
Userspace FIPS mode fakes a Kernel in FIPS enforcing mode. User space
|
||||
programs behave like the Kernel was booted in FIPS enforcing mode. Kernel
|
||||
space code still runs in standard mode.
|
||||
"""
|
||||
import os
|
||||
from ipaplatform.paths import paths
|
||||
|
||||
FIPS_OVERLAY_DIR = "/var/tmp/userspace-fips"
|
||||
FIPS_OVERLAY = os.path.join(FIPS_OVERLAY_DIR, "fips_enabled")
|
||||
SYSTEM_FIPS = "/etc/system-fips"
|
||||
|
||||
|
||||
def is_fips_enabled(host):
|
||||
"""Check if host has """
|
||||
result = host.run_command(
|
||||
["cat", paths.PROC_FIPS_ENABLED], raiseonerr=False
|
||||
)
|
||||
if result.returncode == 1:
|
||||
# FIPS mode not available
|
||||
return None
|
||||
elif result.returncode == 0:
|
||||
return result.stdout_text.strip() == "1"
|
||||
else:
|
||||
raise RuntimeError(result.stderr_text)
|
||||
|
||||
|
||||
def enable_userspace_fips(host):
|
||||
# create /etc/system-fips
|
||||
host.put_file_contents(SYSTEM_FIPS, "# userspace fips\n")
|
||||
# fake Kernel FIPS mode with bind mount
|
||||
host.run_command(["mkdir", "-p", FIPS_OVERLAY_DIR])
|
||||
host.put_file_contents(FIPS_OVERLAY, "1\n")
|
||||
host.run_command(
|
||||
["mount", "--bind", FIPS_OVERLAY, paths.PROC_FIPS_ENABLED]
|
||||
)
|
||||
# set crypto policy to FIPS mode
|
||||
host.run_command(["update-crypto-policies", "--show"])
|
||||
host.run_command(["update-crypto-policies", "--set", "FIPS"])
|
||||
# sanity check
|
||||
assert is_fips_enabled(host)
|
||||
result = host.run_command(
|
||||
["openssl", "md5", "/dev/null"], raiseonerr=False
|
||||
)
|
||||
assert result.returncode == 1
|
||||
assert "EVP_DigestInit_ex:disabled for FIPS" in result.stderr_text
|
||||
|
||||
|
||||
def disable_userspace_fips(host):
|
||||
host.run_command(["rm", "-f", SYSTEM_FIPS])
|
||||
host.run_command(["update-crypto-policies", "--set", "DEFAULT"])
|
||||
result = host.run_command(
|
||||
["umount", paths.PROC_FIPS_ENABLED], raiseonerr=False
|
||||
)
|
||||
host.run_command(["rm", "-rf", FIPS_OVERLAY_DIR])
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(result.stderr_text)
|
||||
|
||||
# sanity check
|
||||
assert not is_fips_enabled(host)
|
||||
host.run_command(["openssl", "md5", "/dev/null"])
|
||||
@@ -1,277 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2018 FreeIPA Contributors. See COPYING for license
|
||||
#
|
||||
|
||||
"""Firewall class for integration testing using firewalld"""
|
||||
|
||||
import abc
|
||||
|
||||
from ipapython import ipautil
|
||||
|
||||
|
||||
class FirewallBase(abc.ABC):
|
||||
def __init__(self, host):
|
||||
"""Initialize with host where firewall changes should be applied"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self):
|
||||
"""Enable and start firewall service"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def enable_service(self, service):
|
||||
"""Enable firewall rules for service"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def disable_service(self, service):
|
||||
"""Disable firewall rules for service"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def enable_services(self, services):
|
||||
"""Enable firewall rules for list of services"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def disable_services(self, services):
|
||||
"""Disable firewall rules for list of services"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def passthrough_rule(self, rule, ipv=None):
|
||||
"""Generic method to get direct passthrough rules to
|
||||
rule is an ip[6]tables rule without using the ip[6]tables command.
|
||||
The rule will per default be added to the IPv4 and IPv6 firewall.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
The rule is added to the direct sub chain of the chain that is used
|
||||
in the rule"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_passthrough_rules(self, rules, ipv=None):
|
||||
"""Add passthough rules to the end of the chain
|
||||
rules is a list of ip[6]tables rules, where the first entry of each
|
||||
rule is the chain. No --append/-A, --delete/-D should be added before
|
||||
the chain name, beacuse these are added by the method.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def prepend_passthrough_rules(self, rules, ipv=None):
|
||||
"""Insert passthough rules starting at position 1 as a block
|
||||
rules is a list of ip[6]tables rules, where the first entry of each
|
||||
rule is the chain. No --append/-A, --delete/-D should be added before
|
||||
the chain name, beacuse these are added by the method.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def remove_passthrough_rules(self, rules, ipv=None):
|
||||
"""Remove passthrough rules
|
||||
rules is a list of ip[6]tables rules, where the first entry of each
|
||||
rule is the chain. No --append/-A, --delete/-D should be added before
|
||||
the chain name, beacuse these are added by the method.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
"""
|
||||
|
||||
|
||||
class NoOpFirewall(FirewallBase):
|
||||
"""
|
||||
no-op firewall is intended for platforms which haven't high level firewall
|
||||
backend.
|
||||
"""
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
def enable_service(self, service):
|
||||
pass
|
||||
|
||||
def disable_service(self, service):
|
||||
pass
|
||||
|
||||
def enable_services(self, services):
|
||||
pass
|
||||
|
||||
def disable_services(self, services):
|
||||
pass
|
||||
|
||||
def passthrough_rule(self, rule, ipv=None):
|
||||
pass
|
||||
|
||||
def add_passthrough_rules(self, rules, ipv=None):
|
||||
pass
|
||||
|
||||
def prepend_passthrough_rules(self, rules, ipv=None):
|
||||
pass
|
||||
|
||||
def remove_passthrough_rules(self, rules, ipv=None):
|
||||
pass
|
||||
|
||||
|
||||
class FirewallD(FirewallBase):
|
||||
def __init__(self, host):
|
||||
"""Initialize with host where firewall changes should be applied"""
|
||||
self.host = host
|
||||
|
||||
def run(self):
|
||||
# Unmask firewalld service
|
||||
self.host.run_command(["systemctl", "unmask", "firewalld"])
|
||||
# Enable firewalld service
|
||||
self.host.run_command(["systemctl", "enable", "firewalld"])
|
||||
# Start firewalld service
|
||||
self.host.run_command(["systemctl", "start", "firewalld"])
|
||||
|
||||
def _rp_action(self, args):
|
||||
"""Run-time and permanant firewall action"""
|
||||
cmd = ["firewall-cmd"]
|
||||
cmd.extend(args)
|
||||
|
||||
# Run-time part
|
||||
result = self.host.run_command(cmd, raiseonerr=False)
|
||||
if result.returncode not in [0, 11, 12]:
|
||||
# Ignore firewalld error codes:
|
||||
# 11 is ALREADY_ENABLED
|
||||
# 12 is NOT_ENABLED
|
||||
raise ipautil.CalledProcessError(result.returncode, cmd,
|
||||
result.stdout_text,
|
||||
result.stderr_text)
|
||||
|
||||
# Permanent part
|
||||
result = self.host.run_command(cmd + ["--permanent"],
|
||||
raiseonerr=False)
|
||||
if result.returncode not in [0, 11, 12]:
|
||||
# Ignore firewalld error codes:
|
||||
# 11 is ALREADY_ENABLED
|
||||
# 12 is NOT_ENABLED
|
||||
raise ipautil.CalledProcessError(result.returncode, cmd,
|
||||
result.stdout_text,
|
||||
result.stderr_text)
|
||||
|
||||
def enable_service(self, service):
|
||||
"""Enable firewall service in firewalld runtime and permanent
|
||||
environment"""
|
||||
self._rp_action(["--add-service", service])
|
||||
|
||||
def disable_service(self, service):
|
||||
"""Disable firewall service in firewalld runtime and permanent
|
||||
environment"""
|
||||
self._rp_action(["--remove-service", service])
|
||||
|
||||
def enable_services(self, services):
|
||||
"""Enable list of firewall services in firewalld runtime and
|
||||
permanent environment"""
|
||||
args = []
|
||||
for service in services:
|
||||
args.extend(["--add-service", service])
|
||||
self._rp_action(args)
|
||||
|
||||
def disable_services(self, services):
|
||||
"""Disable list of firewall services in firewalld runtime and
|
||||
permanent environment"""
|
||||
args = []
|
||||
for service in services:
|
||||
args.extend(["--remove-service", service])
|
||||
self._rp_action(args)
|
||||
|
||||
def passthrough_rule(self, rule, ipv=None):
|
||||
"""Generic method to get direct passthrough rules to firewalld
|
||||
rule is an ip[6]tables rule without using the ip[6]tables command.
|
||||
The rule will per default be added to the IPv4 and IPv6 firewall.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
The rule is added to the direct sub chain of the chain that is used
|
||||
in the rule"""
|
||||
if ipv is None:
|
||||
ipvs = ["ipv4", "ipv6"]
|
||||
else:
|
||||
ipvs = [ipv]
|
||||
for _ipv in ipvs:
|
||||
args = ["firewall-cmd", "--direct", "--passthrough", _ipv] + rule
|
||||
self.host.run_command(args)
|
||||
|
||||
def add_passthrough_rules(self, rules, ipv=None):
|
||||
"""Add passthough rules to the end of the chain
|
||||
rules is a list of ip[6]tables rules, where the first entry of each
|
||||
rule is the chain. No --append/-A, --delete/-D should be added before
|
||||
the chain name, beacuse these are added by the method.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
"""
|
||||
for rule in rules:
|
||||
self.passthrough_rule(["-A"] + rule, ipv)
|
||||
|
||||
def prepend_passthrough_rules(self, rules, ipv=None):
|
||||
"""Insert passthough rules starting at position 1 as a block
|
||||
rules is a list of ip[6]tables rules, where the first entry of each
|
||||
rule is the chain. No --append/-A, --delete/-D should be added before
|
||||
the chain name, beacuse these are added by the method.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
"""
|
||||
# first rule number in iptables is 1
|
||||
for i, rule in enumerate(rules, start=1):
|
||||
self.passthrough_rule(["-I", rule[0], str(i)] + rule[1:], ipv)
|
||||
|
||||
def remove_passthrough_rules(self, rules, ipv=None):
|
||||
"""Remove passthrough rules
|
||||
rules is a list of ip[6]tables rules, where the first entry of each
|
||||
rule is the chain. No --append/-A, --delete/-D should be added before
|
||||
the chain name, beacuse these are added by the method.
|
||||
If there are IP version specific parts in the rule, please make sure
|
||||
that ipv is adapted properly.
|
||||
"""
|
||||
for rule in rules:
|
||||
self.passthrough_rule(["-D"] + rule, ipv)
|
||||
|
||||
|
||||
class Firewall(FirewallBase):
|
||||
"""
|
||||
Depending on the ipaplatform proxy firewall tasks to the actual backend.
|
||||
Current supported backends: firewalld and no-op firewall.
|
||||
"""
|
||||
def __init__(self, host):
|
||||
"""Initialize with host where firewall changes should be applied"""
|
||||
# break circular dependency
|
||||
from .tasks import get_platform
|
||||
|
||||
self.host = host
|
||||
platform = get_platform(host)
|
||||
|
||||
firewalls = {
|
||||
'rhel': FirewallD,
|
||||
'fedora': FirewallD,
|
||||
'debian': FirewallD,
|
||||
'ubuntu': FirewallD,
|
||||
'altlinux': NoOpFirewall,
|
||||
}
|
||||
if platform not in firewalls:
|
||||
raise ValueError(
|
||||
"Platform {} doesn't support Firewall".format(platform))
|
||||
self.firewall = firewalls[platform](self.host)
|
||||
self.run()
|
||||
|
||||
def run(self):
|
||||
self.firewall.run()
|
||||
|
||||
def enable_service(self, service):
|
||||
self.firewall.enable_service(service)
|
||||
|
||||
def disable_service(self, service):
|
||||
self.firewall.disable_service(service)
|
||||
|
||||
def enable_services(self, services):
|
||||
self.firewall.enable_services(services)
|
||||
|
||||
def disable_services(self, services):
|
||||
self.firewall.disable_services(services)
|
||||
|
||||
def passthrough_rule(self, rule, ipv=None):
|
||||
self.firewall.passthrough_rule(rule, ipv)
|
||||
|
||||
def add_passthrough_rules(self, rules, ipv=None):
|
||||
self.firewall.add_passthrough_rules(rules, ipv)
|
||||
|
||||
def prepend_passthrough_rules(self, rules, ipv=None):
|
||||
self.firewall.prepend_passthrough_rules(rules, ipv)
|
||||
|
||||
def remove_passthrough_rules(self, rules, ipv=None):
|
||||
self.firewall.remove_passthrough_rules(rules, ipv)
|
||||
@@ -18,111 +18,15 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""Host class for integration testing"""
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
import ldap
|
||||
import pytest_multihost.host
|
||||
|
||||
from ipaplatform.paths import paths
|
||||
from ipapython import ipaldap
|
||||
|
||||
from .fips import (
|
||||
is_fips_enabled, enable_userspace_fips, disable_userspace_fips
|
||||
)
|
||||
from .transport import IPAOpenSSHTransport
|
||||
|
||||
FIPS_NOISE_RE = re.compile(br"FIPS mode initialized\r?\n?")
|
||||
|
||||
|
||||
class LDAPClientWithoutCertCheck(ipaldap.LDAPClient):
|
||||
"""Adds an option to disable certificate check for TLS connection
|
||||
|
||||
To disable certificate validity check create client with added option
|
||||
no_certificate_check:
|
||||
client = LDAPClientWithoutCertCheck(..., no_certificate_check=True)
|
||||
"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._no_certificate_check = kwargs.pop(
|
||||
'no_certificate_check', False)
|
||||
super(LDAPClientWithoutCertCheck, self).__init__(*args, **kwargs)
|
||||
|
||||
def _connect(self):
|
||||
if (self._start_tls and self.protocol == 'ldap' and
|
||||
self._no_certificate_check):
|
||||
with self.error_handler():
|
||||
conn = ipaldap.ldap_initialize(
|
||||
self.ldap_uri, cacertfile=self._cacert)
|
||||
conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
|
||||
ldap.OPT_X_TLS_NEVER)
|
||||
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
|
||||
conn.start_tls_s()
|
||||
return conn
|
||||
else:
|
||||
return super(LDAPClientWithoutCertCheck, self)._connect()
|
||||
|
||||
|
||||
class Host(pytest_multihost.host.Host):
|
||||
"""Representation of a remote IPA host"""
|
||||
|
||||
transport_class = IPAOpenSSHTransport
|
||||
|
||||
def __init__(self, domain, hostname, role, ip=None,
|
||||
external_hostname=None, username=None, password=None,
|
||||
test_dir=None, host_type=None):
|
||||
super().__init__(
|
||||
domain, hostname, role, ip=ip,
|
||||
external_hostname=external_hostname, username=username,
|
||||
password=password, test_dir=test_dir, host_type=host_type
|
||||
)
|
||||
self._fips_mode = None
|
||||
self._userspace_fips = False
|
||||
|
||||
@property
|
||||
def is_fips_mode(self):
|
||||
"""Check and cache if a system is in FIPS mode
|
||||
"""
|
||||
if self._fips_mode is None:
|
||||
self._fips_mode = is_fips_enabled(self)
|
||||
return self._fips_mode
|
||||
|
||||
@property
|
||||
def is_userspace_fips(self):
|
||||
"""Check if host uses fake userspace FIPS
|
||||
"""
|
||||
return self._userspace_fips
|
||||
|
||||
def enable_userspace_fips(self):
|
||||
"""Enable fake userspace FIPS mode
|
||||
|
||||
The call has no effect if the system is already in FIPS mode.
|
||||
|
||||
:return: True if system was modified, else None
|
||||
"""
|
||||
if not self.is_fips_mode:
|
||||
enable_userspace_fips(self)
|
||||
self._fips_mode = True
|
||||
self._userspace_fips = True
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def disable_userspace_fips(self):
|
||||
"""Disable fake userspace FIPS mode
|
||||
|
||||
The call has no effect if userspace FIPS mode is not enabled.
|
||||
|
||||
:return: True if system was modified, else None
|
||||
"""
|
||||
if self.is_userspace_fips:
|
||||
disable_userspace_fips(self)
|
||||
self._userspace_fips = False
|
||||
self._fips_mode = False
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _make_host(domain, hostname, role, ip, external_hostname):
|
||||
# We need to determine the type of the host, this depends on the domain
|
||||
@@ -134,37 +38,18 @@ class Host(pytest_multihost.host.Host):
|
||||
else:
|
||||
cls = Host
|
||||
|
||||
return cls(
|
||||
domain,
|
||||
hostname,
|
||||
role,
|
||||
ip=ip,
|
||||
external_hostname=external_hostname
|
||||
)
|
||||
return cls(domain, hostname, role, ip, external_hostname)
|
||||
|
||||
def ldap_connect(self):
|
||||
"""Return an LDAPClient authenticated to this host as directory manager
|
||||
"""
|
||||
self.log.info('Connecting to LDAP at %s', self.external_hostname)
|
||||
# get IPA CA cert to establish a secure connection
|
||||
cacert = self.get_file_contents(paths.IPA_CA_CRT)
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(cacert)
|
||||
f.flush()
|
||||
|
||||
hostnames_mismatch = self.hostname != self.external_hostname
|
||||
conn = LDAPClientWithoutCertCheck.from_hostname_secure(
|
||||
self.external_hostname,
|
||||
cacert=f.name,
|
||||
no_certificate_check=hostnames_mismatch)
|
||||
binddn = self.config.dirman_dn
|
||||
self.log.info('LDAP bind as %s', binddn)
|
||||
conn.simple_bind(binddn, self.config.dirman_password)
|
||||
|
||||
# The CA cert file has been loaded into the SSL_CTX and is no
|
||||
# longer required.
|
||||
|
||||
return conn
|
||||
ldap_uri = ipaldap.get_ldap_uri(self.external_hostname)
|
||||
ldap = ipaldap.LDAPClient(ldap_uri)
|
||||
binddn = self.config.dirman_dn
|
||||
self.log.info('LDAP bind as %s' % binddn)
|
||||
ldap.simple_bind(binddn, self.config.dirman_password)
|
||||
return ldap
|
||||
|
||||
@classmethod
|
||||
def from_env(cls, env, domain, hostname, role, index, domain_index):
|
||||
@@ -175,35 +60,6 @@ class Host(pytest_multihost.host.Host):
|
||||
from ipatests.pytest_ipa.integration.env_config import host_to_env
|
||||
return host_to_env(self, **kwargs)
|
||||
|
||||
def run_command(self, argv, set_env=True, stdin_text=None,
|
||||
log_stdout=True, raiseonerr=True,
|
||||
cwd=None, bg=False, encoding='utf-8', ok_returncode=0):
|
||||
"""Wrapper around run_command to log stderr on raiseonerr=True
|
||||
|
||||
:param ok_returncode: return code considered to be correct,
|
||||
you can pass an integer or sequence of integers
|
||||
"""
|
||||
result = super().run_command(
|
||||
argv, set_env=set_env, stdin_text=stdin_text,
|
||||
log_stdout=log_stdout, raiseonerr=False, cwd=cwd, bg=bg,
|
||||
encoding=encoding
|
||||
)
|
||||
# in FIPS mode SSH may print noise to stderr, remove the string
|
||||
# "FIPS mode initialized" + optional newline.
|
||||
result.stderr_bytes = FIPS_NOISE_RE.sub(b'', result.stderr_bytes)
|
||||
try:
|
||||
result_ok = result.returncode in ok_returncode
|
||||
except TypeError:
|
||||
result_ok = result.returncode == ok_returncode
|
||||
if not result_ok and raiseonerr:
|
||||
result.log.error('stderr: %s', result.stderr_text)
|
||||
raise subprocess.CalledProcessError(
|
||||
result.returncode, argv,
|
||||
result.stdout_text, result.stderr_text
|
||||
)
|
||||
else:
|
||||
return result
|
||||
|
||||
|
||||
class WinHost(pytest_multihost.host.WinHost):
|
||||
"""
|
||||
@@ -212,4 +68,3 @@ class WinHost(pytest_multihost.host.WinHost):
|
||||
This serves as a sketch class once we move from manual preparation of
|
||||
Active Directory to the automated setup.
|
||||
"""
|
||||
transport_class = IPAOpenSSHTransport
|
||||
|
||||
1317
ipatests/pytest_ipa/integration/tasks.py
Executable file → Normal file
1317
ipatests/pytest_ipa/integration/tasks.py
Executable file → Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,48 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2020 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
"""Enhanced SSH transport for pytest multihost
|
||||
|
||||
Provides SSH password login for OpenSSH transport
|
||||
"""
|
||||
import os
|
||||
|
||||
from pytest_multihost.transport import OpenSSHTransport
|
||||
|
||||
|
||||
class IPAOpenSSHTransport(OpenSSHTransport):
|
||||
def _get_ssh_argv(self):
|
||||
"""Return the path to SSH and options needed for every call"""
|
||||
control_file = os.path.join(self.control_dir.path, "control")
|
||||
known_hosts_file = os.path.join(self.control_dir.path, "known_hosts")
|
||||
|
||||
argv = [
|
||||
"ssh",
|
||||
"-l",
|
||||
self.host.ssh_username,
|
||||
"-o",
|
||||
"ControlPath=%s" % control_file,
|
||||
"-o",
|
||||
"StrictHostKeyChecking=no",
|
||||
"-o",
|
||||
"UserKnownHostsFile=%s" % known_hosts_file,
|
||||
]
|
||||
|
||||
if self.host.ssh_key_filename:
|
||||
key_filename = os.path.expanduser(self.host.ssh_key_filename)
|
||||
argv.extend(["-i", key_filename])
|
||||
elif self.host.ssh_password:
|
||||
password_file = os.path.join(self.control_dir.path, "password")
|
||||
with open(password_file, "w") as f:
|
||||
os.fchmod(f.fileno(), 0o600)
|
||||
f.write(self.host.ssh_password)
|
||||
f.write("\n")
|
||||
argv = ["sshpass", f"-f{password_file}"] + argv
|
||||
else:
|
||||
self.log.critical("No SSH credentials configured")
|
||||
raise RuntimeError("No SSH credentials configured")
|
||||
|
||||
argv.append(self.host.external_hostname)
|
||||
self.log.debug("SSH invocation: %s", argv)
|
||||
|
||||
return argv
|
||||
Reference in New Issue
Block a user