Imported Upstream version 4.0.5
This commit is contained in:
@@ -1,7 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
import ipatests.util
|
||||
|
||||
|
||||
ipatests.util.check_ipaclient_unittests()
|
||||
@@ -22,6 +22,7 @@ Base class for all cmdline tests
|
||||
"""
|
||||
|
||||
import nose
|
||||
import krbV
|
||||
import distutils.spawn
|
||||
import os
|
||||
|
||||
@@ -32,14 +33,16 @@ from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
|
||||
from ipaserver.plugins.ldap2 import ldap2
|
||||
|
||||
# See if our LDAP server is up and we can talk to it over GSSAPI
|
||||
ccache = krbV.default_context().default_ccache()
|
||||
|
||||
try:
|
||||
conn = ldap2(api)
|
||||
conn.connect()
|
||||
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
|
||||
conn.connect(ccache=ccache)
|
||||
conn.disconnect()
|
||||
server_available = True
|
||||
except errors.DatabaseError:
|
||||
server_available = False
|
||||
except Exception as e:
|
||||
except Exception, e:
|
||||
server_available = False
|
||||
|
||||
class cmdline_test(XMLRPC_test):
|
||||
@@ -49,22 +52,27 @@ class cmdline_test(XMLRPC_test):
|
||||
# some reasonable default command
|
||||
command = paths.LS
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
def setUp(self):
|
||||
# Find the executable in $PATH
|
||||
# This is neded because ipautil.run resets the PATH to
|
||||
# a system default.
|
||||
original_command = cls.command
|
||||
if not os.path.isabs(cls.command):
|
||||
cls.command = distutils.spawn.find_executable(cls.command)
|
||||
original_command = self.command
|
||||
if not os.path.isabs(self.command):
|
||||
self.command = distutils.spawn.find_executable(self.command)
|
||||
# raise an error if the command is missing even if the remote
|
||||
# server is not available.
|
||||
if not cls.command:
|
||||
if not self.command:
|
||||
raise AssertionError(
|
||||
'Command %r not available' % original_command
|
||||
)
|
||||
super(cmdline_test, cls).setup_class()
|
||||
super(cmdline_test, self).setUp()
|
||||
if not server_available:
|
||||
raise nose.SkipTest(
|
||||
'Server not available: %r' % api.env.xmlrpc_uri
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
nose tear-down fixture.
|
||||
"""
|
||||
super(cmdline_test, self).tearDown()
|
||||
|
||||
@@ -1,28 +1,15 @@
|
||||
import contextlib
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
import contextlib
|
||||
import StringIO
|
||||
|
||||
import nose
|
||||
import six
|
||||
from six import StringIO
|
||||
|
||||
from ipatests import util
|
||||
from ipalib import api, errors
|
||||
import pytest
|
||||
|
||||
if six.PY3:
|
||||
unicode = str
|
||||
|
||||
TEST_ZONE = u'zoneadd.%(domain)s' % api.env
|
||||
|
||||
HERE = os.path.abspath(os.path.dirname(__file__))
|
||||
BASE_DIR = os.path.abspath(os.path.join(HERE, os.pardir, os.pardir))
|
||||
from ipapython.version import API_VERSION
|
||||
|
||||
|
||||
@pytest.mark.tier0
|
||||
@pytest.mark.needs_ipaapi
|
||||
class TestCLIParsing(object):
|
||||
"""Tests that commandlines are correctly parsed to Command keyword args
|
||||
"""
|
||||
@@ -39,7 +26,7 @@ class TestCLIParsing(object):
|
||||
def run_command(self, command_name, **kw):
|
||||
"""Run a command on the server"""
|
||||
if not api.Backend.rpcclient.isconnected():
|
||||
api.Backend.rpcclient.connect()
|
||||
api.Backend.rpcclient.connect(fallback=False)
|
||||
try:
|
||||
api.Command[command_name](**kw)
|
||||
except errors.NetworkError:
|
||||
@@ -50,29 +37,43 @@ class TestCLIParsing(object):
|
||||
def fake_stdin(self, string_in):
|
||||
"""Context manager that temporarily replaces stdin to read a string"""
|
||||
old_stdin = sys.stdin
|
||||
sys.stdin = StringIO(string_in)
|
||||
sys.stdin = StringIO.StringIO(string_in)
|
||||
yield
|
||||
sys.stdin = old_stdin
|
||||
|
||||
def test_ping(self):
|
||||
self.check_command('ping', 'ping')
|
||||
|
||||
def test_plugins(self):
|
||||
self.check_command('plugins', 'plugins')
|
||||
self.check_command('ping', 'ping',
|
||||
version=API_VERSION)
|
||||
|
||||
def test_user_show(self):
|
||||
self.check_command('user-show admin', 'user_show', uid=u'admin')
|
||||
self.check_command('user-show admin', 'user_show',
|
||||
uid=u'admin',
|
||||
rights=False,
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_user_show_underscore(self):
|
||||
self.check_command('user_show admin', 'user_show', uid=u'admin')
|
||||
self.check_command('user_show admin', 'user_show',
|
||||
uid=u'admin',
|
||||
rights=False,
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_group_add(self):
|
||||
self.check_command(
|
||||
'group-add tgroup1 --desc="Test group"',
|
||||
self.check_command('group-add tgroup1 --desc="Test group"',
|
||||
'group_add',
|
||||
cn=u'tgroup1',
|
||||
description=u'Test group',
|
||||
)
|
||||
nonposix=False,
|
||||
external=False,
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_sudocmdgroup_add_member(self):
|
||||
# Test CSV splitting is not done
|
||||
@@ -82,73 +83,93 @@ class TestCLIParsing(object):
|
||||
'sudocmdgroup_add_member',
|
||||
cn=u'tcmdgroup1',
|
||||
sudocmd=[u'ab,c', u'd'],
|
||||
)
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_group_add_nonposix(self):
|
||||
self.check_command(
|
||||
'group-add tgroup1 --desc="Test group" --nonposix',
|
||||
self.check_command('group-add tgroup1 --desc="Test group" --nonposix',
|
||||
'group_add',
|
||||
cn=u'tgroup1',
|
||||
description=u'Test group',
|
||||
nonposix=True,
|
||||
)
|
||||
external=False,
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_group_add_gid(self):
|
||||
self.check_command(
|
||||
'group-add tgroup1 --desc="Test group" --gid=1234',
|
||||
self.check_command('group-add tgroup1 --desc="Test group" --gid=1234',
|
||||
'group_add',
|
||||
cn=u'tgroup1',
|
||||
description=u'Test group',
|
||||
gidnumber=u'1234',
|
||||
)
|
||||
nonposix=False,
|
||||
external=False,
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_group_add_interactive(self):
|
||||
with self.fake_stdin('Test group\n'):
|
||||
self.check_command(
|
||||
'group-add tgroup1', 'group_add',
|
||||
self.check_command('group-add tgroup1', 'group_add',
|
||||
cn=u'tgroup1',
|
||||
)
|
||||
description=u'Test group',
|
||||
nonposix=False,
|
||||
external=False,
|
||||
no_members=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_dnsrecord_add(self):
|
||||
self.check_command(
|
||||
'dnsrecord-add %s ns --a-rec=1.2.3.4' % TEST_ZONE,
|
||||
self.check_command('dnsrecord-add test-example.com ns --a-rec=1.2.3.4',
|
||||
'dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'ns',
|
||||
arecord=u'1.2.3.4',
|
||||
)
|
||||
structured=False,
|
||||
force=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_dnsrecord_del_all(self):
|
||||
try:
|
||||
self.run_command('dnszone_add', idnsname=TEST_ZONE)
|
||||
self.run_command('dnszone_add', idnsname=u'test-example.com',
|
||||
idnssoamname=u'ns.test-example.com', force=True)
|
||||
except errors.NotFound:
|
||||
raise nose.SkipTest('DNS is not configured')
|
||||
try:
|
||||
self.run_command('dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
idnsname=u'ns', arecord=u'1.2.3.4', force=True)
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'ns', arecord=u'1.2.3.4')
|
||||
with self.fake_stdin('yes\n'):
|
||||
self.check_command(
|
||||
'dnsrecord_del %s ns' % TEST_ZONE,
|
||||
self.check_command('dnsrecord_del test-example.com ns',
|
||||
'dnsrecord_del',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'ns',
|
||||
del_all=True,
|
||||
)
|
||||
structured=False,
|
||||
version=API_VERSION)
|
||||
with self.fake_stdin('YeS\n'):
|
||||
self.check_command(
|
||||
'dnsrecord_del %s ns' % TEST_ZONE,
|
||||
self.check_command('dnsrecord_del test-example.com ns',
|
||||
'dnsrecord_del',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'ns',
|
||||
del_all=True,
|
||||
)
|
||||
structured=False,
|
||||
version=API_VERSION)
|
||||
finally:
|
||||
self.run_command('dnszone_del', idnsname=TEST_ZONE)
|
||||
self.run_command('dnszone_del', idnsname=u'test-example.com')
|
||||
|
||||
def test_dnsrecord_del_one_by_one(self):
|
||||
try:
|
||||
self.run_command('dnszone_add', idnsname=TEST_ZONE)
|
||||
self.run_command('dnszone_add', idnsname=u'test-example.com',
|
||||
idnssoamname=u'ns.test-example.com', force=True)
|
||||
except errors.NotFound:
|
||||
raise nose.SkipTest('DNS is not configured')
|
||||
try:
|
||||
@@ -156,96 +177,161 @@ class TestCLIParsing(object):
|
||||
u'2 1 FD2693C1EFFC11A8D2BE57229212A04B45663791')
|
||||
for record in records:
|
||||
self.run_command('dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE, idnsname=u'ns',
|
||||
dnszoneidnsname=u'test-example.com', idnsname=u'ns',
|
||||
sshfprecord=record)
|
||||
with self.fake_stdin('no\nyes\nyes\n'):
|
||||
self.check_command(
|
||||
'dnsrecord_del %s ns' % TEST_ZONE,
|
||||
self.check_command('dnsrecord_del test-example.com ns',
|
||||
'dnsrecord_del',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'ns',
|
||||
del_all=False,
|
||||
sshfprecord=records,
|
||||
)
|
||||
structured=False,
|
||||
version=API_VERSION)
|
||||
finally:
|
||||
self.run_command('dnszone_del', idnsname=TEST_ZONE)
|
||||
self.run_command('dnszone_del', idnsname=u'test-example.com')
|
||||
|
||||
def test_dnsrecord_add_ask_for_missing_fields(self):
|
||||
sshfp_parts = (1, 1, u'E3B72BA346B90570EED94BE9334E34AA795CED23')
|
||||
|
||||
with self.fake_stdin('SSHFP\n%d\n%d\n%s' % sshfp_parts):
|
||||
self.check_command(
|
||||
'dnsrecord-add %s sshfp' % TEST_ZONE,
|
||||
self.check_command('dnsrecord-add test-example.com sshfp',
|
||||
'dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'sshfp',
|
||||
sshfp_part_fp_type=sshfp_parts[0],
|
||||
sshfp_part_algorithm=sshfp_parts[1],
|
||||
sshfp_part_fingerprint=sshfp_parts[2],
|
||||
)
|
||||
|
||||
# test with lowercase record type
|
||||
with self.fake_stdin('sshfp\n%d\n%d\n%s' % sshfp_parts):
|
||||
self.check_command(
|
||||
'dnsrecord-add %s sshfp' % TEST_ZONE,
|
||||
'dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
idnsname=u'sshfp',
|
||||
sshfp_part_fp_type=sshfp_parts[0],
|
||||
sshfp_part_algorithm=sshfp_parts[1],
|
||||
sshfp_part_fingerprint=sshfp_parts[2],
|
||||
)
|
||||
structured=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
force=False,
|
||||
version=API_VERSION)
|
||||
|
||||
# NOTE: when a DNS record part is passed via command line, it is not
|
||||
# converted to its base type when transfered via wire
|
||||
with self.fake_stdin('%d\n%s' % (sshfp_parts[1], sshfp_parts[2])):
|
||||
self.check_command(
|
||||
'dnsrecord-add %s sshfp --sshfp-algorithm=%d' % (
|
||||
TEST_ZONE, sshfp_parts[0]),
|
||||
self.check_command('dnsrecord-add test-example.com sshfp ' \
|
||||
'--sshfp-algorithm=%d' % sshfp_parts[0],
|
||||
'dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'sshfp',
|
||||
sshfp_part_fp_type=sshfp_parts[0],
|
||||
# passed via cmdline
|
||||
sshfp_part_algorithm=unicode(sshfp_parts[1]),
|
||||
sshfp_part_algorithm=unicode(sshfp_parts[1]), # passed via cmdline
|
||||
sshfp_part_fingerprint=sshfp_parts[2],
|
||||
)
|
||||
structured=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
force=False,
|
||||
version=API_VERSION)
|
||||
|
||||
with self.fake_stdin(sshfp_parts[2]):
|
||||
self.check_command(
|
||||
'dnsrecord-add %s sshfp --sshfp-algorithm=%d '
|
||||
'--sshfp-fp-type=%d' % (
|
||||
TEST_ZONE, sshfp_parts[0], sshfp_parts[1]),
|
||||
self.check_command('dnsrecord-add test-example.com sshfp ' \
|
||||
'--sshfp-algorithm=%d --sshfp-fp-type=%d' % (sshfp_parts[0], sshfp_parts[1]),
|
||||
'dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'sshfp',
|
||||
# passed via cmdline
|
||||
sshfp_part_fp_type=unicode(sshfp_parts[0]),
|
||||
# passed via cmdline
|
||||
sshfp_part_algorithm=unicode(sshfp_parts[1]),
|
||||
sshfp_part_fp_type=unicode(sshfp_parts[0]), # passed via cmdline
|
||||
sshfp_part_algorithm=unicode(sshfp_parts[1]), # passed via cmdline
|
||||
sshfp_part_fingerprint=sshfp_parts[2],
|
||||
)
|
||||
structured=False,
|
||||
raw=False,
|
||||
all=False,
|
||||
force=False,
|
||||
version=API_VERSION)
|
||||
|
||||
def test_dnsrecord_del_comma(self):
|
||||
try:
|
||||
self.run_command(
|
||||
'dnszone_add', idnsname=TEST_ZONE)
|
||||
'dnszone_add', idnsname=u'test-example.com',
|
||||
idnssoamname=u'ns.test-example.com', force=True)
|
||||
except errors.NotFound:
|
||||
raise nose.SkipTest('DNS is not configured')
|
||||
try:
|
||||
self.run_command(
|
||||
'dnsrecord_add',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'test',
|
||||
txtrecord=u'"A pretty little problem," said Holmes.')
|
||||
with self.fake_stdin('no\nyes\n'):
|
||||
self.check_command(
|
||||
'dnsrecord_del %s test' % TEST_ZONE,
|
||||
'dnsrecord_del test-example.com test',
|
||||
'dnsrecord_del',
|
||||
dnszoneidnsname=TEST_ZONE,
|
||||
dnszoneidnsname=u'test-example.com',
|
||||
idnsname=u'test',
|
||||
txtrecord=[u'"A pretty little problem," said Holmes.'])
|
||||
del_all=False,
|
||||
txtrecord=[u'"A pretty little problem," said Holmes.'],
|
||||
structured=False,
|
||||
version=API_VERSION)
|
||||
finally:
|
||||
self.run_command('dnszone_del', idnsname=TEST_ZONE)
|
||||
self.run_command('dnszone_del', idnsname=u'test-example.com')
|
||||
|
||||
def test_dnszone_add(self):
|
||||
"""
|
||||
Test dnszone-add with nameserver IP passed interatively
|
||||
"""
|
||||
# Pass IP of nameserver interactively for nameserver in zone
|
||||
# (absolute name)
|
||||
with self.fake_stdin('1.1.1.1\n'):
|
||||
self.check_command(
|
||||
'dnszone_add example.com --name-server=ns.example.com. '
|
||||
'--admin-email=admin@example.com',
|
||||
'dnszone_add',
|
||||
idnsname=u'example.com',
|
||||
idnssoamname=u'ns.example.com.',
|
||||
idnssoarname=u'admin@example.com',
|
||||
ip_address=u'1.1.1.1',
|
||||
idnssoaexpire=util.Fuzzy(type=int),
|
||||
idnssoaserial=util.Fuzzy(type=int),
|
||||
idnssoaretry=util.Fuzzy(type=int),
|
||||
idnssoaminimum=util.Fuzzy(type=int),
|
||||
idnssoarefresh=util.Fuzzy(type=int),
|
||||
all=False,
|
||||
raw=False,
|
||||
force=False,
|
||||
version=API_VERSION
|
||||
)
|
||||
|
||||
# Pass IP of nameserver interactively for nameserver in zone
|
||||
# (relative name)
|
||||
with self.fake_stdin('1.1.1.1\n'):
|
||||
self.check_command(
|
||||
'dnszone_add example.com --name-server=ns '
|
||||
'--admin-email=admin@example.com',
|
||||
'dnszone_add',
|
||||
idnsname=u'example.com',
|
||||
idnssoamname=u'ns',
|
||||
idnssoarname=u'admin@example.com',
|
||||
ip_address=u'1.1.1.1',
|
||||
idnssoaexpire=util.Fuzzy(type=int),
|
||||
idnssoaserial=util.Fuzzy(type=int),
|
||||
idnssoaretry=util.Fuzzy(type=int),
|
||||
idnssoaminimum=util.Fuzzy(type=int),
|
||||
idnssoarefresh=util.Fuzzy(type=int),
|
||||
all=False,
|
||||
raw=False,
|
||||
force=False,
|
||||
version=API_VERSION
|
||||
)
|
||||
|
||||
# Nameserver is outside the zone - no need to pass the IP
|
||||
self.check_command(
|
||||
'dnszone_add example.com --name-server=ns.example.net. '
|
||||
'--admin-email=admin@example.com',
|
||||
'dnszone_add',
|
||||
idnsname=u'example.com',
|
||||
idnssoamname=u'ns.example.net.',
|
||||
idnssoarname=u'admin@example.com',
|
||||
idnssoaexpire=util.Fuzzy(type=int),
|
||||
idnssoaserial=util.Fuzzy(type=int),
|
||||
idnssoaretry=util.Fuzzy(type=int),
|
||||
idnssoaminimum=util.Fuzzy(type=int),
|
||||
idnssoarefresh=util.Fuzzy(type=int),
|
||||
all=False,
|
||||
raw=False,
|
||||
force=False,
|
||||
version=API_VERSION
|
||||
)
|
||||
|
||||
def test_idrange_add(self):
|
||||
"""
|
||||
@@ -261,6 +347,9 @@ class TestCLIParsing(object):
|
||||
ipaidrangesize=u'1',
|
||||
ipabaserid=5,
|
||||
ipasecondarybaserid=500000,
|
||||
all=False,
|
||||
raw=False,
|
||||
version=API_VERSION
|
||||
)
|
||||
|
||||
def test_with_command_line_options():
|
||||
@@ -273,6 +362,9 @@ class TestCLIParsing(object):
|
||||
ipaidrangesize=u'1',
|
||||
ipabaserid=u'5',
|
||||
ipasecondarybaserid=u'500000',
|
||||
all=False,
|
||||
raw=False,
|
||||
version=API_VERSION
|
||||
)
|
||||
|
||||
def test_without_options():
|
||||
@@ -282,6 +374,9 @@ class TestCLIParsing(object):
|
||||
cn=u'range1',
|
||||
ipabaseid=u'1',
|
||||
ipaidrangesize=u'1',
|
||||
all=False,
|
||||
raw=False,
|
||||
version=API_VERSION
|
||||
)
|
||||
|
||||
adtrust_dn = 'cn=ADTRUST,cn=%s,cn=masters,cn=ipa,cn=etc,%s' % \
|
||||
@@ -296,8 +391,8 @@ class TestCLIParsing(object):
|
||||
|
||||
# Create a mock service object to test against
|
||||
adtrust_add = dict(
|
||||
ipaconfigstring=b'enabledService',
|
||||
objectclass=[b'top', b'nsContainer', b'ipaConfigObject']
|
||||
ipaconfigstring='enabledService',
|
||||
objectclass=['top', 'nsContainer', 'ipaConfigObject']
|
||||
)
|
||||
|
||||
mockldap = util.MockLDAP()
|
||||
@@ -311,22 +406,3 @@ class TestCLIParsing(object):
|
||||
|
||||
if not adtrust_is_enabled:
|
||||
mockldap.del_entry(adtrust_dn)
|
||||
|
||||
|
||||
def test_cli_fsencoding():
|
||||
# https://pagure.io/freeipa/issue/5887
|
||||
env = {
|
||||
key: value for key, value in os.environ.items()
|
||||
if not key.startswith(('LC_', 'LANG'))
|
||||
}
|
||||
env['LC_ALL'] = 'C'
|
||||
env['PYTHONPATH'] = BASE_DIR
|
||||
p = subprocess.Popen(
|
||||
[sys.executable, '-m', 'ipaclient', 'help'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
env=env,
|
||||
)
|
||||
out, err = p.communicate()
|
||||
assert p.returncode > 0, (out, err)
|
||||
assert b'System encoding must be UTF-8' in err, (out, err)
|
||||
|
||||
@@ -18,22 +18,15 @@
|
||||
#
|
||||
|
||||
import sys
|
||||
import contextlib
|
||||
import StringIO
|
||||
|
||||
import six
|
||||
from six import StringIO
|
||||
from nose.tools import assert_raises # pylint: disable=E0611
|
||||
|
||||
from ipalib import api, errors
|
||||
from ipaserver.plugins.user import user_add
|
||||
import pytest
|
||||
|
||||
if six.PY3:
|
||||
unicode = str
|
||||
from ipalib.plugins.user import user_add
|
||||
|
||||
|
||||
pytestmark = pytest.mark.needs_ipaapi
|
||||
|
||||
|
||||
@pytest.mark.tier0
|
||||
class CLITestContext(object):
|
||||
"""Context manager that replaces stdout & stderr, and catches SystemExit
|
||||
|
||||
@@ -48,8 +41,8 @@ class CLITestContext(object):
|
||||
|
||||
def __enter__(self):
|
||||
self.old_streams = sys.stdout, sys.stderr
|
||||
self.stdout_fileobj = sys.stdout = StringIO()
|
||||
self.stderr_fileobj = sys.stderr = StringIO()
|
||||
self.stdout_fileobj = sys.stdout = StringIO.StringIO()
|
||||
self.stderr_fileobj = sys.stderr = StringIO.StringIO()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
@@ -117,7 +110,7 @@ def test_command_help():
|
||||
assert h_ctx.stderr == ''
|
||||
|
||||
assert h_ctx.stdout == help_ctx.stdout
|
||||
assert unicode(user_add.doc) in help_ctx.stdout
|
||||
assert unicode(user_add.__doc__) in help_ctx.stdout
|
||||
|
||||
|
||||
def test_ambiguous_command_or_topic():
|
||||
@@ -144,6 +137,6 @@ def test_multiline_description():
|
||||
assert '\n\n' in unicode(api.Command.trust_add.doc).strip()
|
||||
|
||||
with CLITestContext(exception=SystemExit) as help_ctx:
|
||||
api.Backend.cli.run(['trust-add', '-h'])
|
||||
return_value = api.Backend.cli.run(['trust-add', '-h'])
|
||||
|
||||
assert unicode(api.Command.trust_add.doc).strip() in help_ctx.stdout
|
||||
|
||||
@@ -22,304 +22,132 @@ Test `ipa-getkeytab`
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import gssapi
|
||||
import pytest
|
||||
|
||||
from cmdline import cmdline_test
|
||||
from ipalib import api
|
||||
from ipaplatform.paths import paths
|
||||
from ipapython import ipautil, ipaldap
|
||||
from ipalib import errors
|
||||
import tempfile
|
||||
from ipapython import ipautil
|
||||
import nose
|
||||
import tempfile
|
||||
import krbV
|
||||
from ipaserver.plugins.ldap2 import ldap2
|
||||
from ipatests.test_cmdline.cmdline import cmdline_test
|
||||
from ipatests.test_xmlrpc.tracker import host_plugin, service_plugin
|
||||
from ipapython.dn import DN
|
||||
|
||||
def use_keytab(principal, keytab):
|
||||
try:
|
||||
tmpdir = tempfile.mkdtemp(prefix = "tmp-")
|
||||
ccache_file = 'FILE:%s/ccache' % tmpdir
|
||||
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
|
||||
store = {'ccache': ccache_file,
|
||||
'client_keytab': keytab}
|
||||
krbcontext = krbV.default_context()
|
||||
principal = str(principal)
|
||||
keytab = krbV.Keytab(name=keytab, context=krbcontext)
|
||||
principal = krbV.Principal(name=principal, context=krbcontext)
|
||||
os.environ['KRB5CCNAME'] = ccache_file
|
||||
gssapi.Credentials(name=name, usage='initiate', store=store)
|
||||
conn = ldap2(api)
|
||||
conn.connect(autobind=ipaldap.AUTOBIND_DISABLED)
|
||||
ccache = krbV.CCache(name=ccache_file, context=krbcontext, primary_principal=principal)
|
||||
ccache.init(principal)
|
||||
ccache.init_creds_keytab(keytab=keytab, principal=principal)
|
||||
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
|
||||
conn.connect(ccache=ccache)
|
||||
conn.disconnect()
|
||||
except gssapi.exceptions.GSSError as e:
|
||||
raise Exception('Unable to bind to LDAP. Error initializing principal %s in %s: %s' % (principal, keytab, str(e)))
|
||||
except krbV.Krb5Error, e:
|
||||
raise StandardError('Unable to bind to LDAP. Error initializing principal %s in %s: %s' % (principal.name, keytab, str(e)))
|
||||
finally:
|
||||
os.environ.pop('KRB5CCNAME', None)
|
||||
del os.environ['KRB5CCNAME']
|
||||
if tmpdir:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def test_host(request):
|
||||
host_tracker = host_plugin.HostTracker(u'test-host')
|
||||
return host_tracker.make_fixture(request)
|
||||
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def test_service(request, test_host):
|
||||
service_tracker = service_plugin.ServiceTracker(u'srv', test_host.name)
|
||||
test_host.ensure_exists()
|
||||
return service_tracker.make_fixture(request)
|
||||
|
||||
|
||||
@pytest.mark.needs_ipaapi
|
||||
class KeytabRetrievalTest(cmdline_test):
|
||||
"""
|
||||
Base class for keytab retrieval tests
|
||||
"""
|
||||
command = "ipa-getkeytab"
|
||||
keytabname = None
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(KeytabRetrievalTest, cls).setup_class()
|
||||
|
||||
keytabfd, keytabname = tempfile.mkstemp()
|
||||
|
||||
os.close(keytabfd)
|
||||
os.unlink(keytabname)
|
||||
|
||||
cls.keytabname = keytabname
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
super(KeytabRetrievalTest, cls).teardown_class()
|
||||
|
||||
try:
|
||||
os.unlink(cls.keytabname)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def run_ipagetkeytab(self, service_principal, args=tuple(),
|
||||
raiseonerr=False):
|
||||
new_args = [self.command,
|
||||
"-p", service_principal,
|
||||
"-k", self.keytabname]
|
||||
|
||||
if not args:
|
||||
new_args.extend(['-s', api.env.host])
|
||||
else:
|
||||
new_args.extend(list(args))
|
||||
|
||||
return ipautil.run(
|
||||
new_args,
|
||||
stdin=None,
|
||||
raiseonerr=raiseonerr,
|
||||
capture_error=True)
|
||||
|
||||
def assert_success(self, *args, **kwargs):
|
||||
result = self.run_ipagetkeytab(*args, **kwargs)
|
||||
expected = 'Keytab successfully retrieved and stored in: %s\n' % (
|
||||
self.keytabname)
|
||||
assert expected in result.error_output, (
|
||||
'Success message not in output:\n%s' % result.error_output)
|
||||
|
||||
def assert_failure(self, retcode, message, *args, **kwargs):
|
||||
result = self.run_ipagetkeytab(*args, **kwargs)
|
||||
err = result.error_output
|
||||
|
||||
assert message in err
|
||||
rc = result.returncode
|
||||
assert rc == retcode
|
||||
|
||||
|
||||
@pytest.mark.tier0
|
||||
class test_ipagetkeytab(KeytabRetrievalTest):
|
||||
class test_ipagetkeytab(cmdline_test):
|
||||
"""
|
||||
Test `ipa-getkeytab`.
|
||||
"""
|
||||
command = "ipa-getkeytab"
|
||||
keytabname = None
|
||||
host_fqdn = u'ipatest.%s' % api.env.domain
|
||||
service_princ = u'test/%s@%s' % (host_fqdn, api.env.realm)
|
||||
[keytabfd, keytabname] = tempfile.mkstemp()
|
||||
os.close(keytabfd)
|
||||
|
||||
def test_1_run(self, test_service):
|
||||
def test_0_setup(self):
|
||||
"""
|
||||
Create a host to test against.
|
||||
"""
|
||||
# Create the service
|
||||
try:
|
||||
api.Command['host_add'](self.host_fqdn, force=True)
|
||||
except errors.DuplicateEntry:
|
||||
# it already exists, no problem
|
||||
pass
|
||||
|
||||
def test_1_run(self):
|
||||
"""
|
||||
Create a keytab with `ipa-getkeytab` for a non-existent service.
|
||||
"""
|
||||
test_service.ensure_missing()
|
||||
result = self.run_ipagetkeytab(test_service.name)
|
||||
err = result.error_output
|
||||
|
||||
new_args = [self.command,
|
||||
"-s", api.env.host,
|
||||
"-p", "test/notfound.example.com",
|
||||
"-k", self.keytabname,
|
||||
]
|
||||
(out, err, rc) = ipautil.run(new_args, stdin=None, raiseonerr=False)
|
||||
assert 'Failed to parse result: PrincipalName not found.\n' in err, err
|
||||
rc = result.returncode
|
||||
assert rc > 0, rc
|
||||
|
||||
def test_2_run(self, test_service):
|
||||
def test_2_run(self):
|
||||
"""
|
||||
Create a keytab with `ipa-getkeytab` for an existing service.
|
||||
"""
|
||||
test_service.ensure_exists()
|
||||
# Create the service
|
||||
try:
|
||||
api.Command['service_add'](self.service_princ, force=True)
|
||||
except errors.DuplicateEntry:
|
||||
# it already exists, no problem
|
||||
pass
|
||||
|
||||
self.assert_success(test_service.name, raiseonerr=True)
|
||||
os.unlink(self.keytabname)
|
||||
new_args = [self.command,
|
||||
"-s", api.env.host,
|
||||
"-p", self.service_princ,
|
||||
"-k", self.keytabname,
|
||||
]
|
||||
try:
|
||||
(out, err, rc) = ipautil.run(new_args, None)
|
||||
expected = 'Keytab successfully retrieved and stored in: %s\n' % (
|
||||
self.keytabname)
|
||||
assert expected in err, 'Success message not in output:\n%s' % err
|
||||
except ipautil.CalledProcessError, e:
|
||||
assert (False)
|
||||
|
||||
def test_3_use(self, test_service):
|
||||
def test_3_use(self):
|
||||
"""
|
||||
Try to use the service keytab.
|
||||
"""
|
||||
use_keytab(test_service.name, self.keytabname)
|
||||
use_keytab(self.service_princ, self.keytabname)
|
||||
|
||||
def test_4_disable(self, test_service):
|
||||
def test_4_disable(self):
|
||||
"""
|
||||
Disable a kerberos principal
|
||||
"""
|
||||
retrieve_cmd = test_service.make_retrieve_command()
|
||||
result = retrieve_cmd()
|
||||
# Verify that it has a principal key
|
||||
assert result[u'result'][u'has_keytab']
|
||||
entry = api.Command['service_show'](self.service_princ)['result']
|
||||
assert(entry['has_keytab'] == True)
|
||||
|
||||
# Disable it
|
||||
disable_cmd = test_service.make_disable_command()
|
||||
disable_cmd()
|
||||
api.Command['service_disable'](self.service_princ)
|
||||
|
||||
# Verify that it looks disabled
|
||||
result = retrieve_cmd()
|
||||
assert not result[u'result'][u'has_keytab']
|
||||
entry = api.Command['service_show'](self.service_princ)['result']
|
||||
assert(entry['has_keytab'] == False)
|
||||
|
||||
def test_5_use_disabled(self, test_service):
|
||||
def test_5_use_disabled(self):
|
||||
"""
|
||||
Try to use the disabled keytab
|
||||
"""
|
||||
try:
|
||||
use_keytab(test_service.name, self.keytabname)
|
||||
except Exception as errmsg:
|
||||
use_keytab(self.service_princ, self.keytabname)
|
||||
except StandardError, errmsg:
|
||||
assert('Unable to bind to LDAP. Error initializing principal' in str(errmsg))
|
||||
|
||||
|
||||
class TestBindMethods(KeytabRetrievalTest):
|
||||
"""
|
||||
Class that tests '-c'/'-H'/'-Y' flags
|
||||
"""
|
||||
|
||||
dm_password = None
|
||||
ca_cert = None
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(TestBindMethods, cls).setup_class()
|
||||
|
||||
dmpw_file = os.path.join(api.env.dot_ipa, '.dmpw')
|
||||
|
||||
if not os.path.isfile(dmpw_file):
|
||||
pytest.skip('{} file required for this test'.format(dmpw_file))
|
||||
|
||||
with open(dmpw_file, 'r') as f:
|
||||
cls.dm_password = f.read().strip()
|
||||
|
||||
tempfd, temp_ca_cert = tempfile.mkstemp()
|
||||
|
||||
os.close(tempfd)
|
||||
|
||||
shutil.copy(os.path.join(paths.IPA_CA_CRT), temp_ca_cert)
|
||||
|
||||
cls.ca_cert = temp_ca_cert
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
super(TestBindMethods, cls).teardown_class()
|
||||
|
||||
try:
|
||||
os.unlink(cls.ca_cert)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def check_ldapi(self):
|
||||
if not api.env.ldap_uri.startswith('ldapi://'):
|
||||
pytest.skip("LDAP URI not pointing to LDAPI socket")
|
||||
|
||||
def test_retrieval_with_dm_creds(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
|
||||
self.assert_success(
|
||||
test_service.name,
|
||||
args=[
|
||||
'-D', "cn=Directory Manager",
|
||||
'-w', self.dm_password,
|
||||
'-s', api.env.host])
|
||||
|
||||
def test_retrieval_using_plain_ldap(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
ldap_uri = 'ldap://{}'.format(api.env.host)
|
||||
|
||||
self.assert_success(
|
||||
test_service.name,
|
||||
args=[
|
||||
'-D', "cn=Directory Manager",
|
||||
'-w', self.dm_password,
|
||||
'-H', ldap_uri])
|
||||
|
||||
@pytest.mark.skipif(os.geteuid() != 0,
|
||||
reason="Must have root privileges to run this test")
|
||||
def test_retrieval_using_ldapi_external(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
self.check_ldapi()
|
||||
|
||||
self.assert_success(
|
||||
test_service.name,
|
||||
args=[
|
||||
'-Y',
|
||||
'EXTERNAL',
|
||||
'-H', api.env.ldap_uri])
|
||||
|
||||
def test_retrieval_using_ldap_gssapi(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
self.check_ldapi()
|
||||
|
||||
self.assert_success(
|
||||
test_service.name,
|
||||
args=[
|
||||
'-Y',
|
||||
'GSSAPI',
|
||||
'-H', api.env.ldap_uri])
|
||||
|
||||
def test_retrieval_using_ldaps_ca_cert(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
|
||||
self.assert_success(
|
||||
test_service.name,
|
||||
args=[
|
||||
'-D', "cn=Directory Manager",
|
||||
'-w', self.dm_password,
|
||||
'-H', 'ldaps://{}'.format(api.env.host),
|
||||
'--cacert', self.ca_cert])
|
||||
|
||||
def test_ldap_uri_server_raises_error(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
|
||||
self.assert_failure(
|
||||
2,
|
||||
"Cannot specify server and LDAP uri simultaneously",
|
||||
test_service.name,
|
||||
args=[
|
||||
'-H', 'ldaps://{}'.format(api.env.host),
|
||||
'-s', api.env.host],
|
||||
raiseonerr=False)
|
||||
|
||||
def test_invalid_mech_raises_error(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
|
||||
self.assert_failure(
|
||||
2,
|
||||
"Invalid SASL bind mechanism",
|
||||
test_service.name,
|
||||
args=[
|
||||
'-H', 'ldaps://{}'.format(api.env.host),
|
||||
'-Y', 'BOGUS'],
|
||||
raiseonerr=False)
|
||||
|
||||
def test_mech_bind_dn_raises_error(self, test_service):
|
||||
test_service.ensure_exists()
|
||||
|
||||
self.assert_failure(
|
||||
2,
|
||||
"Cannot specify both SASL mechanism and bind DN simultaneously",
|
||||
test_service.name,
|
||||
args=[
|
||||
'-D', "cn=Directory Manager",
|
||||
'-w', self.dm_password,
|
||||
'-H', 'ldaps://{}'.format(api.env.host),
|
||||
'-Y', 'EXTERNAL'],
|
||||
raiseonerr=False)
|
||||
def test_9_cleanup(self):
|
||||
"""
|
||||
Clean up test data
|
||||
"""
|
||||
# First create the host that will use this policy
|
||||
os.unlink(self.keytabname)
|
||||
api.Command['host_del'](self.host_fqdn)
|
||||
|
||||
Reference in New Issue
Block a user