Imported Upstream version 4.7.2

This commit is contained in:
Mario Fetka
2021-08-09 20:54:00 +02:00
parent 3bfaa6e020
commit a791de49a2
2175 changed files with 1764288 additions and 331861 deletions

View File

@@ -0,0 +1,169 @@
from __future__ import absolute_import
import os
import pytest
from ipapython.certdb import NSSDatabase, TRUSTED_PEER_TRUST_FLAGS
from ipaplatform.osinfo import osinfo
CERTNICK = 'testcert'
if osinfo.id == 'fedora':
if int(osinfo.version_id) >= 28:
NSS_DEFAULT = 'sql'
else:
NSS_DEFAULT = 'dbm'
else:
NSS_DEFAULT = None
def create_selfsigned(nssdb):
# create self-signed cert + key
noisefile = os.path.join(nssdb.secdir, 'noise')
with open(noisefile, 'wb') as f:
f.write(os.urandom(64))
try:
nssdb.run_certutil([
'-S', '-x',
'-z', noisefile,
'-k', 'rsa', '-g', '2048', '-Z', 'SHA256',
'-t', 'CTu,Cu,Cu',
'-s', 'CN=testcert',
'-n', CERTNICK,
'-m', '365',
])
finally:
os.unlink(noisefile)
def test_dbm_tmp():
with NSSDatabase(dbtype='dbm') as nssdb:
assert nssdb.dbtype == 'dbm'
for filename in nssdb.filenames:
assert not os.path.isfile(filename)
assert not nssdb.exists()
nssdb.create_db()
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
assert nssdb.exists()
assert os.path.basename(nssdb.certdb) == 'cert8.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key3.db'
assert os.path.basename(nssdb.secmod) == 'secmod.db'
def test_sql_tmp():
with NSSDatabase(dbtype='sql') as nssdb:
assert nssdb.dbtype == 'sql'
for filename in nssdb.filenames:
assert not os.path.isfile(filename)
assert not nssdb.exists()
nssdb.create_db()
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
assert nssdb.exists()
assert os.path.basename(nssdb.certdb) == 'cert9.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key4.db'
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
def test_convert_db():
with NSSDatabase(dbtype='dbm') as nssdb:
assert nssdb.dbtype == 'dbm'
nssdb.create_db()
assert nssdb.exists()
create_selfsigned(nssdb)
oldcerts = nssdb.list_certs()
assert len(oldcerts) == 1
oldkeys = nssdb.list_keys()
assert len(oldkeys) == 1
nssdb.convert_db()
assert nssdb.exists()
assert nssdb.dbtype == 'sql'
newcerts = nssdb.list_certs()
assert len(newcerts) == 1
assert newcerts == oldcerts
newkeys = nssdb.list_keys()
assert len(newkeys) == 1
assert newkeys == oldkeys
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
assert os.path.basename(nssdb.certdb) == 'cert9.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key4.db'
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
def test_convert_db_nokey():
with NSSDatabase(dbtype='dbm') as nssdb:
assert nssdb.dbtype == 'dbm'
nssdb.create_db()
create_selfsigned(nssdb)
assert len(nssdb.list_certs()) == 1
assert len(nssdb.list_keys()) == 1
# remove key, readd cert
cert = nssdb.get_cert(CERTNICK)
nssdb.run_certutil(['-F', '-n', CERTNICK])
nssdb.add_cert(cert, CERTNICK, TRUSTED_PEER_TRUST_FLAGS)
assert len(nssdb.list_keys()) == 0
oldcerts = nssdb.list_certs()
assert len(oldcerts) == 1
nssdb.convert_db()
assert nssdb.dbtype == 'sql'
newcerts = nssdb.list_certs()
assert len(newcerts) == 1
assert newcerts == oldcerts
assert nssdb.get_cert(CERTNICK) == cert
newkeys = nssdb.list_keys()
assert newkeys == ()
for filename in nssdb.filenames:
assert os.path.isfile(filename)
assert os.path.dirname(filename) == nssdb.secdir
old = os.path.join(nssdb.secdir, 'cert8.db')
assert not os.path.isfile(old)
assert os.path.isfile(old + '.migrated')
assert os.path.basename(nssdb.certdb) == 'cert9.db'
assert nssdb.certdb in nssdb.filenames
assert os.path.basename(nssdb.keydb) == 'key4.db'
assert os.path.basename(nssdb.secmod) == 'pkcs11.txt'
def test_auto_db():
with NSSDatabase() as nssdb:
assert nssdb.dbtype == 'auto'
assert nssdb.filenames is None
assert not nssdb.exists()
with pytest.raises(RuntimeError):
nssdb.list_certs()
nssdb.create_db()
assert nssdb.dbtype in ('dbm', 'sql')
if NSS_DEFAULT is not None:
assert nssdb.dbtype == NSS_DEFAULT
assert nssdb.filenames is not None
assert nssdb.exists()
nssdb.list_certs()

View File

@@ -18,12 +18,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import time
import datetime
import email.utils
import calendar
from ipapython.cookie import Cookie
import pytest
pytestmark = pytest.mark.tier0
class TestParse(unittest.TestCase):
def test_parse(self):
@@ -47,6 +50,22 @@ class TestParse(unittest.TestCase):
with self.assertRaises(ValueError):
cookies = Cookie.parse(s)
# 1 cookie with empty value
s = 'color='
cookies = Cookie.parse(s)
self.assertEqual(len(cookies), 1)
cookie = cookies[0]
self.assertEqual(cookie.key, 'color')
self.assertEqual(cookie.value, '')
self.assertEqual(cookie.domain, None)
self.assertEqual(cookie.path, None)
self.assertEqual(cookie.max_age, None)
self.assertEqual(cookie.expires, None)
self.assertEqual(cookie.secure, None)
self.assertEqual(cookie.httponly, None)
self.assertEqual(str(cookie), "color=")
self.assertEqual(cookie.http_cookie(), "color=;")
# 1 cookie with name/value
s = 'color=blue'
cookies = Cookie.parse(s)
@@ -260,7 +279,7 @@ class TestInvalidAttributes(unittest.TestCase):
# Invalid Max-Age
s = 'color=blue; Max-Age=over-the-hill'
with self.assertRaises(ValueError):
cookies = Cookie.parse(s)
Cookie.parse(s)
cookie = Cookie('color', 'blue')
with self.assertRaises(ValueError):
@@ -269,7 +288,7 @@ class TestInvalidAttributes(unittest.TestCase):
# Invalid Expires
s = 'color=blue; Expires=Sun, 06 Xxx 1994 08:49:37 GMT'
with self.assertRaises(ValueError):
cookies = Cookie.parse(s)
Cookie.parse(s)
cookie = Cookie('color', 'blue')
with self.assertRaises(ValueError):

