Imported Upstream version 4.3.1

This commit is contained in:
Mario Fetka
2021-08-10 02:37:58 +02:00
parent a791de49a2
commit 2f177da8f2
2056 changed files with 421730 additions and 1668138 deletions

View File

@@ -1,169 +0,0 @@
from __future__ import absolute_import
import os
import pytest
from ipapython.certdb import NSSDatabase, TRUSTED_PEER_TRUST_FLAGS
from ipaplatform.osinfo import osinfo
CERTNICK = 'testcert'
if osinfo.id == 'fedora':
if int(osinfo.version_id) >= 28:
NSS_DEFAULT = 'sql'
else:
NSS_DEFAULT = 'dbm'
else:
NSS_DEFAULT = None
def create_selfsigned(nssdb):
# create self-signed cert + key
noisefile = os.path.join(nssdb.secdir, 'noise')
with open(noisefile, 'wb') as f:
f.write(os.urandom(64))
try:
nssdb.run_certutil([
'-S', '-x',
'-z', noisefile,
'-k', 'rsa', '-g', '2048', '-Z', 'SHA256',
'-t', 'CTu,Cu,Cu',
'-s', 'CN=testcert',
'-n', CERTNICK,
'-m', '365',
])
finally:
os.unlink(noisefile)
def test_dbm_tmp():
with NSSDatabase(dbtype='dbm') as nssdb:
assert nssdb.dbtype == 'dbm'
for filename in nssdb.filenames:
assert not os.path.isfile(filename)
assert not nssdb.exists()
nssdb.create_db()
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
assert nssdb.exists()
assert os.path.basename(nssdb.certdb) == 'cert8.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key3.db'
assert os.path.basename(nssdb.secmod) == 'secmod.db'
def test_sql_tmp():
with NSSDatabase(dbtype='sql') as nssdb:
assert nssdb.dbtype == 'sql'
for filename in nssdb.filenames:
assert not os.path.isfile(filename)
assert not nssdb.exists()
nssdb.create_db()
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
assert nssdb.exists()
assert os.path.basename(nssdb.certdb) == 'cert9.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key4.db'
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
def test_convert_db():
with NSSDatabase(dbtype='dbm') as nssdb:
assert nssdb.dbtype == 'dbm'
nssdb.create_db()
assert nssdb.exists()
create_selfsigned(nssdb)
oldcerts = nssdb.list_certs()
assert len(oldcerts) == 1
oldkeys = nssdb.list_keys()
assert len(oldkeys) == 1
nssdb.convert_db()
assert nssdb.exists()
assert nssdb.dbtype == 'sql'
newcerts = nssdb.list_certs()
assert len(newcerts) == 1
assert newcerts == oldcerts
newkeys = nssdb.list_keys()
assert len(newkeys) == 1
assert newkeys == oldkeys
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
assert os.path.basename(nssdb.certdb) == 'cert9.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key4.db'
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
def test_convert_db_nokey():
with NSSDatabase(dbtype='dbm') as nssdb:
assert nssdb.dbtype == 'dbm'
nssdb.create_db()
create_selfsigned(nssdb)
assert len(nssdb.list_certs()) == 1
assert len(nssdb.list_keys()) == 1
# remove key, readd cert
cert = nssdb.get_cert(CERTNICK)
nssdb.run_certutil(['-F', '-n', CERTNICK])
nssdb.add_cert(cert, CERTNICK, TRUSTED_PEER_TRUST_FLAGS)
assert len(nssdb.list_keys()) == 0
oldcerts = nssdb.list_certs()
assert len(oldcerts) == 1
nssdb.convert_db()
assert nssdb.dbtype == 'sql'
newcerts = nssdb.list_certs()
assert len(newcerts) == 1
assert newcerts == oldcerts
assert nssdb.get_cert(CERTNICK) == cert
newkeys = nssdb.list_keys()
assert newkeys == ()
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
old = os.path.join(nssdb.secdir, 'cert8.db')
assert not os.path.isfile(old)
assert os.path.isfile(old + '.migrated')
assert os.path.basename(nssdb.certdb) == 'cert9.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key4.db'
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
def test_auto_db():
with NSSDatabase() as nssdb:
assert nssdb.dbtype == 'auto'
assert nssdb.filenames is None
assert not nssdb.exists()
with pytest.raises(RuntimeError):
nssdb.list_certs()
nssdb.create_db()
assert nssdb.dbtype in ('dbm', 'sql')
if NSS_DEFAULT is not None:
assert nssdb.dbtype == NSS_DEFAULT
assert nssdb.filenames is not None
assert nssdb.exists()
nssdb.list_certs()

View File

@@ -18,6 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import time
import datetime
import email.utils
import calendar
@@ -279,7 +280,7 @@ class TestInvalidAttributes(unittest.TestCase):
# Invalid Max-Age
s = 'color=blue; Max-Age=over-the-hill'
with self.assertRaises(ValueError):
Cookie.parse(s)
cookies = Cookie.parse(s)
cookie = Cookie('color', 'blue')
with self.assertRaises(ValueError):
@@ -288,7 +289,7 @@ class TestInvalidAttributes(unittest.TestCase):
# Invalid Expires
s = 'color=blue; Expires=Sun, 06 Xxx 1994 08:49:37 GMT'
with self.assertRaises(ValueError):
Cookie.parse(s)
cookies = Cookie.parse(s)
cookie = Cookie('color', 'blue')
with self.assertRaises(ValueError):

View File

