Imported Upstream version 4.8.10

This commit is contained in:
Mario Fetka
2021-10-03 11:06:28 +02:00
parent 10dfc9587b
commit 03a8170b15
2361 changed files with 1883897 additions and 338759 deletions

View File

@@ -0,0 +1,7 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import ipatests.util
ipatests.util.check_ipaclient_unittests()

View File

@@ -21,11 +21,13 @@
Base class for all cmdline tests
"""
import nose
import krbV
from __future__ import absolute_import
import distutils.spawn
import os
import pytest
from ipalib import api
from ipalib import errors
from ipaplatform.paths import paths
@@ -33,16 +35,14 @@ from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
from ipaserver.plugins.ldap2 import ldap2
# See if our LDAP server is up and we can talk to it over GSSAPI
ccache = krbV.default_context().default_ccache()
try:
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
conn.connect(ccache=ccache)
conn = ldap2(api)
conn.connect()
conn.disconnect()
server_available = True
except errors.DatabaseError:
server_available = False
except Exception, e:
except Exception as e:
server_available = False
class cmdline_test(XMLRPC_test):
@@ -52,27 +52,22 @@ class cmdline_test(XMLRPC_test):
# some reasonable default command
command = paths.LS
def setUp(self):
@pytest.fixture(autouse=True, scope="class")
def cmdline_setup(self, request, xmlrpc_setup):
# Find the executable in $PATH
# This is neded because ipautil.run resets the PATH to
# a system default.
original_command = self.command
if not os.path.isabs(self.command):
self.command = distutils.spawn.find_executable(self.command)
cls = request.cls
original_command = cls.command
if not os.path.isabs(cls.command):
cls.command = distutils.spawn.find_executable(cls.command)
# raise an error if the command is missing even if the remote
# server is not available.
if not self.command:
if not cls.command:
raise AssertionError(
'Command %r not available' % original_command
)
super(cmdline_test, self).setUp()
if not server_available:
raise nose.SkipTest(
pytest.skip(
'Server not available: %r' % api.env.xmlrpc_uri
)
def tearDown(self):
"""
nose tear-down fixture.
"""
super(cmdline_test, self).tearDown()

View File

@@ -1,16 +1,30 @@
import shlex
import sys
import contextlib
import StringIO
import os
from io import StringIO
import shlex
import subprocess
import sys
import tempfile
import nose
import six
from ipatests import util
from ipatests.test_ipalib.test_x509 import goodcert_headers
from ipalib import api, errors
from ipapython.version import API_VERSION
import pytest
if six.PY3:
unicode = str
TEST_ZONE = u'zoneadd.%(domain)s' % api.env
HERE = os.path.abspath(os.path.dirname(__file__))
BASE_DIR = os.path.abspath(os.path.join(HERE, os.pardir, os.pardir))
class TestCLIParsing(object):
@pytest.mark.tier0
@pytest.mark.needs_ipaapi
class TestCLIParsing:
"""Tests that commandlines are correctly parsed to Command keyword args
"""
def check_command(self, commandline, expected_command_name, **kw_expected):
@@ -26,54 +40,40 @@ class TestCLIParsing(object):
def run_command(self, command_name, **kw):
"""Run a command on the server"""
if not api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.connect(fallback=False)
api.Backend.rpcclient.connect()
try:
api.Command[command_name](**kw)
except errors.NetworkError:
raise nose.SkipTest('%r: Server not available: %r' %
(self.__module__, api.env.xmlrpc_uri))
pytest.skip('%r: Server not available: %r' %
(self.__module__, api.env.xmlrpc_uri))
@contextlib.contextmanager
def fake_stdin(self, string_in):
"""Context manager that temporarily replaces stdin to read a string"""
old_stdin = sys.stdin
sys.stdin = StringIO.StringIO(string_in)
sys.stdin = StringIO(string_in)
yield
sys.stdin = old_stdin
def test_ping(self):
self.check_command('ping', 'ping',
version=API_VERSION)
self.check_command('ping', 'ping')
def test_plugins(self):
self.check_command('plugins', 'plugins')
def test_user_show(self):
self.check_command('user-show admin', 'user_show',
uid=u'admin',
rights=False,
no_members=False,
raw=False,
all=False,
version=API_VERSION)
self.check_command('user-show admin', 'user_show', uid=u'admin')
def test_user_show_underscore(self):
self.check_command('user_show admin', 'user_show',
uid=u'admin',
rights=False,
no_members=False,
raw=False,
all=False,
version=API_VERSION)
self.check_command('user_show admin', 'user_show', uid=u'admin')
def test_group_add(self):
self.check_command('group-add tgroup1 --desc="Test group"',
self.check_command(
'group-add tgroup1 --desc="Test group"',
'group_add',
cn=u'tgroup1',
description=u'Test group',
nonposix=False,
external=False,
no_members=False,
raw=False,
all=False,
version=API_VERSION)
)
def test_sudocmdgroup_add_member(self):
# Test CSV splitting is not done
@@ -83,255 +83,170 @@ class TestCLIParsing(object):
'sudocmdgroup_add_member',
cn=u'tcmdgroup1',
sudocmd=[u'ab,c', u'd'],
no_members=False,
raw=False,
all=False,
version=API_VERSION)
)
def test_group_add_nonposix(self):
self.check_command('group-add tgroup1 --desc="Test group" --nonposix',
self.check_command(
'group-add tgroup1 --desc="Test group" --nonposix',
'group_add',
cn=u'tgroup1',
description=u'Test group',
nonposix=True,
external=False,
no_members=False,
raw=False,
all=False,
version=API_VERSION)
)
def test_group_add_gid(self):
self.check_command('group-add tgroup1 --desc="Test group" --gid=1234',
self.check_command(
'group-add tgroup1 --desc="Test group" --gid=1234',
'group_add',
cn=u'tgroup1',
description=u'Test group',
gidnumber=u'1234',
nonposix=False,
external=False,
no_members=False,
raw=False,
all=False,
version=API_VERSION)
)
def test_group_add_interactive(self):
with self.fake_stdin('Test group\n'):
self.check_command('group-add tgroup1', 'group_add',
self.check_command(
'group-add tgroup1', 'group_add',
cn=u'tgroup1',
description=u'Test group',
nonposix=False,
external=False,
no_members=False,
raw=False,
all=False,
version=API_VERSION)
)
def test_dnsrecord_add(self):
self.check_command('dnsrecord-add test-example.com ns --a-rec=1.2.3.4',
self.check_command(
'dnsrecord-add %s ns --a-rec=1.2.3.4' % TEST_ZONE,
'dnsrecord_add',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
arecord=u'1.2.3.4',
structured=False,
force=False,
raw=False,
all=False,
version=API_VERSION)
)
def test_dnsrecord_del_all(self):
try:
self.run_command('dnszone_add', idnsname=u'test-example.com',
idnssoamname=u'ns.test-example.com', force=True)
self.run_command('dnszone_add', idnsname=TEST_ZONE)
except errors.NotFound:
raise nose.SkipTest('DNS is not configured')
pytest.skip('DNS is not configured')
try:
self.run_command('dnsrecord_add',
dnszoneidnsname=u'test-example.com',
idnsname=u'ns', arecord=u'1.2.3.4')
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns', arecord=u'1.2.3.4', force=True)
with self.fake_stdin('yes\n'):
self.check_command('dnsrecord_del test-example.com ns',
self.check_command(
'dnsrecord_del %s ns' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
del_all=True,
structured=False,
version=API_VERSION)
)
with self.fake_stdin('YeS\n'):
self.check_command('dnsrecord_del test-example.com ns',
self.check_command(
'dnsrecord_del %s ns' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
del_all=True,
structured=False,
version=API_VERSION)
)
finally:
self.run_command('dnszone_del', idnsname=u'test-example.com')
self.run_command('dnszone_del', idnsname=TEST_ZONE)
def test_dnsrecord_del_one_by_one(self):
try:
self.run_command('dnszone_add', idnsname=u'test-example.com',
idnssoamname=u'ns.test-example.com', force=True)
self.run_command('dnszone_add', idnsname=TEST_ZONE)
except errors.NotFound:
raise nose.SkipTest('DNS is not configured')
pytest.skip('DNS is not configured')
try:
records = (u'1 1 E3B72BA346B90570EED94BE9334E34AA795CED23',
u'2 1 FD2693C1EFFC11A8D2BE57229212A04B45663791')
for record in records:
self.run_command('dnsrecord_add',
dnszoneidnsname=u'test-example.com', idnsname=u'ns',
dnszoneidnsname=TEST_ZONE, idnsname=u'ns',
sshfprecord=record)
with self.fake_stdin('no\nyes\nyes\n'):
self.check_command('dnsrecord_del test-example.com ns',
self.check_command(
'dnsrecord_del %s ns' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'ns',
del_all=False,
sshfprecord=records,
structured=False,
version=API_VERSION)
)
finally:
self.run_command('dnszone_del', idnsname=u'test-example.com')
self.run_command('dnszone_del', idnsname=TEST_ZONE)
def test_dnsrecord_add_ask_for_missing_fields(self):
sshfp_parts = (1, 1, u'E3B72BA346B90570EED94BE9334E34AA795CED23')
with self.fake_stdin('SSHFP\n%d\n%d\n%s' % sshfp_parts):
self.check_command('dnsrecord-add test-example.com sshfp',
self.check_command(
'dnsrecord-add %s sshfp' % TEST_ZONE,
'dnsrecord_add',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=sshfp_parts[0],
sshfp_part_algorithm=sshfp_parts[1],
sshfp_part_fingerprint=sshfp_parts[2],
structured=False,
raw=False,
all=False,
force=False,
version=API_VERSION)
)
# test with lowercase record type
with self.fake_stdin('sshfp\n%d\n%d\n%s' % sshfp_parts):
self.check_command(
'dnsrecord-add %s sshfp' % TEST_ZONE,
'dnsrecord_add',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=sshfp_parts[0],
sshfp_part_algorithm=sshfp_parts[1],
sshfp_part_fingerprint=sshfp_parts[2],
)
# NOTE: when a DNS record part is passed via command line, it is not
# converted to its base type when transfered via wire
with self.fake_stdin('%d\n%s' % (sshfp_parts[1], sshfp_parts[2])):
self.check_command('dnsrecord-add test-example.com sshfp ' \
'--sshfp-algorithm=%d' % sshfp_parts[0],
self.check_command(
'dnsrecord-add %s sshfp --sshfp-algorithm=%d' % (
TEST_ZONE, sshfp_parts[0]),
'dnsrecord_add',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=sshfp_parts[0],
sshfp_part_algorithm=unicode(sshfp_parts[1]), # passed via cmdline
# passed via cmdline
sshfp_part_algorithm=unicode(sshfp_parts[1]),
sshfp_part_fingerprint=sshfp_parts[2],
structured=False,
raw=False,
all=False,
force=False,
version=API_VERSION)
)
with self.fake_stdin(sshfp_parts[2]):
self.check_command('dnsrecord-add test-example.com sshfp ' \
'--sshfp-algorithm=%d --sshfp-fp-type=%d' % (sshfp_parts[0], sshfp_parts[1]),
self.check_command(
'dnsrecord-add %s sshfp --sshfp-algorithm=%d '
'--sshfp-fp-type=%d' % (
TEST_ZONE, sshfp_parts[0], sshfp_parts[1]),
'dnsrecord_add',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'sshfp',
sshfp_part_fp_type=unicode(sshfp_parts[0]), # passed via cmdline
sshfp_part_algorithm=unicode(sshfp_parts[1]), # passed via cmdline
# passed via cmdline
sshfp_part_fp_type=unicode(sshfp_parts[0]),
# passed via cmdline
sshfp_part_algorithm=unicode(sshfp_parts[1]),
sshfp_part_fingerprint=sshfp_parts[2],
structured=False,
raw=False,
all=False,
force=False,
version=API_VERSION)
)
def test_dnsrecord_del_comma(self):
try:
self.run_command(
'dnszone_add', idnsname=u'test-example.com',
idnssoamname=u'ns.test-example.com', force=True)
'dnszone_add', idnsname=TEST_ZONE)
except errors.NotFound:
raise nose.SkipTest('DNS is not configured')
pytest.skip('DNS is not configured')
try:
self.run_command(
'dnsrecord_add',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'test',
txtrecord=u'"A pretty little problem," said Holmes.')
with self.fake_stdin('no\nyes\n'):
self.check_command(
'dnsrecord_del test-example.com test',
'dnsrecord_del %s test' % TEST_ZONE,
'dnsrecord_del',
dnszoneidnsname=u'test-example.com',
dnszoneidnsname=TEST_ZONE,
idnsname=u'test',
del_all=False,
txtrecord=[u'"A pretty little problem," said Holmes.'],
structured=False,
version=API_VERSION)
txtrecord=[u'"A pretty little problem," said Holmes.'])
finally:
self.run_command('dnszone_del', idnsname=u'test-example.com')
def test_dnszone_add(self):
"""
Test dnszone-add with nameserver IP passed interatively
"""
# Pass IP of nameserver interactively for nameserver in zone
# (absolute name)
with self.fake_stdin('1.1.1.1\n'):
self.check_command(
'dnszone_add example.com --name-server=ns.example.com. '
'--admin-email=admin@example.com',
'dnszone_add',
idnsname=u'example.com',
idnssoamname=u'ns.example.com.',
idnssoarname=u'admin@example.com',
ip_address=u'1.1.1.1',
idnssoaexpire=util.Fuzzy(type=int),
idnssoaserial=util.Fuzzy(type=int),
idnssoaretry=util.Fuzzy(type=int),
idnssoaminimum=util.Fuzzy(type=int),
idnssoarefresh=util.Fuzzy(type=int),
all=False,
raw=False,
force=False,
version=API_VERSION
)
# Pass IP of nameserver interactively for nameserver in zone
# (relative name)
with self.fake_stdin('1.1.1.1\n'):
self.check_command(
'dnszone_add example.com --name-server=ns '
'--admin-email=admin@example.com',
'dnszone_add',
idnsname=u'example.com',
idnssoamname=u'ns',
idnssoarname=u'admin@example.com',
ip_address=u'1.1.1.1',
idnssoaexpire=util.Fuzzy(type=int),
idnssoaserial=util.Fuzzy(type=int),
idnssoaretry=util.Fuzzy(type=int),
idnssoaminimum=util.Fuzzy(type=int),
idnssoarefresh=util.Fuzzy(type=int),
all=False,
raw=False,
force=False,
version=API_VERSION
)
# Nameserver is outside the zone - no need to pass the IP
self.check_command(
'dnszone_add example.com --name-server=ns.example.net. '
'--admin-email=admin@example.com',
'dnszone_add',
idnsname=u'example.com',
idnssoamname=u'ns.example.net.',
idnssoarname=u'admin@example.com',
idnssoaexpire=util.Fuzzy(type=int),
idnssoaserial=util.Fuzzy(type=int),
idnssoaretry=util.Fuzzy(type=int),
idnssoaminimum=util.Fuzzy(type=int),
idnssoarefresh=util.Fuzzy(type=int),
all=False,
raw=False,
force=False,
version=API_VERSION
)
self.run_command('dnszone_del', idnsname=TEST_ZONE)
def test_idrange_add(self):
"""
@@ -347,9 +262,6 @@ class TestCLIParsing(object):
ipaidrangesize=u'1',
ipabaserid=5,
ipasecondarybaserid=500000,
all=False,
raw=False,
version=API_VERSION
)
def test_with_command_line_options():
@@ -362,9 +274,6 @@ class TestCLIParsing(object):
ipaidrangesize=u'1',
ipabaserid=u'5',
ipasecondarybaserid=u'500000',
all=False,
raw=False,
version=API_VERSION
)
def test_without_options():
@@ -374,9 +283,6 @@ class TestCLIParsing(object):
cn=u'range1',
ipabaseid=u'1',
ipaidrangesize=u'1',
all=False,
raw=False,
version=API_VERSION
)
adtrust_dn = 'cn=ADTRUST,cn=%s,cn=masters,cn=ipa,cn=etc,%s' % \
@@ -391,8 +297,8 @@ class TestCLIParsing(object):
# Create a mock service object to test against
adtrust_add = dict(
ipaconfigstring='enabledService',
objectclass=['top', 'nsContainer', 'ipaConfigObject']
ipaconfigstring=b'enabledService',
objectclass=[b'top', b'nsContainer', b'ipaConfigObject']
)
mockldap = util.MockLDAP()
@@ -406,3 +312,98 @@ class TestCLIParsing(object):
if not adtrust_is_enabled:
mockldap.del_entry(adtrust_dn)
def test_certfind(self):
with tempfile.NamedTemporaryFile() as f:
f.write(goodcert_headers)
f.flush()
self.check_command(
'cert_find --file={}'.format(f.name),
'cert_find',
file=goodcert_headers
)
def test_cli_fsencoding():
# https://pagure.io/freeipa/issue/5887
env = {
key: value for key, value in os.environ.items()
if not key.startswith(('LC_', 'LANG'))
}
env['LC_ALL'] = 'C'
env['PYTHONPATH'] = BASE_DIR
# override confdir so test always fails and does not depend on an
# existing installation.
env['IPA_CONFDIR'] = '/'
p = subprocess.Popen(
[sys.executable, '-m', 'ipaclient', 'help'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
out, err = p.communicate()
assert p.returncode != 0, (out, err)
if sys.version_info >= (3, 7):
# Python 3.7+ has PEP 538: Legacy C Locale Coercion
assert b'IPA client is not configured' in err, (out, err)
else:
# Python 3.6 does not support UTF-8 fs encoding with non-UTF LC
assert b'System encoding must be UTF-8' in err, (out, err)
IPA_NOT_CONFIGURED = b'IPA is not configured on this system'
IPA_CLIENT_NOT_CONFIGURED = b'IPA client is not configured on this system'
@pytest.mark.needs_ipaapi
@pytest.mark.skipif(
os.geteuid() != 0 or os.path.isfile('/etc/ipa/default.conf'),
reason="Must have root privileges to run this test "
"and IPA must not be installed")
@pytest.mark.parametrize(
"args, retcode, output, error",
[
# Commands delivered by the client pkg
(['ipa'], 1, None, IPA_CLIENT_NOT_CONFIGURED),
(['ipa-certupdate'], 2, None, IPA_CLIENT_NOT_CONFIGURED),
(['ipa-client-automount'], 2, IPA_CLIENT_NOT_CONFIGURED, None),
# Commands delivered by the server pkg
(['ipa-adtrust-install'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-advise'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-backup'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-cacert-manage'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-ca-install'], 1, None,
b'IPA server is not configured on this system'),
(['ipa-compat-manage'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-crlgen-manage'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-csreplica-manage'], 1, None, IPA_NOT_CONFIGURED),
(['ipactl', 'status'], 4, None, b'IPA is not configured'),
(['ipa-dns-install'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-kra-install'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-ldap-updater',
'/usr/share/ipa/updates/05-pre_upgrade_plugins.update'],
2, None, IPA_NOT_CONFIGURED),
(['ipa-managed-entries'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-nis-manage'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-pkinit-manage'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-replica-manage', 'list'], 1, IPA_NOT_CONFIGURED, None),
(['ipa-server-certinstall'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-server-upgrade'], 2, None, IPA_NOT_CONFIGURED),
(['ipa-winsync-migrate'], 1, None, IPA_NOT_CONFIGURED)
])
def test_command_ipa_not_installed(args, retcode, output, error):
"""
Test that the commands properly return that IPA client|server is not
configured on this system.
Launch the command specified in args.
Check that the exit code is as expected and that stdout and stderr
contain the expected strings.
"""
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
assert retcode == p.returncode
if output:
assert output in out
if error:
assert error in err

View File

@@ -0,0 +1,44 @@
#
# Copyright (C) 2019 FreeIPA Contributors see COPYING for license
#
import inspect
import io
import pydoc
import pytest
from ipalib import api
@pytest.fixture()
def api_obj():
if not api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.connect()
yield api
@pytest.mark.tier0
@pytest.mark.needs_ipaapi
class TestIPAConsole:
def run_pydoc(self, plugin):
s = io.StringIO()
# help() calls pydoc.doc() with pager
pydoc.doc(plugin, "Help %s", output=s)
return s.getvalue()
def test_dir(self, api_obj):
assert "Command" in dir(api_obj)
assert "group_add" in dir(api_obj.Command)
def test_signature(self, api_obj):
sig = api_obj.Command.group_add.__signature__
assert isinstance(sig, inspect.Signature)
params = sig.parameters
assert params['cn'].kind is inspect.Parameter.POSITIONAL_OR_KEYWORD
assert params['cn'].annotation is str
assert params['description'].kind is inspect.Parameter.KEYWORD_ONLY
def test_help(self, api_obj):
s = self.run_pydoc(api_obj.Command.group_add)
# check for __signature__ in help()
assert "group_add(cn: str, *, description: str = None," in s

View File

@@ -18,16 +18,26 @@
#
import sys
import contextlib
import StringIO
import os
from io import StringIO
import shutil
import errno
from nose.tools import assert_raises # pylint: disable=E0611
import six
from ipalib import api, errors
from ipalib.plugins.user import user_add
from ipaserver.plugins.user import user_add
import pytest
if six.PY3:
unicode = str
class CLITestContext(object):
pytestmark = pytest.mark.needs_ipaapi
@pytest.mark.tier0
class CLITestContext:
"""Context manager that replaces stdout & stderr, and catches SystemExit
Whatever was printed to the streams is available in ``stdout`` and
@@ -41,8 +51,8 @@ class CLITestContext(object):
def __enter__(self):
self.old_streams = sys.stdout, sys.stderr
self.stdout_fileobj = sys.stdout = StringIO.StringIO()
self.stderr_fileobj = sys.stderr = StringIO.StringIO()
self.stdout_fileobj = sys.stdout = StringIO()
self.stderr_fileobj = sys.stderr = StringIO()
return self
def __exit__(self, exc_type, exc_value, traceback):
@@ -56,6 +66,8 @@ class CLITestContext(object):
return False
self.exception = exc_value
return True
else:
return None
def test_ipa_help():
@@ -66,6 +78,27 @@ def test_ipa_help():
assert ctx.stderr == ''
def test_ipa_help_without_cache():
"""Test `ipa help` without schema cache"""
cache_dir = os.path.expanduser('~/.cache/ipa/schema/')
backup_dir = os.path.expanduser('~/.cache/ipa/schema.bak/')
shutil.rmtree(backup_dir, ignore_errors=True)
if os.path.isdir(cache_dir):
os.rename(cache_dir, backup_dir)
try:
with CLITestContext() as ctx:
return_value = api.Backend.cli.run(['help'])
assert return_value == 0
assert ctx.stderr == ''
finally:
shutil.rmtree(cache_dir, ignore_errors=True)
try:
os.rename(backup_dir, cache_dir)
except OSError as e:
if e.errno != errno.ENOENT:
raise
def test_ipa_without_arguments():
"""Test that `ipa` errors out, and prints the help to stderr"""
with CLITestContext(exception=SystemExit) as ctx:
@@ -110,7 +143,7 @@ def test_command_help():
assert h_ctx.stderr == ''
assert h_ctx.stdout == help_ctx.stdout
assert unicode(user_add.__doc__) in help_ctx.stdout
assert unicode(user_add.doc) in help_ctx.stdout
def test_ambiguous_command_or_topic():
@@ -130,6 +163,7 @@ def test_ambiguous_command_or_topic():
assert h_ctx.stdout != help_ctx.stdout
def test_multiline_description():
"""Test that all of a multi-line command description appears in output
"""
@@ -137,6 +171,6 @@ def test_multiline_description():
assert '\n\n' in unicode(api.Command.trust_add.doc).strip()
with CLITestContext(exception=SystemExit) as help_ctx:
return_value = api.Backend.cli.run(['trust-add', '-h'])
api.Backend.cli.run(['trust-add', '-h'])
assert unicode(api.Command.trust_add.doc).strip() in help_ctx.stdout

519
ipatests/test_cmdline/test_ipagetkeytab.py Normal file → Executable file
View File

@@ -20,134 +20,479 @@
Test `ipa-getkeytab`
"""
from __future__ import absolute_import
import os
import shutil
from cmdline import cmdline_test
from ipalib import api
from ipalib import errors
import tempfile
from ipapython import ipautil
import nose
import tempfile
import krbV
import gssapi
import pytest
from ipapython.ipautil import private_ccache
from ipalib import api, errors
from ipalib.request import context
from ipaplatform.paths import paths
from ipapython import ipautil, ipaldap
from ipaserver.plugins.ldap2 import ldap2
from ipapython.dn import DN
from ipatests.test_cmdline.cmdline import cmdline_test
from ipatests.test_xmlrpc.tracker import host_plugin, service_plugin
from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, add_oc
from contextlib import contextmanager
@contextmanager
def use_keytab(principal, keytab):
try:
tmpdir = tempfile.mkdtemp(prefix = "tmp-")
ccache_file = 'FILE:%s/ccache' % tmpdir
krbcontext = krbV.default_context()
principal = str(principal)
keytab = krbV.Keytab(name=keytab, context=krbcontext)
principal = krbV.Principal(name=principal, context=krbcontext)
os.environ['KRB5CCNAME'] = ccache_file
ccache = krbV.CCache(name=ccache_file, context=krbcontext, primary_principal=principal)
ccache.init(principal)
ccache.init_creds_keytab(keytab=keytab, principal=principal)
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
conn.connect(ccache=ccache)
conn.disconnect()
except krbV.Krb5Error, e:
raise StandardError('Unable to bind to LDAP. Error initializing principal %s in %s: %s' % (principal.name, keytab, str(e)))
finally:
del os.environ['KRB5CCNAME']
if tmpdir:
shutil.rmtree(tmpdir)
with private_ccache() as ccache_file:
try:
old_principal = getattr(context, 'principal', None)
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {'ccache': ccache_file,
'client_keytab': keytab}
gssapi.Credentials(name=name, usage='initiate', store=store)
conn = ldap2(api)
conn.connect(ccache=ccache_file,
autobind=ipaldap.AUTOBIND_DISABLED)
yield conn
conn.disconnect()
except gssapi.exceptions.GSSError as e:
raise Exception('Unable to bind to LDAP. Error initializing '
'principal %s in %s: %s' % (principal, keytab,
str(e)))
finally:
setattr(context, 'principal', old_principal)
class test_ipagetkeytab(cmdline_test):
@pytest.fixture(scope='class')
def test_host(request):
host_tracker = host_plugin.HostTracker(u'test-host')
return host_tracker.make_fixture(request)
@pytest.fixture(scope='class')
def test_service(request, test_host, keytab_retrieval_setup):
service_tracker = service_plugin.ServiceTracker(u'srv', test_host.name)
test_host.ensure_exists()
return service_tracker.make_fixture(request)
@pytest.mark.needs_ipaapi
class KeytabRetrievalTest(cmdline_test):
"""
Base class for keytab retrieval tests
"""
command = "ipa-getkeytab"
keytabname = None
@pytest.fixture(autouse=True, scope="class")
def keytab_retrieval_setup(self, request, cmdline_setup):
cls = request.cls
keytabfd, keytabname = tempfile.mkstemp()
os.close(keytabfd)
os.unlink(keytabname)
cls.keytabname = keytabname
def fin():
try:
os.unlink(cls.keytabname)
except OSError:
pass
request.addfinalizer(fin)
def run_ipagetkeytab(self, service_principal, args=tuple(),
raiseonerr=False, stdin=None):
new_args = [self.command,
"-p", service_principal,
"-k", self.keytabname]
if not args:
new_args.extend(['-s', api.env.host])
else:
new_args.extend(list(args))
return ipautil.run(
new_args,
stdin=stdin,
raiseonerr=raiseonerr,
capture_error=True)
def assert_success(self, *args, **kwargs):
result = self.run_ipagetkeytab(*args, **kwargs)
expected = 'Keytab successfully retrieved and stored in: %s\n' % (
self.keytabname)
assert expected in result.error_output, (
'Success message not in output:\n%s' % result.error_output)
def assert_failure(self, retcode, message, *args, **kwargs):
result = self.run_ipagetkeytab(*args, **kwargs)
err = result.error_output
assert message in err
rc = result.returncode
assert rc == retcode
@pytest.mark.tier0
class test_ipagetkeytab(KeytabRetrievalTest):
"""
Test `ipa-getkeytab`.
"""
command = "ipa-getkeytab"
host_fqdn = u'ipatest.%s' % api.env.domain
service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm)
[keytabfd, keytabname] = tempfile.mkstemp()
os.close(keytabfd)
keytabname = None
def test_0_setup(self):
"""
Create a host to test against.
"""
# Create the service
try:
api.Command['host_add'](self.host_fqdn, force=True)
except errors.DuplicateEntry:
# it already exists, no problem
pass
def test_1_run(self):
def test_1_run(self, test_service):
"""
Create a keytab with `ipa-getkeytab` for a non-existent service.
"""
new_args = [self.command,
"-s", api.env.host,
"-p", "test/notfound.example.com",
"-k", self.keytabname,
]
(out, err, rc) = ipautil.run(new_args, stdin=None, raiseonerr=False)
test_service.ensure_missing()
result = self.run_ipagetkeytab(test_service.name)
err = result.error_output
assert 'Failed to parse result: PrincipalName not found.\n' in err, err
rc = result.returncode
assert rc > 0, rc
def test_2_run(self):
def test_2_run(self, test_service):
"""
Create a keytab with `ipa-getkeytab` for an existing service.
"""
# Create the service
try:
api.Command['service_add'](self.service_princ, force=True)
except errors.DuplicateEntry:
# it already exists, no problem
pass
test_service.ensure_exists()
os.unlink(self.keytabname)
new_args = [self.command,
"-s", api.env.host,
"-p", self.service_princ,
"-k", self.keytabname,
]
try:
(out, err, rc) = ipautil.run(new_args, None)
expected = 'Keytab successfully retrieved and stored in: %s\n' % (
self.keytabname)
assert expected in err, 'Success message not in output:\n%s' % err
except ipautil.CalledProcessError, e:
assert (False)
self.assert_success(test_service.name, raiseonerr=True)
def test_3_use(self):
def test_3_use(self, test_service):
"""
Try to use the service keytab.
"""
use_keytab(self.service_princ, self.keytabname)
with use_keytab(test_service.name, self.keytabname) as conn:
assert conn.can_read(test_service.dn, 'objectclass') is True
assert getattr(context, 'principal') == test_service.name
def test_4_disable(self):
def test_4_disable(self, test_service):
"""
Disable a kerberos principal
"""
retrieve_cmd = test_service.make_retrieve_command()
result = retrieve_cmd()
# Verify that it has a principal key
entry = api.Command['service_show'](self.service_princ)['result']
assert(entry['has_keytab'] == True)
assert result[u'result'][u'has_keytab']
# Disable it
api.Command['service_disable'](self.service_princ)
disable_cmd = test_service.make_disable_command()
disable_cmd()
# Verify that it looks disabled
entry = api.Command['service_show'](self.service_princ)['result']
assert(entry['has_keytab'] == False)
result = retrieve_cmd()
assert not result[u'result'][u'has_keytab']
def test_5_use_disabled(self):
def test_5_use_disabled(self, test_service):
"""
Try to use the disabled keytab
"""
try:
use_keytab(self.service_princ, self.keytabname)
except StandardError, errmsg:
with use_keytab(test_service.name, self.keytabname) as conn:
assert conn.can_read(test_service.dn, 'objectclass') is True
assert getattr(context, 'principal') == test_service.name
except Exception as errmsg:
assert('Unable to bind to LDAP. Error initializing principal' in str(errmsg))
def test_9_cleanup(self):
def test_6_quiet_mode(self, test_service):
"""
Clean up test data
Try to use quiet mode
"""
# First create the host that will use this policy
os.unlink(self.keytabname)
api.Command['host_del'](self.host_fqdn)
test_service.ensure_exists()
# getkeytab without quiet mode option enabled
result = self.run_ipagetkeytab(test_service.name)
err = result.error_output.split("\n")[0]
assert err == f"Keytab successfully retrieved and stored in:" \
f" {self.keytabname}"
assert result.returncode == 0
# getkeytab with quiet mode option enabled
result1 = self.run_ipagetkeytab(test_service.name, args=tuple("-q"))
assert result1.returncode == 0
def test_7_server_name_check(self, test_service):
"""
Try to use -s for server name
"""
test_service.ensure_exists()
self.assert_success(test_service.name, args=["-s", api.env.host])
def test_8_keytab_encryption_check(self, test_service):
"""
Try to use -e for different types of encryption check
"""
encryptes_list = [
"aes256-cts-hmac-sha1-96",
"aes128-cts-hmac-sha256-128",
]
self.assert_success(
test_service.name, args=["-e", ",".join(encryptes_list)]
)
def test_dangling_symlink(self, test_service):
# see https://pagure.io/freeipa/issue/4607
test_service.ensure_exists()
fd, symlink_target = tempfile.mkstemp()
os.close(fd)
os.unlink(symlink_target)
# create dangling symlink
os.symlink(self.keytabname, symlink_target)
try:
self.assert_success(test_service.name, raiseonerr=True)
assert os.path.isfile(symlink_target)
assert os.path.samefile(self.keytabname, symlink_target)
finally:
os.unlink(symlink_target)
def retrieve_dm_password():
dmpw_file = os.path.join(api.env.dot_ipa, '.dmpw')
if not os.path.isfile(dmpw_file):
raise errors.NotFound(reason='{} file required '
'for this test'.format(dmpw_file))
with open(dmpw_file, 'r') as f:
dm_password = f.read().strip()
return dm_password
class TestBindMethods(KeytabRetrievalTest):
"""
Class that tests '-c'/'-H'/'-Y' flags
"""
dm_password = None
ca_cert = None
@pytest.fixture(autouse=True, scope="class")
def bindmethods_setup(self, request, keytab_retrieval_setup):
cls = request.cls
try:
cls.dm_password = retrieve_dm_password()
except errors.NotFound as e:
pytest.skip(e.args)
tempfd, temp_ca_cert = tempfile.mkstemp()
os.close(tempfd)
shutil.copy(os.path.join(paths.IPA_CA_CRT), temp_ca_cert)
cls.ca_cert = temp_ca_cert
def fin():
try:
os.unlink(cls.ca_cert)
except OSError:
pass
request.addfinalizer(fin)
def check_ldapi(self):
if not api.env.ldap_uri.startswith('ldapi://'):
pytest.skip("LDAP URI not pointing to LDAPI socket")
def test_retrieval_with_dm_creds(self, test_service):
test_service.ensure_exists()
self.assert_success(
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-s', api.env.host])
def test_retrieval_using_plain_ldap(self, test_service):
test_service.ensure_exists()
ldap_uri = 'ldap://{}'.format(api.env.host)
self.assert_success(
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-H', ldap_uri])
@pytest.mark.skipif(os.geteuid() != 0,
reason="Must have root privileges to run this test")
def test_retrieval_using_ldapi_external(self, test_service):
test_service.ensure_exists()
self.check_ldapi()
self.assert_success(
test_service.name,
args=[
'-Y',
'EXTERNAL',
'-H', api.env.ldap_uri])
def test_retrieval_using_ldap_gssapi(self, test_service):
test_service.ensure_exists()
self.check_ldapi()
self.assert_success(
test_service.name,
args=[
'-Y',
'GSSAPI',
'-H', api.env.ldap_uri])
def test_retrieval_using_ldaps_ca_cert(self, test_service):
test_service.ensure_exists()
self.assert_success(
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-H', 'ldaps://{}'.format(api.env.host),
'--cacert', self.ca_cert])
def test_ldap_uri_server_raises_error(self, test_service):
test_service.ensure_exists()
self.assert_failure(
2,
"Cannot specify server and LDAP uri simultaneously",
test_service.name,
args=[
'-H', 'ldaps://{}'.format(api.env.host),
'-s', api.env.host],
raiseonerr=False)
def test_invalid_mech_raises_error(self, test_service):
test_service.ensure_exists()
self.assert_failure(
2,
"Invalid SASL bind mechanism",
test_service.name,
args=[
'-H', 'ldaps://{}'.format(api.env.host),
'-Y', 'BOGUS'],
raiseonerr=False)
def test_mech_bind_dn_raises_error(self, test_service):
test_service.ensure_exists()
self.assert_failure(
2,
"Cannot specify both SASL mechanism and bind DN simultaneously",
test_service.name,
args=[
'-D', "cn=Directory Manager",
'-w', self.dm_password,
'-H', 'ldaps://{}'.format(api.env.host),
'-Y', 'EXTERNAL'],
raiseonerr=False)
class SMBServiceTracker(service_plugin.ServiceTracker):
def __init__(self, name, host_fqdn, options=None):
super(SMBServiceTracker, self).__init__(name, host_fqdn,
options=options)
# Create SMB service principal that has POSIX attributes to allow
# generating SID and adding proper objectclasses
self.create_keys |= {u'uidnumber', u'gidnumber'}
self.options[u'addattr'] = [
u'objectclass=ipaIDObject', u'uidNumber=-1', u'gidNumber=-1']
def track_create(self, **options):
super(SMBServiceTracker, self).track_create(**options)
self.attrs[u'uidnumber'] = [fuzzy_digits]
self.attrs[u'gidnumber'] = [fuzzy_digits]
self.attrs[u'objectclass'].append(u'ipaIDObject')
@pytest.fixture(scope='class')
def test_smb_svc(request, test_host, smb_service_setup):
service_tracker = SMBServiceTracker(u'cifs', test_host.name)
test_host.ensure_exists()
return service_tracker.make_fixture(request)
@pytest.mark.tier0
@pytest.mark.skipif(u'ipantuserattrs' not in add_oc([], u'ipantuserattrs'),
reason="Must have trust support enabled for this test")
class test_smb_service(KeytabRetrievalTest):
"""
Test `ipa-getkeytab` for retrieving explicit enctypes
"""
command = "ipa-getkeytab"
dm_password = None
keytabname = None
@pytest.fixture(autouse=True, scope="class")
def smb_service_setup(self, request, keytab_retrieval_setup):
cls = request.cls
try:
cls.dm_password = retrieve_dm_password()
except errors.NotFound as e:
pytest.skip(e.args)
def test_create(self, test_smb_svc):
"""
Create a keytab with `ipa-getkeytab` for an existing service.
"""
test_smb_svc.ensure_exists()
# Request a keytab with explicit encryption types
enctypes = ['aes128-cts-hmac-sha1-96',
'aes256-cts-hmac-sha1-96', 'arcfour-hmac']
args = ['-e', ','.join(enctypes), '-s', api.env.host]
self.assert_success(test_smb_svc.name, args=args, raiseonerr=True)
def test_use(self, test_smb_svc):
"""
Try to use the service keytab to regenerate ipaNTHash value
"""
# Step 1. Extend objectclass to allow ipaNTHash attribute
# We cannot verify write access to objectclass
with use_keytab(test_smb_svc.name, self.keytabname) as conn:
entry = conn.get_entry(test_smb_svc.dn, ['objectclass'])
entry['objectclass'].extend(['ipaNTUserAttrs'])
try:
conn.update_entry(entry)
except errors.ACIError:
assert ('No correct ACI to the allow ipaNTUserAttrs '
'for SMB service' in "failure")
# Step 2. With ipaNTUserAttrs in place, we can ask to regenerate
# ipaNTHash value. We can also verify it is possible to write to
# ipaNTHash attribute while being an SMB service
with use_keytab(test_smb_svc.name, self.keytabname) as conn:
assert conn.can_write(test_smb_svc.dn, 'ipaNTHash') is True
entry = conn.get_entry(test_smb_svc.dn, ['ipaNTHash'])
entry['ipanthash'] = b'MagicRegen'
try:
conn.update_entry(entry)
except errors.ACIError:
assert ("No correct ACI to the ipaNTHash for SMB service"
in "failure")
except errors.EmptyResult:
assert "No arcfour-hmac in Kerberos keys" in "failure"
except errors.DatabaseError:
# Most likely ipaNTHash already existed -- we either get
# OPERATIONS_ERROR or UNWILLING_TO_PERFORM, both map to
# the same DatabaseError class.
assert "LDAP Entry corruption after generation" in "failure"
# Update succeeded, now we have either MagicRegen (broken) or
# a real NT hash in the entry. However, we can only retrieve it as
# a cn=Directory Manager. When bind_dn is None, ldap2.connect() wil
# default to cn=Directory Manager.
conn = ldap2(api)
conn.connect(bind_dn=None, bind_pw=self.dm_password,
autobind=ipaldap.AUTOBIND_DISABLED)
entry = conn.retrieve(test_smb_svc.dn, ['ipaNTHash'])
ipanthash = entry.single_value.get('ipanthash')
conn.disconnect()
assert ipanthash != b'MagicRegen', 'LDBM backend entry corruption'