View File

@@ -0,0 +1,177 @@
#
# Copyright (C) 2017 FreeIPA Contributors. See COPYING for license
#
from __future__ import absolute_import
import os
import shutil
import tempfile
import pytest
from ipapython import directivesetter
EXAMPLE_CONFIG = [
'foo=1\n',
'foobar=2\n',
]
WHITESPACE_CONFIG = [
'foo 1\n',
'foobar\t2\n',
]
@pytest.fixture
def tempdir(request):
tempdir = tempfile.mkdtemp()
def fin():
shutil.rmtree(tempdir)
request.addfinalizer(fin)
return tempdir
class test_set_directive_lines(object):
def test_remove_directive(self):
lines = directivesetter.set_directive_lines(
False, '=', 'foo', None, EXAMPLE_CONFIG, comment="#")
assert list(lines) == ['foobar=2\n']
def test_add_directive(self):
lines = directivesetter.set_directive_lines(
False, '=', 'baz', '4', EXAMPLE_CONFIG, comment="#")
assert list(lines) == ['foo=1\n', 'foobar=2\n', 'baz=4\n']
def test_set_directive_does_not_clobber_suffix_key(self):
lines = directivesetter.set_directive_lines(
False, '=', 'foo', '3', EXAMPLE_CONFIG, comment="#")
assert list(lines) == ['foo=3\n', 'foobar=2\n']
class test_set_directive_lines_whitespace(object):
def test_remove_directive(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'foo', None, WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foobar\t2\n']
def test_add_directive(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'baz', '4', WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foo 1\n', 'foobar\t2\n', 'baz 4\n']
def test_set_directive_does_not_clobber_suffix_key(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'foo', '3', WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foo 3\n', 'foobar\t2\n']
def test_set_directive_with_tab(self):
lines = directivesetter.set_directive_lines(
False, ' ', 'foobar', '6', WHITESPACE_CONFIG, comment="#")
assert list(lines) == ['foo 1\n', 'foobar 6\n']
class test_set_directive(object):
def test_set_directive(self):
"""Check that set_directive writes the new data and preserves mode."""
fd, filename = tempfile.mkstemp()
try:
os.close(fd)
stat_pre = os.stat(filename)
with open(filename, 'w') as f:
for line in EXAMPLE_CONFIG:
f.write(line)
directivesetter.set_directive(
filename, 'foo', '3', False, '=', "#")
stat_post = os.stat(filename)
with open(filename, 'r') as f:
lines = list(f)
assert lines == ['foo=3\n', 'foobar=2\n']
assert stat_pre.st_mode == stat_post.st_mode
assert stat_pre.st_uid == stat_post.st_uid
assert stat_pre.st_gid == stat_post.st_gid
finally:
os.remove(filename)
class test_get_directive(object):
def test_get_directive(self, tmpdir):
configfile = tmpdir.join('config')
configfile.write(''.join(EXAMPLE_CONFIG))
assert '1' == directivesetter.get_directive(str(configfile),
'foo',
separator='=')
assert '2' == directivesetter.get_directive(str(configfile),
'foobar',
separator='=')
class test_get_directive_whitespace(object):
def test_get_directive(self, tmpdir):
configfile = tmpdir.join('config')
configfile.write(''.join(WHITESPACE_CONFIG))
assert '1' == directivesetter.get_directive(str(configfile),
'foo')
assert '2' == directivesetter.get_directive(str(configfile),
'foobar')
def test_directivesetter(tempdir):
filename = os.path.join(tempdir, 'example.conf')
with open(filename, 'w') as f:
for line in EXAMPLE_CONFIG:
f.write(line)
ds = directivesetter.DirectiveSetter(filename)
assert ds.lines is None
with ds:
assert ds.lines == EXAMPLE_CONFIG
ds.set('foo', '3') # quoted, space separated, doesn't change 'foo='
ds.set('foobar', None, separator='=') # remove
ds.set('baz', '4', False, '=') # add
ds.setitems([
('list1', 'value1'),
('list2', 'value2'),
])
ds.setitems({
'dict1': 'value1',
'dict2': 'value2',
})
with open(filename, 'r') as f:
lines = list(f)
assert lines == [
'foo=1\n',
'foo "3"\n',
'baz=4\n',
'list1 "value1"\n',
'list2 "value2"\n',
'dict1 "value1"\n',
'dict2 "value2"\n',
]
with directivesetter.DirectiveSetter(filename, True, '=') as ds:
ds.set('foo', '4') # doesn't change 'foo '
with open(filename, 'r') as f:
lines = list(f)
assert lines == [
'foo="4"\n',
'foo "3"\n',
'baz=4\n',
'list1 "value1"\n',
'list2 "value2"\n',
'dict1 "value1"\n',
'dict2 "value2"\n',
]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,106 @@
#
# Copyright (C) 2018 FreeIPA Contributors. See COPYING for license
#
import dns.name
import dns.rdataclass
import dns.rdatatype
from dns.rdtypes.IN.SRV import SRV
from dns.rdtypes.ANY.URI import URI
from ipapython import dnsutil
import pytest
def mksrv(priority, weight, port, target):
return SRV(
rdclass=dns.rdataclass.IN,
rdtype=dns.rdatatype.SRV,
priority=priority,
weight=weight,
port=port,
target=dns.name.from_text(target)
)
def mkuri(priority, weight, target):
return URI(
rdclass=dns.rdataclass.IN,
rdtype=dns.rdatatype.URI,
priority=priority,
weight=weight,
target=target
)
class TestSortSRV(object):
def test_empty(self):
assert dnsutil.sort_prio_weight([]) == []
def test_one(self):
h1 = mksrv(1, 0, 443, u"host1")
assert dnsutil.sort_prio_weight([h1]) == [h1]
h2 = mksrv(10, 5, 443, u"host2")
assert dnsutil.sort_prio_weight([h2]) == [h2]
def test_prio(self):
h1 = mksrv(1, 0, 443, u"host1")
h2 = mksrv(2, 0, 443, u"host2")
h3 = mksrv(3, 0, 443, u"host3")
assert dnsutil.sort_prio_weight([h3, h2, h1]) == [h1, h2, h3]
assert dnsutil.sort_prio_weight([h3, h3, h3]) == [h3]
assert dnsutil.sort_prio_weight([h2, h2, h1, h1]) == [h1, h2]
h380 = mksrv(4, 0, 80, u"host3")
assert dnsutil.sort_prio_weight([h1, h3, h380]) == [h1, h3, h380]
hs = mksrv(-1, 0, 443, u"special")
assert dnsutil.sort_prio_weight([h1, h2, hs]) == [hs, h1, h2]
def assert_permutations(self, answers, permutations):
seen = set()
for _unused in range(1000):
result = tuple(dnsutil.sort_prio_weight(answers))
assert result in permutations
seen.add(result)
if seen == permutations:
break
else:
pytest.fail("sorting didn't exhaust all permutations.")
def test_sameprio(self):
h1 = mksrv(1, 0, 443, u"host1")
h2 = mksrv(1, 0, 443, u"host2")
permutations = {
(h1, h2),
(h2, h1),
}
self.assert_permutations([h1, h2], permutations)
def test_weight(self):
h1 = mksrv(1, 0, 443, u"host1")
h2_w15 = mksrv(2, 15, 443, u"host2")
h3_w10 = mksrv(2, 10, 443, u"host3")
permutations = {
(h1, h2_w15, h3_w10),
(h1, h3_w10, h2_w15),
}
self.assert_permutations([h1, h2_w15, h3_w10], permutations)
def test_large(self):
records = tuple(
mksrv(1, i, 443, "host{}".format(i)) for i in range(1000)
)
assert len(dnsutil.sort_prio_weight(records)) == len(records)
class TestSortURI(object):
def test_prio(self):
h1 = mkuri(1, 0, u"https://host1/api")
h2 = mkuri(2, 0, u"https://host2/api")
h3 = mkuri(3, 0, u"https://host3/api")
assert dnsutil.sort_prio_weight([h3, h2, h1]) == [h1, h2, h3]
assert dnsutil.sort_prio_weight([h3, h3, h3]) == [h3]
assert dnsutil.sort_prio_weight([h2, h2, h1, h1]) == [h1, h2]

View File

@@ -1,3 +1,5 @@
# encoding: utf-8
# Authors:
# Jan Cholasta <jcholast@redhat.com>
#
@@ -19,54 +21,67 @@
"""
Test the `ipapython/ipautil.py` module.
"""
from __future__ import absolute_import
import nose
import os
import pwd
import socket
import sys
import tempfile
import pytest
import six
from ipalib.constants import IPAAPI_USER
from ipaplatform.paths import paths
from ipapython import ipautil
class CheckIPAddress:
def __init__(self, addr):
self.description = "Test IP address parsing and verification (%s)" % addr
pytestmark = pytest.mark.tier0
def __call__(self, addr, words=None, prefixlen=None):
try:
ip = ipautil.CheckedIPAddress(addr, match_local=False)
assert ip.words == words and ip.prefixlen == prefixlen
except:
assert words is None and prefixlen is None
def test_ip_address():
addrs = [
('10.11.12.13', (10, 11, 12, 13), 8),
('10.11.12.13/14', (10, 11, 12, 13), 14),
('10.11.12.13%zoneid',),
('10.11.12.13%zoneid/14',),
('10.11.12.1337',),
('10.11.12.13/33',),
('127.0.0.1',),
('241.1.2.3',),
('169.254.1.2',),
('10.11.12.0/24',),
('224.5.6.7',),
('10.11.12.255/24',),
def assert_equal(a, b):
__tracebackhide__ = True
assert a == b
('2001::1', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1/72', (0x2001, 0, 0, 0, 0, 0, 0, 1), 72),
('2001::1%zoneid', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1%zoneid/72',),
('2001::1beef',),
('2001::1/129',),
('::1',),
('6789::1',),
('fe89::1',),
('2001::/64',),
('ff01::1',),
('junk',)
]
for addr in addrs:
yield (CheckIPAddress(addr[0]),) + addr
@pytest.mark.parametrize("addr,words,prefixlen", [
('0.0.0.0/0', None, None),
('10.11.12.13', (10, 11, 12, 13), 8),
('10.11.12.13/14', (10, 11, 12, 13), 14),
('10.11.12.13%zoneid', None, None),
('10.11.12.13%zoneid/14', None, None),
('10.11.12.1337', None, None),
('10.11.12.13/33', None, None),
('127.0.0.1', None, None),
('241.1.2.3', None, None),
('169.254.1.2', None, None),
('10.11.12.0/24', (10, 11, 12, 0), 24),
('10.0.0.255', (10, 0, 0, 255), 8),
('224.5.6.7', None, None),
('10.11.12.255/24', (10, 11, 12, 255), 24),
('255.255.255.255', None, None),
('::/0', None, None),
('2001::1', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1/72', (0x2001, 0, 0, 0, 0, 0, 0, 1), 72),
('2001::1%zoneid', (0x2001, 0, 0, 0, 0, 0, 0, 1), 64),
('2001::1%zoneid/72', None, None),
('2001::1beef', None, None),
('2001::1/129', None, None),
('::1', None, None),
('6789::1', None, None),
('fe89::1', None, None),
('2001::/64', (0x2001, 0, 0, 0, 0, 0, 0, 0), 64),
('ff01::1', None, None),
('junk', None, None)
])
def test_ip_address(addr, words, prefixlen):
if words is None:
pytest.raises(
ValueError, ipautil.CheckedIPAddress, addr)
else:
ip = ipautil.CheckedIPAddress(addr)
assert ip.words == words
assert ip.prefixlen == prefixlen
class TestCIDict(object):
@@ -89,53 +104,63 @@ class TestCIDict(object):
assert dict(cidict.items()) == {'a': 2, 'b': 3, 'C': 4}
def test_len(self):
nose.tools.assert_equal(3, len(self.cidict))
assert_equal(3, len(self.cidict))
def test_getitem(self):
nose.tools.assert_equal("val1", self.cidict["Key1"])
nose.tools.assert_equal("val1", self.cidict["key1"])
nose.tools.assert_equal("val2", self.cidict["KEY2"])
nose.tools.assert_equal("VAL3", self.cidict["key3"])
nose.tools.assert_equal("VAL3", self.cidict["KEY3"])
with nose.tools.assert_raises(KeyError):
self.cidict["key4"]
assert_equal("val1", self.cidict["Key1"])
assert_equal("val1", self.cidict["key1"])
assert_equal("val2", self.cidict["KEY2"])
assert_equal("VAL3", self.cidict["key3"])
assert_equal("VAL3", self.cidict["KEY3"])
with pytest.raises(KeyError):
self.cidict["key4"] # pylint: disable=pointless-statement
def test_get(self):
nose.tools.assert_equal("val1", self.cidict.get("Key1"))
nose.tools.assert_equal("val1", self.cidict.get("key1"))
nose.tools.assert_equal("val2", self.cidict.get("KEY2"))
nose.tools.assert_equal("VAL3", self.cidict.get("key3"))
nose.tools.assert_equal("VAL3", self.cidict.get("KEY3"))
nose.tools.assert_equal("default", self.cidict.get("key4", "default"))
assert_equal("val1", self.cidict.get("Key1"))
assert_equal("val1", self.cidict.get("key1"))
assert_equal("val2", self.cidict.get("KEY2"))
assert_equal("VAL3", self.cidict.get("key3"))
assert_equal("VAL3", self.cidict.get("KEY3"))
assert_equal("default", self.cidict.get("key4", "default"))
def test_setitem(self):
self.cidict["key4"] = "val4"
nose.tools.assert_equal("val4", self.cidict["key4"])
assert_equal("val4", self.cidict["key4"])
self.cidict["KEY4"] = "newval4"
nose.tools.assert_equal("newval4", self.cidict["key4"])
assert_equal("newval4", self.cidict["key4"])
def test_del(self):
assert self.cidict.has_key("Key1")
assert "Key1" in self.cidict
del(self.cidict["Key1"])
assert not self.cidict.has_key("Key1")
assert "Key1" not in self.cidict
assert self.cidict.has_key("key2")
assert "key2" in self.cidict
del(self.cidict["KEY2"])
assert not self.cidict.has_key("key2")
assert "key2" not in self.cidict
def test_clear(self):
nose.tools.assert_equal(3, len(self.cidict))
assert_equal(3, len(self.cidict))
self.cidict.clear()
nose.tools.assert_equal(0, len(self.cidict))
assert_equal(0, len(self.cidict))
assert self.cidict == {}
assert list(self.cidict) == []
assert list(self.cidict.values()) == []
assert list(self.cidict.items()) == []
if six.PY2:
assert self.cidict.keys() == []
assert self.cidict.values() == []
assert self.cidict.items() == []
assert self.cidict._keys == {}
def test_copy(self):
copy = self.cidict.copy()
assert copy == self.cidict
nose.tools.assert_equal(3, len(copy))
assert copy.has_key("Key1")
assert copy.has_key("key1")
nose.tools.assert_equal("val1", copy["Key1"])
assert_equal(3, len(copy))
assert "Key1" in copy
assert "key1" in copy
assert_equal("val1", copy["Key1"])
@pytest.mark.skipif(not six.PY2, reason="Python 2 only")
def test_haskey(self):
assert self.cidict.has_key("KEY1")
assert self.cidict.has_key("key2")
@@ -151,26 +176,27 @@ class TestCIDict(object):
assert "Key4" not in self.cidict
def test_items(self):
items = self.cidict.items()
nose.tools.assert_equal(3, len(items))
items = list(self.cidict.items())
assert_equal(3, len(items))
items_set = set(items)
assert ("Key1", "val1") in items_set
assert ("key2", "val2") in items_set
assert ("KEY3", "VAL3") in items_set
assert self.cidict.items() == list(self.cidict.iteritems()) == zip(
self.cidict.iterkeys(), self.cidict.itervalues())
# pylint: disable=dict-iter-method
# pylint: disable=dict-keys-not-iterating, dict-values-not-iterating
assert list(self.cidict.items()) == list(self.cidict.iteritems()) == list(zip(
self.cidict.keys(), self.cidict.values()))
def test_iter(self):
items = []
assert list(self.cidict) == list(self.cidict.keys())
assert sorted(self.cidict) == sorted(['Key1', 'key2', 'KEY3'])
def test_iteritems(self):
items = []
for (k,v) in self.cidict.iteritems():
items.append((k,v))
nose.tools.assert_equal(3, len(items))
# pylint: disable=dict-iter-method
for k, v in self.cidict.iteritems():
items.append((k, v))
assert_equal(3, len(items))
items_set = set(items)
assert ("Key1", "val1") in items_set
assert ("key2", "val2") in items_set
@@ -178,9 +204,10 @@ class TestCIDict(object):
def test_iterkeys(self):
keys = []
# pylint: disable=dict-iter-method
for k in self.cidict.iterkeys():
keys.append(k)
nose.tools.assert_equal(3, len(keys))
assert_equal(3, len(keys))
keys_set = set(keys)
assert "Key1" in keys_set
assert "key2" in keys_set
@@ -188,42 +215,43 @@ class TestCIDict(object):
def test_itervalues(self):
values = []
# pylint: disable=dict-iter-method
for k in self.cidict.itervalues():
values.append(k)
nose.tools.assert_equal(3, len(values))
assert_equal(3, len(values))
values_set = set(values)
assert "val1" in values_set
assert "val2" in values_set
assert "VAL3" in values_set
def test_keys(self):
keys = self.cidict.keys()
nose.tools.assert_equal(3, len(keys))
keys = list(self.cidict.keys())
assert_equal(3, len(keys))
keys_set = set(keys)
assert "Key1" in keys_set
assert "key2" in keys_set
assert "KEY3" in keys_set
assert self.cidict.keys() == list(self.cidict.iterkeys())
# pylint: disable=dict-iter-method
assert list(self.cidict.keys()) == list(self.cidict.iterkeys())
def test_values(self):
values = self.cidict.values()
nose.tools.assert_equal(3, len(values))
values = list(self.cidict.values())
assert_equal(3, len(values))
values_set = set(values)
assert "val1" in values_set
assert "val2" in values_set
assert "VAL3" in values_set
assert self.cidict.values() == list(self.cidict.itervalues())
# pylint: disable=dict-iter-method
assert list(self.cidict.values()) == list(self.cidict.itervalues())
def test_update(self):
newdict = { "KEY2": "newval2",
"key4": "val4" }
self.cidict.update(newdict)
nose.tools.assert_equal(4, len(self.cidict))
assert_equal(4, len(self.cidict))
items = self.cidict.items()
nose.tools.assert_equal(4, len(items))
items = list(self.cidict.items())
assert_equal(4, len(items))
items_set = set(items)
assert ("Key1", "val1") in items_set
# note the update "overwrites" the case of the key2
@@ -244,15 +272,15 @@ class TestCIDict(object):
'Key1': 'val1', 'key2': 'val2', 'KEY3': 'VAL3'}
def test_update_duplicate_values_dict(self):
with nose.tools.assert_raises(ValueError):
with pytest.raises(ValueError):
self.cidict.update({'a': 'va', 'A': None, 'b': 3})
def test_update_duplicate_values_list(self):
with nose.tools.assert_raises(ValueError):
with pytest.raises(ValueError):
self.cidict.update([('a', 'va'), ('A', None), ('b', 3)])
def test_update_duplicate_values_kwargs(self):
with nose.tools.assert_raises(ValueError):
with pytest.raises(ValueError):
self.cidict.update(a='va', A=None, b=3)
def test_update_kwargs(self):
@@ -261,60 +289,52 @@ class TestCIDict(object):
'b': 'vb', 'Key1': 'val1', 'key2': 'val2', 'KEY3': 'VAL3'}
def test_setdefault(self):
nose.tools.assert_equal("val1", self.cidict.setdefault("KEY1", "default"))
assert_equal("val1", self.cidict.setdefault("KEY1", "default"))
assert not self.cidict.has_key("KEY4")
nose.tools.assert_equal("default", self.cidict.setdefault("KEY4", "default"))
assert self.cidict.has_key("KEY4")
nose.tools.assert_equal("default", self.cidict["key4"])
assert "KEY4" not in self.cidict
assert_equal("default", self.cidict.setdefault("KEY4", "default"))
assert "KEY4" in self.cidict
assert_equal("default", self.cidict["key4"])
assert not self.cidict.has_key("KEY5")
nose.tools.assert_equal(None, self.cidict.setdefault("KEY5"))
assert self.cidict.has_key("KEY5")
nose.tools.assert_equal(None, self.cidict["key5"])
assert "KEY5" not in self.cidict
assert_equal(None, self.cidict.setdefault("KEY5"))
assert "KEY5" in self.cidict
assert_equal(None, self.cidict["key5"])
def test_pop(self):
nose.tools.assert_equal("val1", self.cidict.pop("KEY1", "default"))
assert not self.cidict.has_key("key1")
assert_equal("val1", self.cidict.pop("KEY1", "default"))
assert "key1" not in self.cidict
nose.tools.assert_equal("val2", self.cidict.pop("KEY2"))
assert not self.cidict.has_key("key2")
assert_equal("val2", self.cidict.pop("KEY2"))
assert "key2" not in self.cidict
nose.tools.assert_equal("default", self.cidict.pop("key4", "default"))
with nose.tools.assert_raises(KeyError):
assert_equal("default", self.cidict.pop("key4", "default"))
with pytest.raises(KeyError):
self.cidict.pop("key4")
def test_popitem(self):
items = set(self.cidict.items())
nose.tools.assert_equal(3, len(self.cidict))
assert_equal(3, len(self.cidict))
item = self.cidict.popitem()
nose.tools.assert_equal(2, len(self.cidict))
assert_equal(2, len(self.cidict))
assert item in items
items.discard(item)
item = self.cidict.popitem()
nose.tools.assert_equal(1, len(self.cidict))
assert_equal(1, len(self.cidict))
assert item in items
items.discard(item)
item = self.cidict.popitem()
nose.tools.assert_equal(0, len(self.cidict))
assert_equal(0, len(self.cidict))
assert item in items
items.discard(item)
def test_fromkeys(self):
dct = ipautil.CIDict.fromkeys(('A', 'b', 'C'))
assert sorted(dct.keys()) == sorted(['A', 'b', 'C'])
assert sorted(dct.values()) == [None] * 3
def test_clear(self):
self.cidict.clear()
assert self.cidict == {}
assert self.cidict.keys() == []
assert self.cidict.values() == []
assert self.cidict.items() == []
assert self.cidict._keys == {}
assert list(dct.values()) == [None] * 3
class TestTimeParser(object):
@@ -322,87 +342,238 @@ class TestTimeParser(object):
timestr = "20070803"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(2007, time.year)
nose.tools.assert_equal(8, time.month)
nose.tools.assert_equal(3, time.day)
nose.tools.assert_equal(0, time.hour)
nose.tools.assert_equal(0, time.minute)
nose.tools.assert_equal(0, time.second)
assert_equal(2007, time.year)
assert_equal(8, time.month)
assert_equal(3, time.day)
assert_equal(0, time.hour)
assert_equal(0, time.minute)
assert_equal(0, time.second)
def test_hour_min_sec(self):
timestr = "20051213141205"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(2005, time.year)
nose.tools.assert_equal(12, time.month)
nose.tools.assert_equal(13, time.day)
nose.tools.assert_equal(14, time.hour)
nose.tools.assert_equal(12, time.minute)
nose.tools.assert_equal(5, time.second)
assert_equal(2005, time.year)
assert_equal(12, time.month)
assert_equal(13, time.day)
assert_equal(14, time.hour)
assert_equal(12, time.minute)
assert_equal(5, time.second)
def test_fractions(self):
timestr = "2003092208.5"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(2003, time.year)
nose.tools.assert_equal(9, time.month)
nose.tools.assert_equal(22, time.day)
nose.tools.assert_equal(8, time.hour)
nose.tools.assert_equal(30, time.minute)
nose.tools.assert_equal(0, time.second)
assert_equal(2003, time.year)
assert_equal(9, time.month)
assert_equal(22, time.day)
assert_equal(8, time.hour)
assert_equal(30, time.minute)
assert_equal(0, time.second)
timestr = "199203301544,25"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(1992, time.year)
nose.tools.assert_equal(3, time.month)
nose.tools.assert_equal(30, time.day)
nose.tools.assert_equal(15, time.hour)
nose.tools.assert_equal(44, time.minute)
nose.tools.assert_equal(15, time.second)
assert_equal(1992, time.year)
assert_equal(3, time.month)
assert_equal(30, time.day)
assert_equal(15, time.hour)
assert_equal(44, time.minute)
assert_equal(15, time.second)
timestr = "20060401185912,8"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(2006, time.year)
nose.tools.assert_equal(4, time.month)
nose.tools.assert_equal(1, time.day)
nose.tools.assert_equal(18, time.hour)
nose.tools.assert_equal(59, time.minute)
nose.tools.assert_equal(12, time.second)
nose.tools.assert_equal(800000, time.microsecond)
assert_equal(2006, time.year)
assert_equal(4, time.month)
assert_equal(1, time.day)
assert_equal(18, time.hour)
assert_equal(59, time.minute)
assert_equal(12, time.second)
assert_equal(800000, time.microsecond)
def test_time_zones(self):
# pylint: disable=no-member
timestr = "20051213141205Z"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(0, time.tzinfo.houroffset)
nose.tools.assert_equal(0, time.tzinfo.minoffset)
assert_equal(0, time.tzinfo.houroffset)
assert_equal(0, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
nose.tools.assert_equal(0, offset.seconds)
assert_equal(0, offset.seconds)
timestr = "20051213141205+0500"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(5, time.tzinfo.houroffset)
nose.tools.assert_equal(0, time.tzinfo.minoffset)
assert_equal(5, time.tzinfo.houroffset)
assert_equal(0, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
nose.tools.assert_equal(5 * 60 * 60, offset.seconds)
assert_equal(5 * 60 * 60, offset.seconds)
timestr = "20051213141205-0500"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(-5, time.tzinfo.houroffset)
nose.tools.assert_equal(0, time.tzinfo.minoffset)
assert_equal(-5, time.tzinfo.houroffset)
assert_equal(0, time.tzinfo.minoffset)
# NOTE - the offset is always positive - it's minutes
# _east_ of UTC
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
nose.tools.assert_equal((24 - 5) * 60 * 60, offset.seconds)
assert_equal((24 - 5) * 60 * 60, offset.seconds)
timestr = "20051213141205-0930"
time = ipautil.parse_generalized_time(timestr)
nose.tools.assert_equal(-9, time.tzinfo.houroffset)
nose.tools.assert_equal(-30, time.tzinfo.minoffset)
assert_equal(-9, time.tzinfo.houroffset)
assert_equal(-30, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
nose.tools.assert_equal(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
assert_equal(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
def test_run():
result = ipautil.run([paths.ECHO, 'foo\x02bar'],
capture_output=True,
capture_error=True)
assert result.returncode == 0
assert result.output == 'foo\x02bar\n'
assert result.raw_output == b'foo\x02bar\n'
assert result.error_output == ''
assert result.raw_error_output == b''
def test_run_no_capture_output():
result = ipautil.run([paths.ECHO, 'foo\x02bar'])
assert result.returncode == 0
assert result.output is None
assert result.raw_output == b'foo\x02bar\n'
assert result.error_output is None
assert result.raw_error_output == b''
def test_run_bytes():
result = ipautil.run([paths.ECHO, b'\x01\x02'], capture_output=True)
assert result.returncode == 0
assert result.raw_output == b'\x01\x02\n'
def test_run_decode():
result = ipautil.run([paths.ECHO, u'á'.encode('utf-8')],
encoding='utf-8', capture_output=True)
assert result.returncode == 0
if six.PY3:
assert result.output == 'á\n'
else:
assert result.output == 'á\n'.encode('utf-8')
def test_run_decode_bad():
if six.PY3:
with pytest.raises(UnicodeDecodeError):
ipautil.run([paths.ECHO, b'\xa0\xa1'],
capture_output=True,
encoding='utf-8')
else:
result = ipautil.run([paths.ECHO, '\xa0\xa1'],
capture_output=True,
encoding='utf-8')
assert result.returncode == 0
assert result.output == '\xa0\xa1\n'
def test_backcompat():
result = out, err, rc = ipautil.run([paths.ECHO, 'foo\x02bar'],
capture_output=True,
capture_error=True)
assert rc is result.returncode
assert out is result.output
assert err is result.error_output
def test_flush_sync():
with tempfile.NamedTemporaryFile('wb+') as f:
f.write(b'data')
ipautil.flush_sync(f)
def test_run_stderr():
args = [
sys.executable, '-c',
'import sys; sys.exit(" ".join(("error", "message")))'
]
with pytest.raises(ipautil.CalledProcessError) as cm:
ipautil.run(args)
assert cm.value.cmd == repr(args)
assert cm.value.stderr == "error message\n"
assert "CalledProcessError(" in str(cm.value)
assert repr(args) in str(cm.value)
assert str(cm.value).endswith("'error message\\n')")
with pytest.raises(ipautil.CalledProcessError) as cm:
ipautil.run(args, nolog=["message"])
assert cm.value.cmd == repr(args).replace("message", "XXXXXXXX")
assert str(cm.value).endswith("'error XXXXXXXX\\n')")
assert "message" not in str(cm.value)
assert "message" not in str(cm.value.output)
assert "message" not in str(cm.value.stderr)
@pytest.mark.skipif(os.geteuid() != 0,
reason="Must have root privileges to run this test")
def test_run_runas():
"""
Test run method with the runas parameter.
The test executes 'id' to make sure that the process is
executed with the user identity specified in runas parameter.
The test is using 'ipaapi' user as it is configured when
ipa-server-common package is installed.
"""
user = pwd.getpwnam(IPAAPI_USER)
res = ipautil.run(['/usr/bin/id', '-u'], runas=IPAAPI_USER)
assert res.returncode == 0
assert res.raw_output == b'%d\n' % user.pw_uid
res = ipautil.run(['/usr/bin/id', '-g'], runas=IPAAPI_USER)
assert res.returncode == 0
assert res.raw_output == b'%d\n' % user.pw_gid
@pytest.fixture(scope='function')
def tcp_listen():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# port 0 means the OS selects a random, unused port for the test.
s.bind(('', 0))
s.listen(1)
yield s.getsockname()[-1], s
finally:
s.close()
@pytest.fixture(scope='function')
def udp_listen():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# port 0 means the OS selects a random, unused port for the test.
s.bind(('', 0))
yield s.getsockname()[-1], s
finally:
s.close()
def test_check_port_bindable_tcp(tcp_listen):
port, sock = tcp_listen
assert not ipautil.check_port_bindable(port)
assert not ipautil.check_port_bindable(port, socket.SOCK_STREAM)
sock.close()
assert ipautil.check_port_bindable(port)
def test_check_port_bindable_udp(udp_listen):
port, sock = udp_listen
assert not ipautil.check_port_bindable(port, socket.SOCK_DGRAM)
sock.close()
assert ipautil.check_port_bindable(port, socket.SOCK_DGRAM)

View File

@@ -1,4 +1,3 @@
#! /usr/bin/python2 -E
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
@@ -21,16 +20,13 @@ import sys
sys.path.insert(0, ".")
import unittest
import pytest
from ipapython import ipavalidate
pytestmark = pytest.mark.tier0
class TestValidate(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_validEmail(self):
self.assertEqual(True, ipavalidate.Email("test@freeipa.org"))
self.assertEqual(True, ipavalidate.Email("", notEmpty=False))
@@ -92,6 +88,3 @@ class TestValidate(unittest.TestCase):
self.assertEqual(False, ipavalidate.GoodName("foo%bar"))
self.assertEqual(False, ipavalidate.GoodName("*foo"))
self.assertEqual(False, ipavalidate.GoodName("$foo.bar$"))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,139 @@
#
# Copyright (C) 2016 FreeIPA Project Contributors - see LICENSE file
#
import pytest
import six
from ipapython.kerberos import Principal
if six.PY3:
unicode = str
valid_principals = {
u'tuser@REALM.TEST': {
'components': (u'tuser',),
'realm': u'REALM.TEST',
'username': u'tuser'
},
u'tuser\\@tupn.test@REALM.TEST': {
'components': (u'tuser@tupn.test',),
'realm': u'REALM.TEST',
'username': u'tuser@tupn.test',
'upn_suffix': u'tupn.test'
},
u'test/host.ipa.test@REALM.TEST': {
'components': (u'test', u'host.ipa.test'),
'realm': u'REALM.TEST',
'hostname': u'host.ipa.test'
},
u'test/service/host.ipa.test@REALM.TEST': {
'components': (u'test', u'service', u'host.ipa.test'),
'realm': u'REALM.TEST',
'service_name': u'test/service'
},
u'tuser': {
'components': (u'tuser',),
'realm': None,
'username': u'tuser'
},
u'$%user@REALM.TEST': {
'components': (u'$%user',),
'realm': u'REALM.TEST',
'username': u'$%user'
},
u'host/host.ipa.test': {
'components': (u'host', u'host.ipa.test'),
'realm': None,
'hostname': u'host.ipa.test'
},
u's$c/$%^.ipa.t%$t': {
'components': (u's$c', u'$%^.ipa.t%$t'),
'realm': None,
'hostname': u'$%^.ipa.t%$t',
'service_name': u's$c'
},
u'test\\/service/test\\/host@REALM\\@TEST': {
'components': (u'test/service', u'test/host'),
'realm': u'REALM@TEST',
'hostname': u'test/host',
'service_name': r'test\/service'
}
}
def valid_principal_iter(principals):
for princ, data in principals.items():
yield princ, data
@pytest.fixture(params=list(valid_principal_iter(valid_principals)))
def valid_principal(request):
return request.param
def test_principals(valid_principal):
principal_name, data = valid_principal
princ = Principal(principal_name)
for name, value in data.items():
assert getattr(princ, name) == value
assert unicode(princ) == principal_name
assert repr(princ) == "ipapython.kerberos.Principal('{}')".format(
principal_name)
def test_multiple_unescaped_ats_raise_error():
pytest.raises(ValueError, Principal, u'too@many@realms')
principals_properties = {
u'user@REALM': {
'property_true': ('is_user',),
'property_raises': ('upn_suffix', 'hostname', 'service_name')
},
u'host/m1.ipa.test@REALM': {
'property_true': ('is_host', 'is_service'),
'property_raises': ('username', 'upn_suffix')
},
u'service/m1.ipa.test@REALM': {
'property_true': ('is_service'),
'property_raises': ('username', 'upn_suffix')
},
u'user\\@domain@REALM': {
'property_true': ('is_user', 'is_enterprise'),
'property_raises': ('hostname', 'service_name')
}
}
def principal_properties_iter(principals_properties):
for p, data in principals_properties.items():
yield p, data
@pytest.fixture(params=list(principal_properties_iter(principals_properties)))
def principal_properties(request):
return request.param
def test_principal_properties(principal_properties):
principal, data = principal_properties
princ = Principal(principal)
boolean_propertes = [prop for prop in dir(princ) if
prop.startswith('is_')]
for b in boolean_propertes:
if b in data['property_true']:
assert getattr(princ, b)
else:
assert not getattr(princ, b)
for property_raises in data['property_raises']:
with pytest.raises(ValueError):
getattr(princ, property_raises)

View File

@@ -20,12 +20,16 @@
Test the `kernel_keyring.py` module.
"""
from nose.tools import raises, assert_raises # pylint: disable=E0611
from ipapython import kernel_keyring
import pytest
pytestmark = pytest.mark.tier0
TEST_KEY = 'ipa_test'
TEST_VALUE = 'abc123'
UPDATE_VALUE = '123abc'
TEST_UNICODEKEY = u'ipa_unicode'
TEST_VALUE = b'abc123'
UPDATE_VALUE = b'123abc'
SIZE_256 = 'abcdefgh' * 32
SIZE_512 = 'abcdefgh' * 64
@@ -36,7 +40,7 @@ class test_keyring(object):
Test the kernel keyring interface
"""
def setUp(self):
def setup(self):
try:
kernel_keyring.del_key(TEST_KEY)
except ValueError:
@@ -45,6 +49,10 @@ class test_keyring(object):
kernel_keyring.del_key(SIZE_256)
except ValueError:
pass
try:
kernel_keyring.del_key(TEST_UNICODEKEY)
except ValueError:
pass
def test_01(self):
"""
@@ -59,8 +67,8 @@ class test_keyring(object):
# Make sure it is gone
try:
result = kernel_keyring.read_key(TEST_KEY)
except ValueError, e:
assert e.message == 'key %s not found' % TEST_KEY
except ValueError as e:
assert str(e) == 'key %s not found' % TEST_KEY
def test_02(self):
"""
@@ -72,13 +80,13 @@ class test_keyring(object):
except ValueError:
pass
@raises(ValueError)
def test_03(self):
"""
Add a duplicate key
"""
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
with pytest.raises(ValueError):
kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
def test_04(self):
"""
@@ -89,19 +97,20 @@ class test_keyring(object):
assert(result == UPDATE_VALUE)
# Now update it 10 times
for i in xrange(10):
kernel_keyring.update_key(TEST_KEY, 'test %d' % i)
for i in range(10):
value = ('test %d' % i).encode('ascii')
kernel_keyring.update_key(TEST_KEY, value)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == 'test %d' % i)
assert(result == value)
kernel_keyring.del_key(TEST_KEY)
@raises(ValueError)
def test_05(self):
"""
Read a non-existent key
"""
result = kernel_keyring.read_key(TEST_KEY)
with pytest.raises(ValueError):
kernel_keyring.read_key(TEST_KEY)
def test_06(self):
"""
@@ -130,9 +139,9 @@ class test_keyring(object):
"""
Test 512-bytes of data
"""
kernel_keyring.add_key(TEST_KEY, SIZE_512)
kernel_keyring.add_key(TEST_KEY, SIZE_512.encode('ascii'))
result = kernel_keyring.read_key(TEST_KEY)
assert(result == SIZE_512)
assert(result == SIZE_512.encode('ascii'))
kernel_keyring.del_key(TEST_KEY)
@@ -140,8 +149,18 @@ class test_keyring(object):
"""
Test 1k bytes of data
"""
kernel_keyring.add_key(TEST_KEY, SIZE_1024)
kernel_keyring.add_key(TEST_KEY, SIZE_1024.encode('ascii'))
result = kernel_keyring.read_key(TEST_KEY)
assert(result == SIZE_1024)
assert(result == SIZE_1024.encode('ascii'))
kernel_keyring.del_key(TEST_KEY)
def test_10(self):
"""
Test a unicode key
"""
kernel_keyring.add_key(TEST_UNICODEKEY, TEST_VALUE)
result = kernel_keyring.read_key(TEST_UNICODEKEY)
assert(result == TEST_VALUE)
kernel_keyring.del_key(TEST_UNICODEKEY)

View File

@@ -0,0 +1,40 @@
#
# Copyright (C) 2017 FreeIPA Contributors see COPYING for license
#
"""
Test the `session_storage.py` module.
"""
import pytest
from ipapython import session_storage
@pytest.mark.skip_ipaclient_unittest
@pytest.mark.needs_ipaapi
class test_session_storage(object):
"""
Test the session storage interface
"""
def setup(self):
# TODO: set up test user and kinit to it
# tmpdir = tempfile.mkdtemp(prefix = "tmp-")
# os.environ['KRB5CCNAME'] = 'FILE:%s/ccache' % tmpdir
self.principal = 'admin'
self.key = 'X-IPA-test-session-storage'
self.data = b'Test Data'
def test_01(self):
session_storage.store_data(self.principal, self.key, self.data)
def test_02(self):
data = session_storage.get_data(self.principal, self.key)
assert(data == self.data)
def test_03(self):
session_storage.remove_data(self.principal, self.key)
try:
session_storage.get_data(self.principal, self.key)
except session_storage.KRB5Error:
pass

View File

@@ -21,56 +21,67 @@ Test the `ipapython/ssh.py` module.
"""
import base64
import nose
import six
import pytest
from ipapython import ssh
class CheckPublicKey:
def __init__(self, pk):
self.description = "Test SSH public key parsing (%s)" % repr(pk)
if six.PY3:
unicode = str
def __call__(self, pk, out):
try:
parsed = ssh.SSHPublicKey(pk)
assert parsed.openssh() == out
except Exception, e:
assert type(e) is out
pytestmark = pytest.mark.tier0
def test_public_key_parsing():
b64 = 'AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L'
raw = base64.b64decode(b64)
openssh = 'ssh-rsa %s' % b64
b64 = 'AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L'
raw = base64.b64decode(b64)
openssh = 'ssh-rsa %s' % b64
pks = [
('\xff', UnicodeDecodeError),
(raw, openssh),
('\0\0\0\x04none', u'none AAAABG5vbmU='),
('\0\0\0', ValueError),
('\0\0\0\0', ValueError),
('\0\0\0\x01', ValueError),
('\0\0\0\x01\xff', ValueError),
@pytest.mark.parametrize("pk,out", [
(b'\xff', UnicodeDecodeError),
(u'\xff', ValueError),
(b64, openssh),
(unicode(b64), openssh),
(u'\n%s\n\n' % b64, openssh),
(u'AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'AAAAB', ValueError),
(raw, openssh),
(b'\0\0\0\x04none', u'none AAAABG5vbmU='),
(b'\0\0\0', ValueError),
(b'\0\0\0\0', ValueError),
(b'\0\0\0\x01', ValueError),
(b'\0\0\0\x01\xff', ValueError),
(openssh, openssh),
(unicode(openssh), openssh),
(u'none AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'\t \t ssh-rsa \t \t%s\t \tthis is a comment\t \t ' % b64,
u'%s this is a comment' % openssh),
(u'opt3,opt2="\tx ",opt1,opt2="\\"x " %s comment ' % openssh,
u'opt1,opt2="\\"x ",opt3 %s comment' % openssh),
(u'ssh-rsa\n%s' % b64, ValueError),
(u'ssh-rsa\t%s' % b64, ValueError),
(u'vanitas %s' % b64, ValueError),
(u'@opt %s' % openssh, ValueError),
(u'opt=val %s' % openssh, ValueError),
(u'opt, %s' % openssh, ValueError),
]
(u'\0\0\0\x04none', ValueError),
(u'\0\0\0', ValueError),
(u'\0\0\0\0', ValueError),
(u'\0\0\0\x01', ValueError),
(u'\0\0\0\x01\xff', ValueError),
for pk in pks:
yield (CheckPublicKey(pk[0]),) + pk
(b64, openssh),
(unicode(b64), openssh),
(b64.encode('ascii'), openssh),
(u'\n%s\n\n' % b64, openssh),
(u'AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'AAAAB', ValueError),
(openssh, openssh),
(unicode(openssh), openssh),
(openssh.encode('ascii'), openssh),
(u'none AAAABG5vbmU=', u'none AAAABG5vbmU='),
(u'\t \t ssh-rsa \t \t%s\t \tthis is a comment\t \t ' % b64,
u'%s this is a comment' % openssh),
(u'opt3,opt2="\tx ",opt1,opt2="\\"x " %s comment ' % openssh,
u'opt1,opt2="\\"x ",opt3 %s comment' % openssh),
(u'ssh-rsa\n%s' % b64, ValueError),
(u'ssh-rsa\t%s' % b64, ValueError),
(u'vanitas %s' % b64, ValueError),
(u'@opt %s' % openssh, ValueError),
(u'opt=val %s' % openssh, ValueError),
(u'opt, %s' % openssh, ValueError)],
# ids=repr is workaround for pytest issue with NULL bytes,
# see https://github.com/pytest-dev/pytest/issues/2644
ids=repr
)
def test_public_key_parsing(pk, out):
if isinstance(out, type) and issubclass(out, Exception):
pytest.raises(out, ssh.SSHPublicKey, pk)
else:
parsed = ssh.SSHPublicKey(pk)
assert parsed.openssh() == out