@@ -1,177 +0,0 @@
#
# Copyright (C) 2017 FreeIPA Contributors. See COPYING for license
#
from __future__ import absolute_import
import os
import shutil
import tempfile
import pytest
from ipapython import directivesetter
EXAMPLE_CONFIG = [
'foo=1\n',
'foobar=2\n',
]
WHITESPACE_CONFIG = [
'foo 1\n',
'foobar\t2\n',
]
@pytest.fixture
def tempdir(request):
tempdir = tempfile.mkdtemp()
def fin():
shutil.rmtree(tempdir)
request.addfinalizer(fin)
return tempdir
class test_set_directive_lines(object):
def test_remove_directive(self):
lines = directivesetter.set_directive_lines(
False, '=', 'foo', None, EXAMPLE_CONFIG, comment="#")
assert list(lines) == ['foobar=2\n']
def test_add_directive(self):
lines = directivesetter.set_directive_lines(
False, '=', 'baz', '4', EXAMPLE_CONFIG, comment="#")
assert list(lines) == ['foo=1\n', 'foobar=2\n', 'baz=4\n']
def test_set_directive_does_not_clobber_suffix_key(self):
lines = directivesetter.set_directive_lines(
False, '=', 'foo', '3', EXAMPLE_CONFIG, comment="#")
assert list(lines) == ['foo=3\n', 'foobar=2\n']
class test_set_directive_lines_whitespace(object):
def test_remove_directive(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'foo', None, WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foobar\t2\n']
def test_add_directive(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'baz', '4', WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foo 1\n', 'foobar\t2\n', 'baz 4\n']
def test_set_directive_does_not_clobber_suffix_key(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'foo', '3', WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foo 3\n', 'foobar\t2\n']
def test_set_directive_with_tab(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'foobar', '6', WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foo 1\n', 'foobar 6\n']
class test_set_directive(object):
def test_set_directive(self):
"""Check that set_directive writes the new data and preserves mode."""
fd, filename = tempfile.mkstemp()
try:
os.close(fd)
stat_pre = os.stat(filename)
with open(filename, 'w') as f:
for line in EXAMPLE_CONFIG:
f.write(line)
directivesetter.set_directive(
filename, 'foo', '3', False, '=', "#")
stat_post = os.stat(filename)
with open(filename, 'r') as f:
lines = list(f)
assert lines == ['foo=3\n', 'foobar=2\n']
assert stat_pre.st_mode == stat_post.st_mode
assert stat_pre.st_uid == stat_post.st_uid
assert stat_pre.st_gid == stat_post.st_gid
finally:
os.remove(filename)
class test_get_directive(object):
def test_get_directive(self, tmpdir):
configfile = tmpdir.join('config')
configfile.write(''.join(EXAMPLE_CONFIG))
assert '1' == directivesetter.get_directive(str(configfile),
'foo',
separator='=')
assert '2' == directivesetter.get_directive(str(configfile),
'foobar',
separator='=')
class test_get_directive_whitespace(object):
def test_get_directive(self, tmpdir):
configfile = tmpdir.join('config')
configfile.write(''.join(WHITESPACE_CONFIG))
assert '1' == directivesetter.get_directive(str(configfile),
'foo')
assert '2' == directivesetter.get_directive(str(configfile),
'foobar')
def test_directivesetter(tempdir):
filename = os.path.join(tempdir, 'example.conf')
with open(filename, 'w') as f:
for line in EXAMPLE_CONFIG:
f.write(line)
ds = directivesetter.DirectiveSetter(filename)
assert ds.lines is None
with ds:
assert ds.lines == EXAMPLE_CONFIG
ds.set('foo', '3') # quoted, space separated, doesn't change 'foo='
ds.set('foobar', None, separator='=') # remove
ds.set('baz', '4', False, '=') # add
ds.setitems([
('list1', 'value1'),
('list2', 'value2'),
])
ds.setitems({
'dict1': 'value1',
'dict2': 'value2',
})
with open(filename, 'r') as f:
lines = list(f)
assert lines == [
'foo=1\n',
'foo "3"\n',
'baz=4\n',
'list1 "value1"\n',
'list2 "value2"\n',
'dict1 "value1"\n',
'dict2 "value2"\n',
]
with directivesetter.DirectiveSetter(filename, True, '=') as ds:
ds.set('foo', '4') # doesn't change 'foo '
with open(filename, 'r') as f:
lines = list(f)
assert lines == [
'foo="4"\n',
'foo "3"\n',
'baz=4\n',
'list1 "value1"\n',
'list2 "value2"\n',
'dict1 "value1"\n',
'dict2 "value2"\n',
]

View File

@@ -1,8 +1,8 @@
import contextlib
import unittest
import pytest
from ipapython.dn import * # FIXME
from cryptography import x509
import six
from ipapython.dn import DN, RDN, AVA
@@ -139,10 +139,10 @@ class TestAVA(unittest.TestCase):
self.assertEqual(ava1[1], self.value1)
with self.assertRaises(KeyError):
ava1['foo'] # pylint: disable=pointless-statement
ava1['foo']
with self.assertRaises(KeyError):
ava1[3] # pylint: disable=pointless-statement
ava1[3]
def test_properties(self):
ava1 = AVA(self.ava1)
@@ -501,25 +501,25 @@ class TestRDN(unittest.TestCase):
self.assertEqual(rdn1[0], self.ava1)
self.assertEqual(rdn1[self.ava1.attr], self.ava1.value)
with self.assertRaises(KeyError):
rdn1['foo'] # pylint: disable=pointless-statement
rdn1['foo']
self.assertEqual(rdn2[0], self.ava2)
self.assertEqual(rdn2[self.ava2.attr], self.ava2.value)
with self.assertRaises(KeyError):
rdn2['foo'] # pylint: disable=pointless-statement
rdn2['foo']
self.assertEqual(rdn3[0], self.ava1)
self.assertEqual(rdn3[self.ava1.attr], self.ava1.value)
self.assertEqual(rdn3[1], self.ava2)
self.assertEqual(rdn3[self.ava2.attr], self.ava2.value)
with self.assertRaises(KeyError):
rdn3['foo'] # pylint: disable=pointless-statement
rdn3['foo']
self.assertEqual(rdn1.attr, self.attr1)
self.assertEqual(rdn1.value, self.value1)
with self.assertRaises(TypeError):
rdn3[1.0] # pylint: disable=pointless-statement
rdn3[1.0]
# Slices
self.assertEqual(rdn3[0:1], [self.ava1])
@@ -528,7 +528,6 @@ class TestRDN(unittest.TestCase):
def test_assignments(self):
rdn = RDN((self.attr1, self.value1))
with self.assertRaises(TypeError):
# pylint: disable=unsupported-assignment-operation
rdn[0] = self.ava2
def test_iter(self):
@@ -623,7 +622,7 @@ class TestDN(unittest.TestCase):
def setUp(self):
# ava1 must sort before ava2
self.attr1 = 'cn'
self.value1 = u'Bob'
self.value1 = 'Bob'
self.str_ava1 = '%s=%s' % (self.attr1, self.value1)
self.ava1 = AVA(self.attr1, self.value1)
@@ -631,7 +630,7 @@ class TestDN(unittest.TestCase):
self.rdn1 = RDN((self.attr1, self.value1))
self.attr2 = 'ou'
self.value2 = u'people'
self.value2 = 'people'
self.str_ava2 = '%s=%s' % (self.attr2, self.value2)
self.ava2 = AVA(self.attr2, self.value2)
@@ -658,11 +657,6 @@ class TestDN(unittest.TestCase):
self.base_container_dn = DN((self.attr1, self.value1),
self.container_dn, self.base_dn)
self.x500name = x509.Name([
x509.NameAttribute(
x509.NameOID.ORGANIZATIONAL_UNIT_NAME, self.value2),
x509.NameAttribute(x509.NameOID.COMMON_NAME, self.value1),
])
def assertExpectedClass(self, klass, obj, component):
self.assertIs(obj.__class__, expected_class(klass, component))
@@ -801,19 +795,6 @@ class TestDN(unittest.TestCase):
self.assertEqual(dn1[0], self.rdn1)
self.assertEqual(dn1[1], self.rdn2)
# Create with a python-cryptography 'Name'
dn1 = DN(self.x500name)
self.assertEqual(len(dn1), 2)
self.assertExpectedClass(DN, dn1, 'self')
for i in range(0, len(dn1)):
self.assertExpectedClass(DN, dn1[i], 'RDN')
for j in range(0, len(dn1[i])):
self.assertExpectedClass(DN, dn1[i][j], 'AVA')
self.assertIsInstance(dn1[i].attr, unicode)
self.assertIsInstance(dn1[i].value, unicode)
self.assertEqual(dn1[0], self.rdn1)
self.assertEqual(dn1[1], self.rdn2)
# Create with RDN, and 2 DN's (e.g. attr + container + base)
dn1 = DN((self.attr1, self.value1), self.container_dn, self.base_dn)
self.assertEqual(len(dn1), 5)
@@ -913,7 +894,6 @@ class TestDN(unittest.TestCase):
# Test "in" membership
self.assertTrue(self.container_rdn1 in container_dn)
# pylint: disable=comparison-with-itself
self.assertTrue(container_dn in container_dn)
self.assertFalse(self.base_rdn1 in container_dn)
@@ -936,30 +916,28 @@ class TestDN(unittest.TestCase):
self.assertEqual(dn1[0], self.rdn1)
self.assertEqual(dn1[self.rdn1.attr], self.rdn1.value)
with self.assertRaises(KeyError):
dn1['foo'] # pylint: disable=pointless-statement
dn1['foo']
self.assertEqual(dn2[0], self.rdn2)
self.assertEqual(dn2[self.rdn2.attr], self.rdn2.value)
with self.assertRaises(KeyError):
dn2['foo'] # pylint: disable=pointless-statement
dn2['foo']
self.assertEqual(dn3[0], self.rdn1)
self.assertEqual(dn3[self.rdn1.attr], self.rdn1.value)
self.assertEqual(dn3[1], self.rdn2)
self.assertEqual(dn3[self.rdn2.attr], self.rdn2.value)
with self.assertRaises(KeyError):
dn3['foo'] # pylint: disable=pointless-statement
dn3['foo']
with self.assertRaises(TypeError):
dn3[1.0] # pylint: disable=pointless-statement
dn3[1.0]
def test_assignments(self):
dn = DN('t=0,t=1,t=2,t=3,t=4,t=5,t=6,t=7,t=8,t=9')
dn = dn2 = DN('t=0,t=1,t=2,t=3,t=4,t=5,t=6,t=7,t=8,t=9')
with self.assertRaises(TypeError):
# pylint: disable=unsupported-assignment-operation
dn[0] = RDN('t=a')
with self.assertRaises(TypeError):
# pylint: disable=unsupported-assignment-operation
dn[0:1] = [RDN('t=a'), RDN('t=b')]
def test_iter(self):
@@ -1125,7 +1103,7 @@ class TestDN(unittest.TestCase):
# pylint: disable=no-member
dn = DN('t=0,t=1,t=2,t=3,t=4,t=5,t=6,t=7,t=8,t=9')
with self.assertRaises(AttributeError):
dn.replace # pylint: disable=pointless-statement
dn.replace
def test_hashing(self):
# create DN's that are equal but differ in case
@@ -1188,26 +1166,6 @@ class TestDN(unittest.TestCase):
self.assertFalse(dn3_a in s)
self.assertFalse(dn3_b in s)
def test_x500_text(self):
# null DN x500 ordering and LDAP ordering are the same
nulldn = DN()
self.assertEqual(nulldn.ldap_text(), nulldn.x500_text())
# reverse a DN with a single RDN
self.assertEqual(self.dn1.ldap_text(), self.dn1.x500_text())
# reverse a DN with 2 RDNs
dn3_x500 = self.dn3.x500_text()
dn3_rev = DN(self.rdn2, self.rdn1)
self.assertEqual(dn3_rev.ldap_text(), dn3_x500)
# reverse a longer DN
longdn_x500 = self.base_container_dn.x500_text()
longdn_rev = DN(longdn_x500)
l = len(self.base_container_dn)
for i in range(l):
self.assertEqual(longdn_rev[i], self.base_container_dn[l-1-i])
class TestEscapes(unittest.TestCase):
def setUp(self):

View File

@@ -1,106 +0,0 @@
#
# Copyright (C) 2018 FreeIPA Contributors. See COPYING for license
#
import dns.name
import dns.rdataclass
import dns.rdatatype
from dns.rdtypes.IN.SRV import SRV
from dns.rdtypes.ANY.URI import URI
from ipapython import dnsutil
import pytest
def mksrv(priority, weight, port, target):
return SRV(
rdclass=dns.rdataclass.IN,
rdtype=dns.rdatatype.SRV,
priority=priority,
weight=weight,
port=port,
target=dns.name.from_text(target)
)
def mkuri(priority, weight, target):
return URI(
rdclass=dns.rdataclass.IN,
rdtype=dns.rdatatype.URI,
priority=priority,
weight=weight,
target=target
)
class TestSortSRV(object):
def test_empty(self):
assert dnsutil.sort_prio_weight([]) == []
def test_one(self):
h1 = mksrv(1, 0, 443, u"host1")
assert dnsutil.sort_prio_weight([h1]) == [h1]
h2 = mksrv(10, 5, 443, u"host2")
assert dnsutil.sort_prio_weight([h2]) == [h2]
def test_prio(self):
h1 = mksrv(1, 0, 443, u"host1")
h2 = mksrv(2, 0, 443, u"host2")
h3 = mksrv(3, 0, 443, u"host3")
assert dnsutil.sort_prio_weight([h3, h2, h1]) == [h1, h2, h3]
assert dnsutil.sort_prio_weight([h3, h3, h3]) == [h3]
assert dnsutil.sort_prio_weight([h2, h2, h1, h1]) == [h1, h2]
h380 = mksrv(4, 0, 80, u"host3")
assert dnsutil.sort_prio_weight([h1, h3, h380]) == [h1, h3, h380]
hs = mksrv(-1, 0, 443, u"special")
assert dnsutil.sort_prio_weight([h1, h2, hs]) == [hs, h1, h2]
def assert_permutations(self, answers, permutations):
seen = set()
for _unused in range(1000):
result = tuple(dnsutil.sort_prio_weight(answers))
assert result in permutations
seen.add(result)
if seen == permutations:
break
else:
pytest.fail("sorting didn't exhaust all permutations.")
def test_sameprio(self):
h1 = mksrv(1, 0, 443, u"host1")
h2 = mksrv(1, 0, 443, u"host2")
permutations = {
(h1, h2),
(h2, h1),
}
self.assert_permutations([h1, h2], permutations)
def test_weight(self):
h1 = mksrv(1, 0, 443, u"host1")
h2_w15 = mksrv(2, 15, 443, u"host2")
h3_w10 = mksrv(2, 10, 443, u"host3")
permutations = {
(h1, h2_w15, h3_w10),
(h1, h3_w10, h2_w15),
}
self.assert_permutations([h1, h2_w15, h3_w10], permutations)
def test_large(self):
records = tuple(
mksrv(1, i, 443, "host{}".format(i)) for i in range(1000)
)
assert len(dnsutil.sort_prio_weight(records)) == len(records)
class TestSortURI(object):
def test_prio(self):
h1 = mkuri(1, 0, u"https://host1/api")
h2 = mkuri(2, 0, u"https://host2/api")
h3 = mkuri(3, 0, u"https://host3/api")
assert dnsutil.sort_prio_weight([h3, h2, h1]) == [h1, h2, h3]
assert dnsutil.sort_prio_weight([h3, h3, h3]) == [h3]
assert dnsutil.sort_prio_weight([h2, h2, h1, h1]) == [h1, h2]

View File

@@ -0,0 +1,277 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
"""
Test the `ipapython/ipap11helper/p11helper.c` module.
"""
from binascii import hexlify
import os
import os.path
import logging
import subprocess
import tempfile
import pytest
from ipaplatform.paths import paths
from ipapython import p11helper as _ipap11helper
pytestmark = pytest.mark.tier0
CONFIG_DATA = """
# SoftHSM v2 configuration file
directories.tokendir = %s/tokens
objectstore.backend = file
"""
LIBSOFTHSM = paths.LIBSOFTHSM2_SO
SOFTHSM2_UTIL = paths.SOFTHSM2_UTIL
logging.basicConfig(level=logging.INFO)
log = logging.getLogger('t')
master_key_label = u"master-ž" # random non-ascii character to test unicode
master_key_id = "m"
replica1_key_label = u"replica1"
replica1_key_id = "id1"
replica1_import_label = u"replica1-import"
replica1_import_id = "id1-import"
replica1_new_label = u"replica1-new-label-ž"
replica2_key_label = u"replica2"
replica2_key_id = "id2"
replica_non_existent_label = u"replica-nonexistent"
@pytest.fixture(scope="module")
def p11(request):
token_path = tempfile.mkdtemp(prefix='pytest_', suffix='_pkcs11')
os.chdir(token_path)
os.mkdir('tokens')
with open('softhsm2.conf', 'w') as cfg:
cfg.write(CONFIG_DATA % token_path)
os.environ['SOFTHSM2_CONF'] = os.path.join(token_path, 'softhsm2.conf')
subprocess.check_call([SOFTHSM2_UTIL, '--init-token', '--slot', '0',
'--label', 'test', '--pin', '1234', '--so-pin',
'1234'])
try:
p11 = _ipap11helper.P11_Helper(0, "1234", LIBSOFTHSM)
except _ipap11helper.Error:
pytest.fail('Failed to initialize the helper object.', pytrace=False)
def fin():
try:
p11.finalize()
except _ipap11helper.Error:
pytest.fail('Failed to finalize the helper object.', pytrace=False)
finally:
del os.environ['SOFTHSM2_CONF']
request.addfinalizer(fin)
return p11
class test_p11helper(object):
def test_generate_master_key(self, p11):
assert p11.generate_master_key(master_key_label, master_key_id,
key_length=16, cka_wrap=True,
cka_unwrap=True)
def test_search_for_master_key(self, p11):
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label, id=master_key_id)
assert len(master_key) == 1, "The master key should exist."
def test_generate_replica_key_pair(self, p11):
assert p11.generate_replica_key_pair(replica1_key_label,
replica1_key_id,
pub_cka_wrap=True,
priv_cka_unwrap=True)
def test_find_key(self, p11):
rep1_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica1_key_label, cka_wrap=True)
assert len(rep1_pub) == 1, ("replica key pair has to contain "
"1 pub key instead of %s" % len(rep1_pub))
rep1_priv = p11.find_keys(_ipap11helper.KEY_CLASS_PRIVATE_KEY,
label=replica1_key_label, cka_unwrap=True)
assert len(rep1_priv) == 1, ("replica key pair has to contain 1 "
"private key instead of %s" %
len(rep1_priv))
def test_find_key_by_uri(self, p11):
rep1_pub = p11.find_keys(uri="pkcs11:object=replica1;objecttype=public")
assert len(rep1_pub) == 1, ("replica key pair has to contain 1 pub "
"key instead of %s" % len(rep1_pub))
def test_get_attribute_from_object(self, p11):
rep1_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica1_key_label, cka_wrap=True)[0]
iswrap = p11.get_attribute(rep1_pub, _ipap11helper.CKA_WRAP)
assert iswrap is True, "replica public key has to have CKA_WRAP = TRUE"
def test_generate_replica_keypair_with_extractable_private_key(self, p11):
assert p11.generate_replica_key_pair(replica2_key_label,
replica2_key_id,
pub_cka_wrap=True,
priv_cka_unwrap=True,
priv_cka_extractable=True)
def test_find_key_on_nonexistent_key_pair(self, p11):
test_list = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica_non_existent_label)
assert len(test_list) == 0, ("list should be empty because label "
"'%s' should not exist" %
replica_non_existent_label)
def test_export_import_of_public_key(self, p11):
rep1_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica1_key_label, cka_wrap=True)[0]
pub = p11.export_public_key(rep1_pub)
log.debug("Exported public key %s", hexlify(pub))
with open("public_key.asn1.der", "wb") as f:
f.write(pub)
rep1_pub_import = p11.import_public_key(replica1_import_label,
replica1_import_id,
pub,
cka_wrap=True)
log.debug('imported replica 1 public key: %s', rep1_pub_import)
# test public key import
rep1_modulus_orig = p11.get_attribute(rep1_pub,
_ipap11helper.CKA_MODULUS)
rep1_modulus_import = p11.get_attribute(rep1_pub_import,
_ipap11helper.CKA_MODULUS)
log.debug('rep1_modulus_orig = 0x%s', hexlify(rep1_modulus_orig))
log.debug('rep1_modulus_import = 0x%s', hexlify(rep1_modulus_import))
assert rep1_modulus_import == rep1_modulus_orig
rep1_pub_exp_orig = p11.get_attribute(
rep1_pub, _ipap11helper.CKA_PUBLIC_EXPONENT)
rep1_pub_exp_import = p11.get_attribute(
rep1_pub_import, _ipap11helper.CKA_PUBLIC_EXPONENT)
log.debug('rep1_pub_exp_orig = 0x%s', hexlify(rep1_pub_exp_orig))
log.debug('rep1_pub_exp_import = 0x%s', hexlify(rep1_pub_exp_import))
assert rep1_pub_exp_import == rep1_pub_exp_orig
def test_wrap_unwrap_key_by_master_key_with_AES(self, p11):
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label, id=master_key_id)[0]
rep2_priv = p11.find_keys(_ipap11helper.KEY_CLASS_PRIVATE_KEY,
label=replica2_key_label, cka_unwrap=True)[0]
log.debug("wrapping dnssec priv key by master key")
wrapped_priv = p11.export_wrapped_key(
rep2_priv, master_key, _ipap11helper.MECH_AES_KEY_WRAP_PAD
)
assert wrapped_priv
log.debug("wrapped_dnssec priv key: %s", hexlify(wrapped_priv))
with open("wrapped_priv.der", "wb") as f:
f.write(wrapped_priv)
assert p11.import_wrapped_private_key(
u'test_import_wrapped_priv',
'1',
wrapped_priv,
master_key,
_ipap11helper.MECH_AES_KEY_WRAP_PAD,
_ipap11helper.KEY_TYPE_RSA
)
def test_wrap_unwrap_key_by_master_key_with_RSA_PKCS(self, p11):
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label, id=master_key_id)[0]
rep2_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica2_key_label, cka_wrap=True)[0]
rep2_priv = p11.find_keys(_ipap11helper.KEY_CLASS_PRIVATE_KEY,
label=replica2_key_label, cka_unwrap=True)[0]
wrapped = p11.export_wrapped_key(master_key,
rep2_pub,
_ipap11helper.MECH_RSA_PKCS)
assert wrapped
log.debug("wrapped key MECH_RSA_PKCS (secret master wrapped by pub "
"key): %s", hexlify(wrapped))
assert p11.import_wrapped_secret_key(u'test_import_wrapped',
'2',
wrapped,
rep2_priv,
_ipap11helper.MECH_RSA_PKCS,
_ipap11helper.KEY_TYPE_AES)
def test_wrap_unwrap_by_master_key_with_RSA_PKCS_OAEP(self, p11):
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label, id=master_key_id)[0]
rep2_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica2_key_label, cka_wrap=True)[0]
rep2_priv = p11.find_keys(_ipap11helper.KEY_CLASS_PRIVATE_KEY,
label=replica2_key_label, cka_unwrap=True)[0]
wrapped = p11.export_wrapped_key(master_key,
rep2_pub,
_ipap11helper.MECH_RSA_PKCS_OAEP)
assert wrapped
log.debug("wrapped key MECH_RSA_PKCS_OAEP (secret master wrapped by "
"pub key): %s", hexlify(wrapped))
assert p11.import_wrapped_secret_key(u'test_import_wrapped',
'3',
wrapped,
rep2_priv,
_ipap11helper.MECH_RSA_PKCS_OAEP,
_ipap11helper.KEY_TYPE_AES)
def test_set_attribute_on_object(self, p11):
rep1_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica1_key_label, cka_wrap=True)[0]
test_label = replica1_new_label
p11.set_attribute(rep1_pub, _ipap11helper.CKA_LABEL, test_label)
assert p11.get_attribute(rep1_pub, _ipap11helper.CKA_LABEL) \
== test_label, "The labels do not match."
def test_do_not_generate_identical_master_keys(self, p11):
with pytest.raises(_ipap11helper.DuplicationError):
p11.generate_master_key(master_key_label, master_key_id,
key_length=16)
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label)
assert len(master_key) == 1, ("There shouldn't be multiple keys "
"with the same label.")
def test_delete_key(self, p11):
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label, id=master_key_id)[0]
rep1_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica1_new_label, cka_wrap=True)[0]
rep2_priv = p11.find_keys(_ipap11helper.KEY_CLASS_PRIVATE_KEY,
label=replica2_key_label, cka_unwrap=True)[0]
for key in (rep1_pub, rep2_priv, master_key):
p11.delete_key(key)
master_key = p11.find_keys(_ipap11helper.KEY_CLASS_SECRET_KEY,
label=master_key_label, id=master_key_id)
assert len(master_key) == 0, "The master key should be deleted."
rep1_pub = p11.find_keys(_ipap11helper.KEY_CLASS_PUBLIC_KEY,
label=replica1_new_label, cka_wrap=True)
assert len(rep1_pub) == 0, ("The public key of replica1 pair should "
"be deleted.")
rep2_priv = p11.find_keys(_ipap11helper.KEY_CLASS_PRIVATE_KEY,
label=replica2_key_label, cka_unwrap=True)
assert len(rep2_priv) == 0, ("The private key of replica2 pair should"
" be deleted.")

