Imported Upstream version 4.6.2

This commit is contained in:
Mario Fetka
2021-07-25 07:32:41 +02:00
commit 8ff3be4216
1788 changed files with 1900965 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import ipatests.util
ipatests.util.check_ipaclient_unittests()

View File

@@ -0,0 +1,70 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Base class for all cmdline tests
"""
import nose
import distutils.spawn
import os
from ipalib import api
from ipalib import errors
from ipaplatform.paths import paths
from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
from ipaserver.plugins.ldap2 import ldap2
# See if our LDAP server is up and we can talk to it over GSSAPI
try:
conn = ldap2(api)
conn.connect()
conn.disconnect()
server_available = True
except errors.DatabaseError:
server_available = False
except Exception as e:
server_available = False
class cmdline_test(XMLRPC_test):
"""
Base class for all command-line tests
"""
# some reasonable default command
command = paths.LS
@classmethod
def setup_class(cls):
# Find the executable in $PATH
# This is neded because ipautil.run resets the PATH to
# a system default.
original_command = cls.command
if not os.path.isabs(cls.command):
cls.command = distutils.spawn.find_executable(cls.command)
# raise an error if the command is missing even if the remote
# server is not available.
if not cls.command:
raise AssertionError(
'Command %r not available' % original_command
)
super(cmdline_test, cls).setup_class()
if not server_available:
raise nose.SkipTest(
'Server not available: %r' % api.env.xmlrpc_uri
)

View File

@@ -0,0 +1,332 @@
import contextlib
import os
import shlex
import subprocess
import sys
import nose
import six
from six import StringIO
from ipatests import util
from ipalib import api, errors
import pytest
if six.PY3:
unicode = str
TEST_ZONE = u'zoneadd.%(domain)s' % api.env
HERE = os.path.abspath(os.path.dirname(__file__))
BASE_DIR = os.path.abspath(os.path.join(HERE, os.pardir, os.pardir))
@pytest.mark.tier0
@pytest.mark.needs_ipaapi
class TestCLIParsing(object):
"""Tests that commandlines are correctly parsed to Command keyword args
"""
def check_command(self, commandline, expected_command_name, **kw_expected):
argv = shlex.split(commandline)
executioner = api.Backend.cli
cmd = executioner.get_command(argv)
kw_got = executioner.parse(cmd, argv[1:])
kw_got = executioner.process_keyword_arguments(cmd, kw_got)
util.assert_deepequal(expected_command_name, cmd.name, 'Command name')
util.assert_deepequal(kw_expected, kw_got)
def run_command(self, command_name, **kw):
"""Run a command on the server"""
if not api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.connect()
try:
api.Command[command_name](**kw)
except errors.NetworkError:
raise nose.SkipTest('%r: Server not available: %r' %
(self.__module__, api.env.xmlrpc_uri))
@contextlib.contextmanager
def fake_stdin(self, string_in):
"""Context manager that temporarily replaces stdin to read a string"""
old_stdin = sys.stdin
sys.stdin = StringIO(string_in)
yield
sys.stdin = old_stdin
def test_ping(self):
self.check_command('ping', 'ping')
def test_plugins(self):
self.check_command('plugins', 'plugins')
def test_user_show(self):
self.check_command('user-show admin', 'user_show', uid=u'admin')
def test_user_show_underscore(self):
self.check_command('user_show admin', 'user_show', uid=u'admin')
def test_group_add(self):
self.check_command(
'group-add tgroup1 --desc="Test group"',
'group_add',
cn=u'tgroup1',
description=u'Test group',
)
def test_sudocmdgroup_add_member(self):
# Test CSV splitting is not done
self.check_command(
# The following is as it would appear on the command line:
r'sudocmdgroup-add-member tcmdgroup1 --sudocmds=ab,c --sudocmds=d',
'sudocmdgroup_add_member',
cn=u'tcmdgroup1',
sudocmd=[u'ab,c', u'd'],
)
def test_group_add_nonposix(self):
self.check_command(
'group-add tgroup1 --desc="Test group" --nonposix',
'group_add',
cn=u'tgroup1',
description=u'Test group',
nonposix=True,
)
def test_group_add_gid(self):
self.check_command(
'group-add tgroup1 --desc="Test group" --gid=1234',
'group_add',
cn=u'tgroup1',
description=u'Test group',
gidnumber=u'1234',
)
def test_group_add_interactive(self):
with self.fake_stdin('Test group\n'):
self.check_command(
'group-add tgroup1', 'group_add',
cn=u'tgroup1',
)
def test_dnsrecord_add(self):
self.check_command(
'dnsrecord-add %s ns --a-rec=1.2.3.4' % TEST_ZONE,
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
arecord=u'1.2.3.4',
)
def test_dnsrecord_del_all(self):
try:
self.run_command('dnszone_add', idnsname=TEST_ZONE)
except errors.NotFound:
raise nose.SkipTest('DNS is not configured')
try:
self.run_command('dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns', arecord=u'1.2.3.4', force=True)
with self.fake_stdin('yes\n'):
self.check_command(
'dnsrecord_del %s ns' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
del_all=True,
)
with self.fake_stdin('YeS\n'):
self.check_command(
'dnsrecord_del %s ns' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
del_all=True,
)
finally:
self.run_command('dnszone_del', idnsname=TEST_ZONE)
def test_dnsrecord_del_one_by_one(self):
try:
self.run_command('dnszone_add', idnsname=TEST_ZONE)
except errors.NotFound:
raise nose.SkipTest('DNS is not configured')
try:
records = (u'1 1 E3B72BA346B90570EED94BE9334E34AA795CED23',
u'2 1 FD2693C1EFFC11A8D2BE57229212A04B45663791')
for record in records:
self.run_command('dnsrecord_add',
dnszoneidnsname=TEST_ZONE, idnsname=u'ns',
sshfprecord=record)
with self.fake_stdin('no\nyes\nyes\n'):
self.check_command(
'dnsrecord_del %s ns' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
sshfprecord=records,
)
finally:
self.run_command('dnszone_del', idnsname=TEST_ZONE)
def test_dnsrecord_add_ask_for_missing_fields(self):
sshfp_parts = (1, 1, u'E3B72BA346B90570EED94BE9334E34AA795CED23')
with self.fake_stdin('SSHFP\n%d\n%d\n%s' % sshfp_parts):
self.check_command(
'dnsrecord-add %s sshfp' % TEST_ZONE,
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=sshfp_parts[0],
sshfp_part_algorithm=sshfp_parts[1],
sshfp_part_fingerprint=sshfp_parts[2],
)
# test with lowercase record type
with self.fake_stdin('sshfp\n%d\n%d\n%s' % sshfp_parts):
self.check_command(
'dnsrecord-add %s sshfp' % TEST_ZONE,
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=sshfp_parts[0],
sshfp_part_algorithm=sshfp_parts[1],
sshfp_part_fingerprint=sshfp_parts[2],
)
# NOTE: when a DNS record part is passed via command line, it is not
# converted to its base type when transfered via wire
with self.fake_stdin('%d\n%s' % (sshfp_parts[1], sshfp_parts[2])):
self.check_command(
'dnsrecord-add %s sshfp --sshfp-algorithm=%d' % (
TEST_ZONE, sshfp_parts[0]),
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=sshfp_parts[0],
# passed via cmdline
sshfp_part_algorithm=unicode(sshfp_parts[1]),
sshfp_part_fingerprint=sshfp_parts[2],
)
with self.fake_stdin(sshfp_parts[2]):
self.check_command(
'dnsrecord-add %s sshfp --sshfp-algorithm=%d '
'--sshfp-fp-type=%d' % (
TEST_ZONE, sshfp_parts[0], sshfp_parts[1]),
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
# passed via cmdline
sshfp_part_fp_type=unicode(sshfp_parts[0]),
# passed via cmdline
sshfp_part_algorithm=unicode(sshfp_parts[1]),
sshfp_part_fingerprint=sshfp_parts[2],
)
def test_dnsrecord_del_comma(self):
try:
self.run_command(
'dnszone_add', idnsname=TEST_ZONE)
except errors.NotFound:
raise nose.SkipTest('DNS is not configured')
try:
self.run_command(
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'test',
txtrecord=u'"A pretty little problem," said Holmes.')
with self.fake_stdin('no\nyes\n'):
self.check_command(
'dnsrecord_del %s test' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=TEST_ZONE,
idnsname=u'test',
txtrecord=[u'"A pretty little problem," said Holmes.'])
finally:
self.run_command('dnszone_del', idnsname=TEST_ZONE)
def test_idrange_add(self):
"""
Test idrange-add with interative prompt
"""
def test_with_interactive_input():
with self.fake_stdin('5\n500000\n'):
self.check_command(
'idrange_add range1 --base-id=1 --range-size=1',
'idrange_add',
cn=u'range1',
ipabaseid=u'1',
ipaidrangesize=u'1',
ipabaserid=5,
ipasecondarybaserid=500000,
)
def test_with_command_line_options():
self.check_command(
'idrange_add range1 --base-id=1 --range-size=1 '
'--rid-base=5 --secondary-rid-base=500000',
'idrange_add',
cn=u'range1',
ipabaseid=u'1',
ipaidrangesize=u'1',
ipabaserid=u'5',
ipasecondarybaserid=u'500000',
)
def test_without_options():
self.check_command(
'idrange_add range1 --base-id=1 --range-size=1',
'idrange_add',
cn=u'range1',
ipabaseid=u'1',
ipaidrangesize=u'1',
)
adtrust_dn = 'cn=ADTRUST,cn=%s,cn=masters,cn=ipa,cn=etc,%s' % \
(api.env.host, api.env.basedn)
adtrust_is_enabled = api.Command['adtrust_is_enabled']()['result']
mockldap = None
if not adtrust_is_enabled:
# ipa-adtrust-install not run - no need to pass rid-base
# and secondary-rid-base
test_without_options()
# Create a mock service object to test against
adtrust_add = dict(
ipaconfigstring=b'enabledService',
objectclass=[b'top', b'nsContainer', b'ipaConfigObject']
)
mockldap = util.MockLDAP()
mockldap.add_entry(adtrust_dn, adtrust_add)
# Pass rid-base and secondary-rid-base interactively
test_with_interactive_input()
# Pass rid-base and secondary-rid-base on the command-line
test_with_command_line_options()
if not adtrust_is_enabled:
mockldap.del_entry(adtrust_dn)
def test_cli_fsencoding():
# https://pagure.io/freeipa/issue/5887
env = {
key: value for key, value in os.environ.items()
if not key.startswith(('LC_', 'LANG'))
}
env['LC_ALL'] = 'C'
env['PYTHONPATH'] = BASE_DIR
p = subprocess.Popen(
[sys.executable, '-m', 'ipaclient', 'help'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
out, err = p.communicate()
assert p.returncode > 0, (out, err)
assert b'System encoding must be UTF-8' in err, (out, err)

View File

@@ -0,0 +1,149 @@
# Authors: Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2012 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import six
from six import StringIO
from ipalib import api, errors
from ipaserver.plugins.user import user_add
import pytest
if six.PY3:
unicode = str
pytestmark = pytest.mark.needs_ipaapi
@pytest.mark.tier0
class CLITestContext(object):
"""Context manager that replaces stdout & stderr, and catches SystemExit
Whatever was printed to the streams is available in ``stdout`` and
``stderr`` attrributes once the with statement finishes.
When exception is given, asserts that exception is raised. The exception
will be available in the ``exception`` attribute.
"""
def __init__(self, exception=None):
self.exception = exception
def __enter__(self):
self.old_streams = sys.stdout, sys.stderr
self.stdout_fileobj = sys.stdout = StringIO()
self.stderr_fileobj = sys.stderr = StringIO()
return self
def __exit__(self, exc_type, exc_value, traceback):
sys.stdout, sys.stderr = self.old_streams
self.stdout = self.stdout_fileobj.getvalue()
self.stderr = self.stderr_fileobj.getvalue()
self.stdout_fileobj.close()
self.stderr_fileobj.close()
if self.exception:
if not isinstance(exc_value, self.exception):
return False
self.exception = exc_value
return True
def test_ipa_help():
"""Test that `ipa help` only writes to stdout"""
with CLITestContext() as ctx:
return_value = api.Backend.cli.run(['help'])
assert return_value == 0
assert ctx.stderr == ''
def test_ipa_without_arguments():
"""Test that `ipa` errors out, and prints the help to stderr"""
with CLITestContext(exception=SystemExit) as ctx:
api.Backend.cli.run([])
assert ctx.exception.code == 2
assert ctx.stdout == ''
assert 'Error: Command not specified' in ctx.stderr
with CLITestContext() as help_ctx:
api.Backend.cli.run(['help'])
assert help_ctx.stdout in ctx.stderr
def test_bare_topic():
"""Test that `ipa user` errors out, and prints the help to stderr
This is because `user` is a topic, not a command, so `ipa user` doesn't
match our usage string. The help should be accessed using `ipa help user`.
"""
with CLITestContext(exception=errors.CommandError) as ctx:
api.Backend.cli.run(['user'])
assert ctx.exception.name == 'user'
assert ctx.stdout == ''
with CLITestContext() as help_ctx:
return_value = api.Backend.cli.run(['help', 'user'])
assert return_value == 0
assert help_ctx.stdout in ctx.stderr
def test_command_help():
"""Test that `help user-add` & `user-add -h` are equivalent and contain doc
"""
with CLITestContext() as help_ctx:
return_value = api.Backend.cli.run(['help', 'user-add'])
assert return_value == 0
assert help_ctx.stderr == ''
with CLITestContext(exception=SystemExit) as h_ctx:
api.Backend.cli.run(['user-add', '-h'])
assert h_ctx.exception.code == 0
assert h_ctx.stderr == ''
assert h_ctx.stdout == help_ctx.stdout
assert unicode(user_add.doc) in help_ctx.stdout
def test_ambiguous_command_or_topic():
"""Test that `help ping` & `ping -h` are NOT equivalent
One is a topic, the other is a command
"""
with CLITestContext() as help_ctx:
return_value = api.Backend.cli.run(['help', 'ping'])
assert return_value == 0
assert help_ctx.stderr == ''
with CLITestContext(exception=SystemExit) as h_ctx:
api.Backend.cli.run(['ping', '-h'])
assert h_ctx.exception.code == 0
assert h_ctx.stderr == ''
assert h_ctx.stdout != help_ctx.stdout
def test_multiline_description():
"""Test that all of a multi-line command description appears in output
"""
# This assumes trust_add has multiline doc. Ensure it is so.
assert '\n\n' in unicode(api.Command.trust_add.doc).strip()
with CLITestContext(exception=SystemExit) as help_ctx:
api.Backend.cli.run(['trust-add', '-h'])
assert unicode(api.Command.trust_add.doc).strip() in help_ctx.stdout

View File

@@ -0,0 +1,325 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test `ipa-getkeytab`
"""
import os
import shutil
import tempfile
import gssapi
import pytest
from ipalib import api
from ipaplatform.paths import paths
from ipapython import ipautil, ipaldap
from ipaserver.plugins.ldap2 import ldap2
from ipatests.test_cmdline.cmdline import cmdline_test
from ipatests.test_xmlrpc.tracker import host_plugin, service_plugin
def use_keytab(principal, keytab):
try:
tmpdir = tempfile.mkdtemp(prefix = "tmp-")
ccache_file = 'FILE:%s/ccache' % tmpdir
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {'ccache': ccache_file,
'client_keytab': keytab}
os.environ['KRB5CCNAME'] = ccache_file
gssapi.Credentials(name=name, usage='initiate', store=store)
conn = ldap2(api)
conn.connect(autobind=ipaldap.AUTOBIND_DISABLED)
conn.disconnect()
except gssapi.exceptions.GSSError as e:
raise Exception('Unable to bind to LDAP. Error initializing principal %s in %s: %s' % (principal, keytab, str(e)))
finally:
os.environ.pop('KRB5CCNAME', None)
if tmpdir:
shutil.rmtree(tmpdir)
@pytest.fixture(scope='class')
def test_host(request):
host_tracker = host_plugin.HostTracker(u'test-host')
return host_tracker.make_fixture(request)
@pytest.fixture(scope='class')
def test_service(request, test_host):
service_tracker = service_plugin.ServiceTracker(u'srv', test_host.name)
test_host.ensure_exists()
return service_tracker.make_fixture(request)
@pytest.mark.needs_ipaapi
class KeytabRetrievalTest(cmdline_test):
"""
Base class for keytab retrieval tests
"""
command = "ipa-getkeytab"
keytabname = None
@classmethod
def setup_class(cls):
super(KeytabRetrievalTest, cls).setup_class()
keytabfd, keytabname = tempfile.mkstemp()
os.close(keytabfd)
os.unlink(keytabname)
cls.keytabname = keytabname
@classmethod
def teardown_class(cls):
super(KeytabRetrievalTest, cls).teardown_class()
try:
os.unlink(cls.keytabname)
except OSError:
pass
def run_ipagetkeytab(self, service_principal, args=tuple(),
raiseonerr=False):
new_args = [self.command,
"-p", service_principal,
"-k", self.keytabname]
if not args:
new_args.extend(['-s', api.env.host])
else:
new_args.extend(list(args))
return ipautil.run(
new_args,
stdin=None,
raiseonerr=raiseonerr,
capture_error=True)
def assert_success(self, *args, **kwargs):
result = self.run_ipagetkeytab(*args, **kwargs)
expected = 'Keytab successfully retrieved and stored in: %s\n' % (
self.keytabname)
assert expected in result.error_output, (
'Success message not in output:\n%s' % result.error_output)
def assert_failure(self, retcode, message, *args, **kwargs):
result = self.run_ipagetkeytab(*args, **kwargs)
err = result.error_output
assert message in err
rc = result.returncode
assert rc == retcode
@pytest.mark.tier0
class test_ipagetkeytab(KeytabRetrievalTest):
"""
Test `ipa-getkeytab`.
"""
command = "ipa-getkeytab"
keytabname = None
def test_1_run(self, test_service):
"""
Create a keytab with `ipa-getkeytab` for a non-existent service.
"""
test_service.ensure_missing()
result = self.run_ipagetkeytab(test_service.name)
err = result.error_output
assert 'Failed to parse result: PrincipalName not found.\n' in err, err
rc = result.returncode
assert rc > 0, rc
def test_2_run(self, test_service):
"""
Create a keytab with `ipa-getkeytab` for an existing service.
"""
test_service.ensure_exists()
self.assert_success(test_service.name, raiseonerr=True)
def test_3_use(self, test_service):
"""
Try to use the service keytab.
"""
use_keytab(test_service.name, self.keytabname)
def test_4_disable(self, test_service):
"""
Disable a kerberos principal
"""
retrieve_cmd = test_service.make_retrieve_command()
result = retrieve_cmd()
# Verify that it has a principal key
assert result[u'result'][u'has_keytab']
# Disable it
disable_cmd = test_service.make_disable_command()
disable_cmd()
# Verify that it looks disabled
result = retrieve_cmd()
assert not result[u'result'][u'has_keytab']
def test_5_use_disabled(self, test_service):
"""
Try to use the disabled keytab
"""
try:
use_keytab(test_service.name, self.keytabname)
except Exception as errmsg:
assert('Unable to bind to LDAP. Error initializing principal' in str(errmsg))
class TestBindMethods(KeytabRetrievalTest):
"""
Class that tests '-c'/'-H'/'-Y' flags
"""
dm_password = None
ca_cert = None
@classmethod
def setup_class(cls):
super(TestBindMethods, cls).setup_class()
dmpw_file = os.path.join(api.env.dot_ipa, '.dmpw')
if not os.path.isfile(dmpw_file):
pytest.skip('{} file required for this test'.format(dmpw_file))
with open(dmpw_file, 'r') as f:
cls.dm_password = f.read().strip()
tempfd, temp_ca_cert = tempfile.mkstemp()
os.close(tempfd)
shutil.copy(os.path.join(paths.IPA_CA_CRT), temp_ca_cert)
cls.ca_cert = temp_ca_cert
@classmethod
def teardown_class(cls):
super(TestBindMethods, cls).teardown_class()
try:
os.unlink(cls.ca_cert)
except OSError:
pass
def check_ldapi(self):
if not api.env.ldap_uri.startswith('ldapi://'):
pytest.skip("LDAP URI not pointing to LDAPI socket")
def test_retrieval_with_dm_creds(self, test_service):
test_service.ensure_exists()
self.assert_success(
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-s', api.env.host])
def test_retrieval_using_plain_ldap(self, test_service):
test_service.ensure_exists()
ldap_uri = 'ldap://{}'.format(api.env.host)
self.assert_success(
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-H', ldap_uri])
@pytest.mark.skipif(os.geteuid() != 0,
reason="Must have root privileges to run this test")
def test_retrieval_using_ldapi_external(self, test_service):
test_service.ensure_exists()
self.check_ldapi()
self.assert_success(
test_service.name,
args=[
'-Y',
'EXTERNAL',
'-H', api.env.ldap_uri])
def test_retrieval_using_ldap_gssapi(self, test_service):
test_service.ensure_exists()
self.check_ldapi()
self.assert_success(
test_service.name,
args=[
'-Y',
'GSSAPI',
'-H', api.env.ldap_uri])
def test_retrieval_using_ldaps_ca_cert(self, test_service):
test_service.ensure_exists()
self.assert_success(
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-H', 'ldaps://{}'.format(api.env.host),
'--cacert', self.ca_cert])
def test_ldap_uri_server_raises_error(self, test_service):
test_service.ensure_exists()
self.assert_failure(
2,
"Cannot specify server and LDAP uri simultaneously",
test_service.name,
args=[
'-H', 'ldaps://{}'.format(api.env.host),
'-s', api.env.host],
raiseonerr=False)
def test_invalid_mech_raises_error(self, test_service):
test_service.ensure_exists()
self.assert_failure(
2,
"Invalid SASL bind mechanism",
test_service.name,
args=[
'-H', 'ldaps://{}'.format(api.env.host),
'-Y', 'BOGUS'],
raiseonerr=False)
def test_mech_bind_dn_raises_error(self, test_service):
test_service.ensure_exists()
self.assert_failure(
2,
"Cannot specify both SASL mechanism and bind DN simultaneously",
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-H', 'ldaps://{}'.format(api.env.host),
'-Y', 'EXTERNAL'],
raiseonerr=False)