View File

@@ -21,67 +21,63 @@
"""
Test the `ipapython/ipautil.py` module.
"""
from __future__ import absolute_import
import os
import pwd
import socket
import sys
import tempfile
import nose
import pytest
import six
from ipalib.constants import IPAAPI_USER
from ipaplatform.paths import paths
from ipapython import ipautil
import pytest
pytestmark = pytest.mark.tier0
def assert_equal(a, b):
__tracebackhide__ = True
assert a == b
def make_ipaddress_checker(addr, words=None, prefixlen=None):
def check_ipaddress():
try:
ip = ipautil.CheckedIPAddress(addr, match_local=False)
assert ip.words == words and ip.prefixlen == prefixlen
except:
assert words is None and prefixlen is None
check_ipaddress.description = "Test IP address parsing and verification (%s)" % addr
return check_ipaddress
@pytest.mark.parametrize("addr,words,prefixlen", [
('0.0.0.0/0', None, None),
('10.11.12.13', (10, 11, 12, 13), 8),
('10.11.12.13/14', (10, 11, 12, 13), 14),
('10.11.12.13%zoneid', None, None),
('10.11.12.13%zoneid/14', None, None),
('10.11.12.1337', None, None),
('10.11.12.13/33', None, None),
('127.0.0.1', None, None),
('241.1.2.3', None, None),
('169.254.1.2', None, None),
('10.11.12.0/24', (10, 11, 12, 0), 24),
('10.0.0.255', (10, 0, 0, 255), 8),
('224.5.6.7', None, None),
('10.11.12.255/24', (10, 11, 12, 255), 24),
('255.255.255.255', None, None),
('::/0', None, None),
('2001::1', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1/72', (0x2001, 0, 0, 0, 0, 0, 0, 1), 72),
('2001::1%zoneid', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1%zoneid/72', None, None),
('2001::1beef', None, None),
('2001::1/129', None, None),
('::1', None, None),
('6789::1', None, None),
('fe89::1', None, None),
('2001::/64', (0x2001, 0, 0, 0, 0, 0, 0, 0), 64),
('ff01::1', None, None),
('junk', None, None)
])
def test_ip_address(addr, words, prefixlen):
if words is None:
pytest.raises(
ValueError, ipautil.CheckedIPAddress, addr)
else:
ip = ipautil.CheckedIPAddress(addr)
assert ip.words == words
assert ip.prefixlen == prefixlen
def test_ip_address():
addrs = [
('10.11.12.13', (10, 11, 12, 13), 8),
('10.11.12.13/14', (10, 11, 12, 13), 14),
('10.11.12.13%zoneid',),
('10.11.12.13%zoneid/14',),
('10.11.12.1337',),
('10.11.12.13/33',),
('127.0.0.1',),
('241.1.2.3',),
('169.254.1.2',),
('10.11.12.0/24',),
('224.5.6.7',),
('10.11.12.255/24',),
('2001::1', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1/72', (0x2001, 0, 0, 0, 0, 0, 0, 1), 72),
('2001::1%zoneid', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1%zoneid/72',),
('2001::1beef',),
('2001::1/129',),
('::1',),
('6789::1',),
('fe89::1',),
('2001::/64',),
('ff01::1',),
('junk',)
]
for addr in addrs:
yield make_ipaddress_checker(*addr)
class TestCIDict(object):
@@ -104,30 +100,30 @@ class TestCIDict(object):
assert dict(cidict.items()) == {'a': 2, 'b': 3, 'C': 4}
def test_len(self):
assert_equal(3, len(self.cidict))
nose.tools.assert_equal(3, len(self.cidict))
def test_getitem(self):
assert_equal("val1", self.cidict["Key1"])
assert_equal("val1", self.cidict["key1"])
assert_equal("val2", self.cidict["KEY2"])
assert_equal("VAL3", self.cidict["key3"])
assert_equal("VAL3", self.cidict["KEY3"])
with pytest.raises(KeyError):
self.cidict["key4"] # pylint: disable=pointless-statement
nose.tools.assert_equal("val1", self.cidict["Key1"])
nose.tools.assert_equal("val1", self.cidict["key1"])
nose.tools.assert_equal("val2", self.cidict["KEY2"])
nose.tools.assert_equal("VAL3", self.cidict["key3"])
nose.tools.assert_equal("VAL3", self.cidict["KEY3"])
with nose.tools.assert_raises(KeyError):
self.cidict["key4"]
def test_get(self):
assert_equal("val1", self.cidict.get("Key1"))
assert_equal("val1", self.cidict.get("key1"))
assert_equal("val2", self.cidict.get("KEY2"))
assert_equal("VAL3", self.cidict.get("key3"))
assert_equal("VAL3", self.cidict.get("KEY3"))
assert_equal("default", self.cidict.get("key4", "default"))
nose.tools.assert_equal("val1", self.cidict.get("Key1"))
nose.tools.assert_equal("val1", self.cidict.get("key1"))
nose.tools.assert_equal("val2", self.cidict.get("KEY2"))
nose.tools.assert_equal("VAL3", self.cidict.get("key3"))
nose.tools.assert_equal("VAL3", self.cidict.get("KEY3"))
nose.tools.assert_equal("default", self.cidict.get("key4", "default"))
def test_setitem(self):
self.cidict["key4"] = "val4"
assert_equal("val4", self.cidict["key4"])
nose.tools.assert_equal("val4", self.cidict["key4"])
self.cidict["KEY4"] = "newval4"
assert_equal("newval4", self.cidict["key4"])
nose.tools.assert_equal("newval4", self.cidict["key4"])
def test_del(self):
assert "Key1" in self.cidict
@@ -139,9 +135,9 @@ class TestCIDict(object):
assert "key2" not in self.cidict
def test_clear(self):
assert_equal(3, len(self.cidict))
nose.tools.assert_equal(3, len(self.cidict))
self.cidict.clear()
assert_equal(0, len(self.cidict))
nose.tools.assert_equal(0, len(self.cidict))
assert self.cidict == {}
assert list(self.cidict) == []
assert list(self.cidict.values()) == []
@@ -155,10 +151,10 @@ class TestCIDict(object):
def test_copy(self):
copy = self.cidict.copy()
assert copy == self.cidict
assert_equal(3, len(copy))
nose.tools.assert_equal(3, len(copy))
assert "Key1" in copy
assert "key1" in copy
assert_equal("val1", copy["Key1"])
nose.tools.assert_equal("val1", copy["Key1"])
@pytest.mark.skipif(not six.PY2, reason="Python 2 only")
def test_haskey(self):
@@ -177,26 +173,25 @@ class TestCIDict(object):
def test_items(self):
items = list(self.cidict.items())
assert_equal(3, len(items))
nose.tools.assert_equal(3, len(items))
items_set = set(items)
assert ("Key1", "val1") in items_set
assert ("key2", "val2") in items_set
assert ("KEY3", "VAL3") in items_set
# pylint: disable=dict-iter-method
# pylint: disable=dict-keys-not-iterating, dict-values-not-iterating
assert list(self.cidict.items()) == list(self.cidict.iteritems()) == list(zip(
self.cidict.keys(), self.cidict.values()))
def test_iter(self):
items = []
assert list(self.cidict) == list(self.cidict.keys())
assert sorted(self.cidict) == sorted(['Key1', 'key2', 'KEY3'])
def test_iteritems(self):
items = []
# pylint: disable=dict-iter-method
for k, v in self.cidict.iteritems():
items.append((k, v))
assert_equal(3, len(items))
for (k,v) in self.cidict.iteritems():
items.append((k,v))
nose.tools.assert_equal(3, len(items))
items_set = set(items)
assert ("Key1", "val1") in items_set
assert ("key2", "val2") in items_set
@@ -204,10 +199,9 @@ class TestCIDict(object):
def test_iterkeys(self):
keys = []
# pylint: disable=dict-iter-method
for k in self.cidict.iterkeys():
keys.append(k)
assert_equal(3, len(keys))
nose.tools.assert_equal(3, len(keys))
keys_set = set(keys)
assert "Key1" in keys_set
assert "key2" in keys_set
@@ -215,10 +209,9 @@ class TestCIDict(object):
def test_itervalues(self):
values = []
# pylint: disable=dict-iter-method
for k in self.cidict.itervalues():
values.append(k)
assert_equal(3, len(values))
nose.tools.assert_equal(3, len(values))
values_set = set(values)
assert "val1" in values_set
assert "val2" in values_set
@@ -226,32 +219,32 @@ class TestCIDict(object):
def test_keys(self):
keys = list(self.cidict.keys())
assert_equal(3, len(keys))
nose.tools.assert_equal(3, len(keys))
keys_set = set(keys)
assert "Key1" in keys_set
assert "key2" in keys_set
assert "KEY3" in keys_set
# pylint: disable=dict-iter-method
assert list(self.cidict.keys()) == list(self.cidict.iterkeys())
def test_values(self):
values = list(self.cidict.values())
assert_equal(3, len(values))
nose.tools.assert_equal(3, len(values))
values_set = set(values)
assert "val1" in values_set
assert "val2" in values_set
assert "VAL3" in values_set
# pylint: disable=dict-iter-method
assert list(self.cidict.values()) == list(self.cidict.itervalues())
def test_update(self):
newdict = { "KEY2": "newval2",
"key4": "val4" }
self.cidict.update(newdict)
assert_equal(4, len(self.cidict))
nose.tools.assert_equal(4, len(self.cidict))
items = list(self.cidict.items())
assert_equal(4, len(items))
nose.tools.assert_equal(4, len(items))
items_set = set(items)
assert ("Key1", "val1") in items_set
# note the update "overwrites" the case of the key2
@@ -272,15 +265,15 @@ class TestCIDict(object):
'Key1': 'val1', 'key2': 'val2', 'KEY3': 'VAL3'}
def test_update_duplicate_values_dict(self):
with pytest.raises(ValueError):
with nose.tools.assert_raises(ValueError):
self.cidict.update({'a': 'va', 'A': None, 'b': 3})
def test_update_duplicate_values_list(self):
with pytest.raises(ValueError):
with nose.tools.assert_raises(ValueError):
self.cidict.update([('a', 'va'), ('A', None), ('b', 3)])
def test_update_duplicate_values_kwargs(self):
with pytest.raises(ValueError):
with nose.tools.assert_raises(ValueError):
self.cidict.update(a='va', A=None, b=3)
def test_update_kwargs(self):
@@ -289,45 +282,45 @@ class TestCIDict(object):
'b': 'vb', 'Key1': 'val1', 'key2': 'val2', 'KEY3': 'VAL3'}
def test_setdefault(self):
assert_equal("val1", self.cidict.setdefault("KEY1", "default"))
nose.tools.assert_equal("val1", self.cidict.setdefault("KEY1", "default"))
assert "KEY4" not in self.cidict
assert_equal("default", self.cidict.setdefault("KEY4", "default"))
nose.tools.assert_equal("default", self.cidict.setdefault("KEY4", "default"))
assert "KEY4" in self.cidict
assert_equal("default", self.cidict["key4"])
nose.tools.assert_equal("default", self.cidict["key4"])
assert "KEY5" not in self.cidict
assert_equal(None, self.cidict.setdefault("KEY5"))
nose.tools.assert_equal(None, self.cidict.setdefault("KEY5"))
assert "KEY5" in self.cidict
assert_equal(None, self.cidict["key5"])
nose.tools.assert_equal(None, self.cidict["key5"])
def test_pop(self):
assert_equal("val1", self.cidict.pop("KEY1", "default"))
nose.tools.assert_equal("val1", self.cidict.pop("KEY1", "default"))
assert "key1" not in self.cidict
assert_equal("val2", self.cidict.pop("KEY2"))
nose.tools.assert_equal("val2", self.cidict.pop("KEY2"))
assert "key2" not in self.cidict
assert_equal("default", self.cidict.pop("key4", "default"))
with pytest.raises(KeyError):
nose.tools.assert_equal("default", self.cidict.pop("key4", "default"))
with nose.tools.assert_raises(KeyError):
self.cidict.pop("key4")
def test_popitem(self):
items = set(self.cidict.items())
assert_equal(3, len(self.cidict))
nose.tools.assert_equal(3, len(self.cidict))
item = self.cidict.popitem()
assert_equal(2, len(self.cidict))
nose.tools.assert_equal(2, len(self.cidict))
assert item in items
items.discard(item)
item = self.cidict.popitem()
assert_equal(1, len(self.cidict))
nose.tools.assert_equal(1, len(self.cidict))
assert item in items
items.discard(item)
item = self.cidict.popitem()
assert_equal(0, len(self.cidict))
nose.tools.assert_equal(0, len(self.cidict))
assert item in items
items.discard(item)
@@ -342,55 +335,55 @@ class TestTimeParser(object):
timestr = "20070803"
time = ipautil.parse_generalized_time(timestr)
assert_equal(2007, time.year)
assert_equal(8, time.month)
assert_equal(3, time.day)
assert_equal(0, time.hour)
assert_equal(0, time.minute)
assert_equal(0, time.second)
nose.tools.assert_equal(2007, time.year)
nose.tools.assert_equal(8, time.month)
nose.tools.assert_equal(3, time.day)
nose.tools.assert_equal(0, time.hour)
nose.tools.assert_equal(0, time.minute)
nose.tools.assert_equal(0, time.second)
def test_hour_min_sec(self):
timestr = "20051213141205"
time = ipautil.parse_generalized_time(timestr)
assert_equal(2005, time.year)
assert_equal(12, time.month)
assert_equal(13, time.day)
assert_equal(14, time.hour)
assert_equal(12, time.minute)
assert_equal(5, time.second)
nose.tools.assert_equal(2005, time.year)
nose.tools.assert_equal(12, time.month)
nose.tools.assert_equal(13, time.day)
nose.tools.assert_equal(14, time.hour)
nose.tools.assert_equal(12, time.minute)
nose.tools.assert_equal(5, time.second)
def test_fractions(self):
timestr = "2003092208.5"
time = ipautil.parse_generalized_time(timestr)
assert_equal(2003, time.year)
assert_equal(9, time.month)
assert_equal(22, time.day)
assert_equal(8, time.hour)
assert_equal(30, time.minute)
assert_equal(0, time.second)
nose.tools.assert_equal(2003, time.year)
nose.tools.assert_equal(9, time.month)
nose.tools.assert_equal(22, time.day)
nose.tools.assert_equal(8, time.hour)
nose.tools.assert_equal(30, time.minute)
nose.tools.assert_equal(0, time.second)
timestr = "199203301544,25"
time = ipautil.parse_generalized_time(timestr)
assert_equal(1992, time.year)
assert_equal(3, time.month)
assert_equal(30, time.day)
assert_equal(15, time.hour)
assert_equal(44, time.minute)
assert_equal(15, time.second)
nose.tools.assert_equal(1992, time.year)
nose.tools.assert_equal(3, time.month)
nose.tools.assert_equal(30, time.day)
nose.tools.assert_equal(15, time.hour)
nose.tools.assert_equal(44, time.minute)
nose.tools.assert_equal(15, time.second)
timestr = "20060401185912,8"
time = ipautil.parse_generalized_time(timestr)
assert_equal(2006, time.year)
assert_equal(4, time.month)
assert_equal(1, time.day)
assert_equal(18, time.hour)
assert_equal(59, time.minute)
assert_equal(12, time.second)
assert_equal(800000, time.microsecond)
nose.tools.assert_equal(2006, time.year)
nose.tools.assert_equal(4, time.month)
nose.tools.assert_equal(1, time.day)
nose.tools.assert_equal(18, time.hour)
nose.tools.assert_equal(59, time.minute)
nose.tools.assert_equal(12, time.second)
nose.tools.assert_equal(800000, time.microsecond)
def test_time_zones(self):
# pylint: disable=no-member
@@ -398,40 +391,40 @@ class TestTimeParser(object):
timestr = "20051213141205Z"
time = ipautil.parse_generalized_time(timestr)
assert_equal(0, time.tzinfo.houroffset)
assert_equal(0, time.tzinfo.minoffset)
nose.tools.assert_equal(0, time.tzinfo.houroffset)
nose.tools.assert_equal(0, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
assert_equal(0, offset.seconds)
nose.tools.assert_equal(0, offset.seconds)
timestr = "20051213141205+0500"
time = ipautil.parse_generalized_time(timestr)
assert_equal(5, time.tzinfo.houroffset)
assert_equal(0, time.tzinfo.minoffset)
nose.tools.assert_equal(5, time.tzinfo.houroffset)
nose.tools.assert_equal(0, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
assert_equal(5 * 60 * 60, offset.seconds)
nose.tools.assert_equal(5 * 60 * 60, offset.seconds)
timestr = "20051213141205-0500"
time = ipautil.parse_generalized_time(timestr)
assert_equal(-5, time.tzinfo.houroffset)
assert_equal(0, time.tzinfo.minoffset)
nose.tools.assert_equal(-5, time.tzinfo.houroffset)
nose.tools.assert_equal(0, time.tzinfo.minoffset)
# NOTE - the offset is always positive - it's minutes
# _east_ of UTC
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
assert_equal((24 - 5) * 60 * 60, offset.seconds)
nose.tools.assert_equal((24 - 5) * 60 * 60, offset.seconds)
timestr = "20051213141205-0930"
time = ipautil.parse_generalized_time(timestr)
assert_equal(-9, time.tzinfo.houroffset)
assert_equal(-30, time.tzinfo.minoffset)
nose.tools.assert_equal(-9, time.tzinfo.houroffset)
nose.tools.assert_equal(-30, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
assert_equal(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
nose.tools.assert_equal(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
def test_run():
result = ipautil.run([paths.ECHO, 'foo\x02bar'],
result = ipautil.run(['echo', 'foo\x02bar'],
capture_output=True,
capture_error=True)
assert result.returncode == 0
@@ -442,7 +435,7 @@ def test_run():
def test_run_no_capture_output():
result = ipautil.run([paths.ECHO, 'foo\x02bar'])
result = ipautil.run(['echo', 'foo\x02bar'])
assert result.returncode == 0
assert result.output is None
assert result.raw_output == b'foo\x02bar\n'
@@ -451,13 +444,13 @@ def test_run_no_capture_output():
def test_run_bytes():
result = ipautil.run([paths.ECHO, b'\x01\x02'], capture_output=True)
result = ipautil.run(['echo', b'\x01\x02'], capture_output=True)
assert result.returncode == 0
assert result.raw_output == b'\x01\x02\n'
def test_run_decode():
result = ipautil.run([paths.ECHO, u'á'.encode('utf-8')],
result = ipautil.run(['echo', u'á'.encode('utf-8')],
encoding='utf-8', capture_output=True)
assert result.returncode == 0
if six.PY3:
@@ -469,11 +462,11 @@ def test_run_decode():
def test_run_decode_bad():
if six.PY3:
with pytest.raises(UnicodeDecodeError):
ipautil.run([paths.ECHO, b'\xa0\xa1'],
ipautil.run(['echo', b'\xa0\xa1'],
capture_output=True,
encoding='utf-8')
else:
result = ipautil.run([paths.ECHO, '\xa0\xa1'],
result = ipautil.run(['echo', '\xa0\xa1'],
capture_output=True,
encoding='utf-8')
assert result.returncode == 0
@@ -481,99 +474,9 @@ def test_run_decode_bad():
def test_backcompat():
result = out, err, rc = ipautil.run([paths.ECHO, 'foo\x02bar'],
result = out, err, rc = ipautil.run(['echo', 'foo\x02bar'],
capture_output=True,
capture_error=True)
assert rc is result.returncode
assert out is result.output
assert err is result.error_output
def test_flush_sync():
with tempfile.NamedTemporaryFile('wb+') as f:
f.write(b'data')
ipautil.flush_sync(f)
def test_run_stderr():
args = [
sys.executable, '-c',
'import sys; sys.exit(" ".join(("error", "message")))'
]
with pytest.raises(ipautil.CalledProcessError) as cm:
ipautil.run(args)
assert cm.value.cmd == repr(args)
assert cm.value.stderr == "error message\n"
assert "CalledProcessError(" in str(cm.value)
assert repr(args) in str(cm.value)
assert str(cm.value).endswith("'error message\\n')")
with pytest.raises(ipautil.CalledProcessError) as cm:
ipautil.run(args, nolog=["message"])
assert cm.value.cmd == repr(args).replace("message", "XXXXXXXX")
assert str(cm.value).endswith("'error XXXXXXXX\\n')")
assert "message" not in str(cm.value)
assert "message" not in str(cm.value.output)
assert "message" not in str(cm.value.stderr)
@pytest.mark.skipif(os.geteuid() != 0,
reason="Must have root privileges to run this test")
def test_run_runas():
"""
Test run method with the runas parameter.
The test executes 'id' to make sure that the process is
executed with the user identity specified in runas parameter.
The test is using 'ipaapi' user as it is configured when
ipa-server-common package is installed.
"""
user = pwd.getpwnam(IPAAPI_USER)
res = ipautil.run(['/usr/bin/id', '-u'], runas=IPAAPI_USER)
assert res.returncode == 0
assert res.raw_output == b'%d\n' % user.pw_uid
res = ipautil.run(['/usr/bin/id', '-g'], runas=IPAAPI_USER)
assert res.returncode == 0
assert res.raw_output == b'%d\n' % user.pw_gid
@pytest.fixture(scope='function')
def tcp_listen():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# port 0 means the OS selects a random, unused port for the test.
s.bind(('', 0))
s.listen(1)
yield s.getsockname()[-1], s
finally:
s.close()
@pytest.fixture(scope='function')
def udp_listen():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# port 0 means the OS selects a random, unused port for the test.
s.bind(('', 0))
yield s.getsockname()[-1], s
finally:
s.close()
def test_check_port_bindable_tcp(tcp_listen):
port, sock = tcp_listen
assert not ipautil.check_port_bindable(port)
assert not ipautil.check_port_bindable(port, socket.SOCK_STREAM)
sock.close()
assert ipautil.check_port_bindable(port)
def test_check_port_bindable_udp(udp_listen):
port, sock = udp_listen
assert not ipautil.check_port_bindable(port, socket.SOCK_DGRAM)
sock.close()
assert ipautil.check_port_bindable(port, socket.SOCK_DGRAM)

View File

@@ -1,3 +1,4 @@
#! /usr/bin/python2 -E
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
@@ -88,3 +89,6 @@ class TestValidate(unittest.TestCase):
self.assertEqual(False, ipavalidate.GoodName("foo%bar"))
self.assertEqual(False, ipavalidate.GoodName("*foo"))
self.assertEqual(False, ipavalidate.GoodName("$foo.bar$"))
if __name__ == '__main__':
unittest.main()

View File

@@ -1,139 +0,0 @@
#
# Copyright (C) 2016 FreeIPA Project Contributors - see LICENSE file
#
import pytest
import six
from ipapython.kerberos import Principal
if six.PY3:
unicode = str
valid_principals = {
u'tuser@REALM.TEST': {
'components': (u'tuser',),
'realm': u'REALM.TEST',
'username': u'tuser'
},
u'tuser\\@tupn.test@REALM.TEST': {
'components': (u'tuser@tupn.test',),
'realm': u'REALM.TEST',
'username': u'tuser@tupn.test',
'upn_suffix': u'tupn.test'
},
u'test/host.ipa.test@REALM.TEST': {
'components': (u'test', u'host.ipa.test'),
'realm': u'REALM.TEST',
'hostname': u'host.ipa.test'
},
u'test/service/host.ipa.test@REALM.TEST': {
'components': (u'test', u'service', u'host.ipa.test'),
'realm': u'REALM.TEST',
'service_name': u'test/service'
},
u'tuser': {
'components': (u'tuser',),
'realm': None,
'username': u'tuser'
},
u'$%user@REALM.TEST': {
'components': (u'$%user',),
'realm': u'REALM.TEST',
'username': u'$%user'
},
u'host/host.ipa.test': {
'components': (u'host', u'host.ipa.test'),
'realm': None,
'hostname': u'host.ipa.test'
},
u's$c/$%^.ipa.t%$t': {
'components': (u's$c', u'$%^.ipa.t%$t'),
'realm': None,
'hostname': u'$%^.ipa.t%$t',
'service_name': u's$c'
},
u'test\\/service/test\\/host@REALM\\@TEST': {
'components': (u'test/service', u'test/host'),
'realm': u'REALM@TEST',
'hostname': u'test/host',
'service_name': r'test\/service'
}
}
def valid_principal_iter(principals):
for princ, data in principals.items():
yield princ, data
@pytest.fixture(params=list(valid_principal_iter(valid_principals)))
def valid_principal(request):
return request.param
def test_principals(valid_principal):
principal_name, data = valid_principal
princ = Principal(principal_name)
for name, value in data.items():
assert getattr(princ, name) == value
assert unicode(princ) == principal_name
assert repr(princ) == "ipapython.kerberos.Principal('{}')".format(
principal_name)
def test_multiple_unescaped_ats_raise_error():
pytest.raises(ValueError, Principal, u'too@many@realms')
principals_properties = {
u'user@REALM': {
'property_true': ('is_user',),
'property_raises': ('upn_suffix', 'hostname', 'service_name')
},
u'host/m1.ipa.test@REALM': {
'property_true': ('is_host', 'is_service'),
'property_raises': ('username', 'upn_suffix')
},
u'service/m1.ipa.test@REALM': {
'property_true': ('is_service'),
'property_raises': ('username', 'upn_suffix')
},
u'user\\@domain@REALM': {
'property_true': ('is_user', 'is_enterprise'),
'property_raises': ('hostname', 'service_name')
}
}
def principal_properties_iter(principals_properties):
for p, data in principals_properties.items():
yield p, data
@pytest.fixture(params=list(principal_properties_iter(principals_properties)))
def principal_properties(request):
return request.param
def test_principal_properties(principal_properties):
principal, data = principal_properties
princ = Principal(principal)
boolean_propertes = [prop for prop in dir(princ) if
prop.startswith('is_')]
for b in boolean_propertes:
if b in data['property_true']:
assert getattr(princ, b)
else:
assert not getattr(princ, b)
for property_raises in data['property_raises']:
with pytest.raises(ValueError):
getattr(princ, property_raises)

View File

@@ -20,6 +20,7 @@
Test the `kernel_keyring.py` module.
"""
from nose.tools import raises, assert_raises # pylint: disable=E0611
from ipapython import kernel_keyring
import pytest
@@ -27,7 +28,6 @@ import pytest
pytestmark = pytest.mark.tier0
TEST_KEY = 'ipa_test'
TEST_UNICODEKEY = u'ipa_unicode'
TEST_VALUE = b'abc123'
UPDATE_VALUE = b'123abc'
@@ -49,10 +49,6 @@ class test_keyring(object):
kernel_keyring.del_key(SIZE_256)
except ValueError:
pass
try:
kernel_keyring.del_key(TEST_UNICODEKEY)
except ValueError:
pass
def test_01(self):
"""
@@ -80,13 +76,13 @@ class test_keyring(object):
except ValueError:
pass
@raises(ValueError)
def test_03(self):
"""
Add a duplicate key
"""
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
with pytest.raises(ValueError):
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
def test_04(self):
"""
@@ -105,12 +101,12 @@ class test_keyring(object):
kernel_keyring.del_key(TEST_KEY)
@raises(ValueError)
def test_05(self):
"""
Read a non-existent key
"""
with pytest.raises(ValueError):
kernel_keyring.read_key(TEST_KEY)
result = kernel_keyring.read_key(TEST_KEY)
def test_06(self):
"""
@@ -154,13 +150,3 @@ class test_keyring(object):
assert(result == SIZE_1024.encode('ascii'))
kernel_keyring.del_key(TEST_KEY)
def test_10(self):
"""
Test a unicode key
"""
kernel_keyring.add_key(TEST_UNICODEKEY, TEST_VALUE)
result = kernel_keyring.read_key(TEST_UNICODEKEY)
assert(result == TEST_VALUE)
kernel_keyring.del_key(TEST_UNICODEKEY)

View File

@@ -0,0 +1,55 @@
# Copyright (C) 2015 FreeIPA Project Contributors - see LICENSE file
from __future__ import print_function
from ipapython.secrets.store import iSecStore, NAME_DB_MAP, NSSCertDB
import os
import shutil
import subprocess
import unittest
def _test_password_callback():
with open('test-ipa-sec-store/pwfile') as f:
password = f.read()
return password
class TestiSecStore(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
shutil.rmtree('test-ipa-sec-store')
except Exception: # pylint: disable=broad-except
pass
testdir = 'test-ipa-sec-store'
pwfile = os.path.join(testdir, 'pwfile')
os.mkdir(testdir)
with open(pwfile, 'w') as f:
f.write('testpw')
cls.certdb = os.path.join(testdir, 'certdb')
os.mkdir(cls.certdb)
cls.cert2db = os.path.join(testdir, 'cert2db')
os.mkdir(cls.cert2db)
seedfile = os.path.join(testdir, 'seedfile')
with open(seedfile, 'wb') as f:
seed = os.urandom(1024)
f.write(seed)
subprocess.call(['certutil', '-d', cls.certdb, '-N', '-f', pwfile])
subprocess.call(['certutil', '-d', cls.cert2db, '-N', '-f', pwfile])
subprocess.call(['certutil', '-d', cls.certdb, '-S', '-f', pwfile,
'-s', 'CN=testCA', '-n', 'testCACert', '-x',
'-t', 'CT,C,C', '-m', '1', '-z', seedfile])
def test_iSecStore(self):
iss = iSecStore({})
NAME_DB_MAP['test'] = {
'type': 'NSSDB',
'path': self.certdb,
'handler': NSSCertDB,
'pwcallback': _test_password_callback,
}
value = iss.get('keys/test/testCACert')
NAME_DB_MAP['test']['path'] = self.cert2db
iss.set('keys/test/testCACert', value)

View File

@@ -1,40 +0,0 @@
#
# Copyright (C) 2017 FreeIPA Contributors see COPYING for license
#
"""
Test the `session_storage.py` module.
"""
import pytest
from ipapython import session_storage
@pytest.mark.skip_ipaclient_unittest
@pytest.mark.needs_ipaapi
class test_session_storage(object):
"""
Test the session storage interface
"""
def setup(self):
# TODO: set up test user and kinit to it
# tmpdir = tempfile.mkdtemp(prefix = "tmp-")
# os.environ['KRB5CCNAME'] = 'FILE:%s/ccache' % tmpdir
self.principal = 'admin'
self.key = 'X-IPA-test-session-storage'
self.data = b'Test Data'
def test_01(self):
session_storage.store_data(self.principal, self.key, self.data)
def test_02(self):
data = session_storage.get_data(self.principal, self.key)
assert(data == self.data)
def test_03(self):
session_storage.remove_data(self.principal, self.key)
try:
session_storage.get_data(self.principal, self.key)
except session_storage.KRB5Error:
pass

View File

@@ -23,6 +23,7 @@ Test the `ipapython/ssh.py` module.
import base64
import six
import nose
import pytest
from ipapython import ssh
@@ -32,56 +33,61 @@ if six.PY3:
pytestmark = pytest.mark.tier0
b64 = 'AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L'
raw = base64.b64decode(b64)
openssh = 'ssh-rsa %s' % b64
def make_public_key_checker(pk, out):
def check_public_key():
try:
parsed = ssh.SSHPublicKey(pk)
assert parsed.openssh() == out
except Exception as e:
assert type(e) is out
check_public_key.description = "Test SSH public key parsing (%s)" % repr(pk)
return check_public_key
@pytest.mark.parametrize("pk,out", [
(b'\xff', UnicodeDecodeError),
(u'\xff', ValueError),
def test_public_key_parsing():
b64 = 'AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L'
raw = base64.b64decode(b64)
openssh = 'ssh-rsa %s' % b64
(raw, openssh),
(b'\0\0\0\x04none', u'none AAAABG5vbmU='),
(b'\0\0\0', ValueError),
(b'\0\0\0\0', ValueError),
(b'\0\0\0\x01', ValueError),
(b'\0\0\0\x01\xff', ValueError),
pks = [
(b'\xff', UnicodeDecodeError),
(u'\xff', ValueError),
(u'\0\0\0\x04none', ValueError),
(u'\0\0\0', ValueError),
(u'\0\0\0\0', ValueError),
(u'\0\0\0\x01', ValueError),
(u'\0\0\0\x01\xff', ValueError),
(raw, openssh),
(b'\0\0\0\x04none', u'none AAAABG5vbmU='),
(b'\0\0\0', ValueError),
(b'\0\0\0\0', ValueError),
(b'\0\0\0\x01', ValueError),
(b'\0\0\0\x01\xff', ValueError),
(b64, openssh),
(unicode(b64), openssh),
(b64.encode('ascii'), openssh),
(u'\n%s\n\n' % b64, openssh),
(u'AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'AAAAB', ValueError),
(u'\0\0\0\x04none', ValueError),
(u'\0\0\0', ValueError),
(u'\0\0\0\0', ValueError),
(u'\0\0\0\x01', ValueError),
(u'\0\0\0\x01\xff', ValueError),
(openssh, openssh),
(unicode(openssh), openssh),
(openssh.encode('ascii'), openssh),
(u'none AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'\t \t ssh-rsa \t \t%s\t \tthis is a comment\t \t ' % b64,
u'%s this is a comment' % openssh),
(u'opt3,opt2="\tx ",opt1,opt2="\\"x " %s comment ' % openssh,
u'opt1,opt2="\\"x ",opt3 %s comment' % openssh),
(u'ssh-rsa\n%s' % b64, ValueError),
(u'ssh-rsa\t%s' % b64, ValueError),
(u'vanitas %s' % b64, ValueError),
(u'@opt %s' % openssh, ValueError),
(u'opt=val %s' % openssh, ValueError),
(u'opt, %s' % openssh, ValueError)],
# ids=repr is workaround for pytest issue with NULL bytes,
# see https://github.com/pytest-dev/pytest/issues/2644
ids=repr
)
def test_public_key_parsing(pk, out):
if isinstance(out, type) and issubclass(out, Exception):
pytest.raises(out, ssh.SSHPublicKey, pk)
else:
parsed = ssh.SSHPublicKey(pk)
assert parsed.openssh() == out
(b64, openssh),
(unicode(b64), openssh),
(b64.encode('ascii'), openssh),
(u'\n%s\n\n' % b64, openssh),
(u'AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'AAAAB', ValueError),
(openssh, openssh),
(unicode(openssh), openssh),
(openssh.encode('ascii'), openssh),
(u'none AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'\t \t ssh-rsa \t \t%s\t \tthis is a comment\t \t ' % b64,
u'%s this is a comment' % openssh),
(u'opt3,opt2="\tx ",opt1,opt2="\\"x " %s comment ' % openssh,
u'opt1,opt2="\\"x ",opt3 %s comment' % openssh),
(u'ssh-rsa\n%s' % b64, ValueError),
(u'ssh-rsa\t%s' % b64, ValueError),
(u'vanitas %s' % b64, ValueError),
(u'@opt %s' % openssh, ValueError),
(u'opt=val %s' % openssh, ValueError),
(u'opt, %s' % openssh, ValueError),
]
for pk in pks:
yield make_public_key_checker(*pk)