Imported Debian patch 4.0.5-6~numeezy

This commit is contained in:
Alexandre Ellert
2016-02-17 15:07:45 +01:00
committed by Mario Fetka
parent c44de33144
commit 10dfc9587b
1203 changed files with 53869 additions and 241462 deletions

View File

@@ -1,289 +0,0 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
"""
Implements a base class to track changes to an LDAP object.
"""
from __future__ import print_function
import functools
from ipalib import api, errors
from ipapython.dn import DN
from ipapython.version import API_VERSION
from ipatests.util import Fuzzy
class Tracker(object):
"""Wraps and tracks modifications to a plugin LDAP entry object
Stores a copy of state of a plugin entry object and allows checking that
the state in the database is the same as expected.
This allows creating independent tests: the individual tests check
that the relevant changes have been made. At the same time
the entry doesn't need to be recreated and cleaned up for each test.
Two attributes are used for tracking: ``exists`` (true if the entry is
supposed to exist) and ``attrs`` (a dict of LDAP attributes that are
expected to be returned from IPA commands).
For commonly used operations, there is a helper method, e.g.
``create``, ``update``, or ``find``, that does these steps:
* ensure the entry exists (or does not exist, for "create")
* store the expected modifications
* get the IPA command to run, and run it
* check that the result matches the expected state
Tests that require customization of these steps are expected to do them
manually, using lower-level methods.
Especially the first step (ensure the entry exists) is important for
achieving independent tests.
The Tracker object also stores information about the entry, e.g.
``dn``, ``rdn`` and ``name`` which is derived from DN property.
To use this class, the programer must subclass it and provide the
implementation of following methods:
* make_*_command -- implementing the API call for particular plugin
and operation (add, delete, ...)
These methods should use the make_command method
* check_* commands -- an assertion for a plugin command (CRUD)
* track_create -- to make an internal representation of the
entry
Apart from overriding these methods, the subclass must provide the
distinguished name of the entry in `self.dn` property.
It is also required to override the class variables defining the sets
of ldap attributes/keys for these operations specific to the plugin
being implemented. Take the host plugin test for an example.
The implementation of these methods is not strictly enforced.
A missing method will cause a NotImplementedError during runtime
as a result.
"""
retrieve_keys = None
retrieve_all_keys = None
create_keys = None
update_keys = None
managedby_keys = None
allowedto_keys = None
_override_me_msg = "This method needs to be overridden in a subclass"
def __init__(self, default_version=None):
self.api = api
self.default_version = default_version or API_VERSION
self._dn = None
self.exists = False
@property
def dn(self):
"""A property containing the distinguished name of the entry."""
if not self._dn:
raise ValueError('The DN must be set in the init method.')
return self._dn
@dn.setter
def dn(self, value):
if not (isinstance(value, DN) or isinstance(value, Fuzzy)):
raise ValueError('The value must be an instance of DN or Fuzzy.')
self._dn = value
@property
def rdn(self):
return self.dn[0]
@property
def name(self):
"""Property holding the name of the entry in LDAP.
This property is computed in runtime.
"""
return self.rdn.value
def filter_attrs(self, keys):
"""Return a dict of expected attrs, filtered by the given keys"""
if not self.attrs:
raise RuntimeError('The tracker instance has no attributes.')
return {k: v for k, v in self.attrs.items() if k in keys}
def run_command(self, name, *args, **options):
"""Run the given IPA command
Logs the command using print for easier debugging
"""
cmd = self.api.Command[name]
options.setdefault('version', self.default_version)
args_repr = ', '.join(
[repr(a) for a in args] +
['%s=%r' % item for item in list(options.items())])
try:
result = cmd(*args, **options)
except Exception as e:
print('Ran command: %s(%s): %s: %s' % (cmd, args_repr,
type(e).__name__, e))
raise
else:
print('Ran command: %s(%s): OK' % (cmd, args_repr))
return result
def make_command(self, name, *args, **options):
"""Make a functools.partial function to run the given command"""
return functools.partial(self.run_command, name, *args, **options)
def make_fixture(self, request):
"""Make a pytest fixture for this tracker
The fixture ensures the plugin entry does not exist before
and after the tests that use it.
"""
del_command = self.make_delete_command()
try:
del_command()
except errors.NotFound:
pass
def cleanup():
existed = self.exists
try:
del_command()
except errors.NotFound:
if existed:
raise
self.exists = False
request.addfinalizer(cleanup)
return self
def ensure_exists(self):
"""If the entry does not exist (according to tracker state), create it
"""
if not self.exists:
self.create(force=True)
def ensure_missing(self):
"""If the entry exists (according to tracker state), delete it
"""
if self.exists:
self.delete()
def make_create_command(self, force=True):
"""Make function that creates the plugin entry object."""
raise NotImplementedError(self._override_me_msg)
def make_delete_command(self):
"""Make function that deletes the plugin entry object."""
raise NotImplementedError(self._override_me_msg)
def make_retrieve_command(self, all=False, raw=False):
"""Make function that retrieves the entry using ${CMD}_show"""
raise NotImplementedError(self._override_me_msg)
def make_find_command(self, *args, **kwargs):
"""Make function that finds the entry using ${CMD}_find
Note that the name (or other search terms) needs to be specified
in arguments.
"""
raise NotImplementedError(self._override_me_msg)
def make_update_command(self, updates):
"""Make function that modifies the entry using ${CMD}_mod"""
raise NotImplementedError(self._override_me_msg)
def create(self, force=True):
"""Helper function to create an entry and check the result"""
self.ensure_missing()
self.track_create()
command = self.make_create_command(force=force)
result = command()
self.check_create(result)
def track_create(self):
"""Update expected state for host creation
The method should look similar to the following
example of host plugin.
self.attrs = dict(
dn=self.dn,
fqdn=[self.fqdn],
description=[self.description],
... # all required attributes
)
self.exists = True
"""
raise NotImplementedError(self._override_me_msg)
def check_create(self, result):
"""Check plugin's add command result"""
raise NotImplementedError(self._override_me_msg)
def delete(self):
"""Helper function to delete a host and check the result"""
self.ensure_exists()
self.track_delete()
command = self.make_delete_command()
result = command()
self.check_delete(result)
def track_delete(self):
"""Update expected state for host deletion"""
self.exists = False
self.attrs = {}
def check_delete(self, result):
"""Check plugin's `del` command result"""
raise NotImplementedError(self._override_me_msg)
def retrieve(self, all=False, raw=False):
"""Helper function to retrieve an entry and check the result"""
self.ensure_exists()
command = self.make_retrieve_command(all=all, raw=raw)
result = command()
self.check_retrieve(result, all=all, raw=raw)
def check_retrieve(self, result, all=False, raw=False):
"""Check the plugin's `show` command result"""
raise NotImplementedError(self._override_me_msg)
def find(self, all=False, raw=False):
"""Helper function to search for this hosts and check the result"""
self.ensure_exists()
command = self.make_find_command(self.name, all=all, raw=raw)
result = command()
self.check_find(result, all=all, raw=raw)
def check_find(self, result, all=False, raw=False):
"""Check the plugin's `find` command result"""
raise NotImplementedError(self._override_me_msg)
def update(self, updates, expected_updates=None):
"""Helper function to update this hosts and check the result
The ``updates`` are used as options to the *_mod command,
and the self.attrs is updated with this dict.
Additionally, self.attrs is updated with ``expected_updates``.
"""
if expected_updates is None:
expected_updates = {}
self.ensure_exists()
command = self.make_update_command(updates)
result = command()
self.attrs.update(updates)
self.attrs.update(expected_updates)
self.check_update(result, extra_keys=set(updates.keys()) |
set(expected_updates.keys()))
def check_update(self, result, extra_keys=()):
"""Check the plugin's `find` command result"""
raise NotImplementedError(self._override_me_msg)

View File

@@ -1,367 +0,0 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
from ipalib import errors
from ipatests.util import assert_deepequal
from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_caacldn, fuzzy_uuid,
fuzzy_ipauniqueid)
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.tracker.base import Tracker
class CAACLTracker(Tracker):
"""Tracker class for CA ACL LDAP object.
The class implements methods required by the base class
to help with basic CRUD operations.
Methods for adding and deleting actual member entries into an ACL
do not have check methods as these would make the class
unnecessarily complicated. The checks are implemented as
a standalone test suite. However, this makes the test crucial
in debugging more complicated test cases. Since the add/remove
operation won't be checked right away by the tracker, a problem
in this operation may propagate into more complicated test case.
It is possible to pass a list of member uids to these methods.
The test uses instances of Fuzzy class to compare results as they
are in the UUID format. The dn and rdn properties were modified
to reflect this as well.
"""
member_keys = {
u'memberuser_user', u'memberuser_group',
u'memberhost_host', u'memberhost_hostgroup',
u'memberservice_service',
u'ipamembercertprofile_certprofile'}
category_keys = {
u'ipacacategory', u'ipacertprofilecategory', u'usercategory',
u'hostcategory', u'servicecategory'}
retrieve_keys = {
u'dn', u'cn', u'description', u'ipaenabledflag',
u'ipamemberca', u'ipamembercertprofile', u'memberuser',
u'memberhost', u'memberservice'} | member_keys | category_keys
retrieve_all_keys = retrieve_keys | {u'objectclass', u'ipauniqueid'}
create_keys = {u'dn', u'cn', u'description', u'ipacertprofilecategory',
u'usercategory', u'hostcategory', u'ipacacategory',
u'servicecategory', u'ipaenabledflag', u'objectclass',
u'ipauniqueid'}
update_keys = create_keys - {u'dn'}
def __init__(self, name, ipacertprofile_category=None, user_category=None,
service_category=None, host_category=None, description=None,
default_version=None):
super(CAACLTracker, self).__init__(default_version=default_version)
self._name = name
self.description = description
self._categories = dict(
ipacertprofilecategory=ipacertprofile_category,
usercategory=user_category,
servicecategory=service_category,
hostcategory=host_category)
self.dn = fuzzy_caacldn
@property
def name(self):
return self._name
@property
def rdn(self):
return fuzzy_ipauniqueid
@property
def categories(self):
"""To be used in track_create"""
return {cat: v for cat, v in self._categories.items() if v}
@property
def create_categories(self):
""" Return the categories set on create.
Unused categories are left out.
"""
return {cat: [v] for cat, v in self.categories.items() if v}
def make_create_command(self, force=True):
return self.make_command(u'caacl_add', self.name,
description=self.description,
**self.categories)
def check_create(self, result):
assert_deepequal(dict(
value=self.name,
summary=u'Added CA ACL "{}"'.format(self.name),
result=dict(self.filter_attrs(self.create_keys))
), result)
def track_create(self):
self.attrs = dict(
dn=self.dn,
ipauniqueid=[fuzzy_uuid],
cn=[self.name],
objectclass=objectclasses.caacl,
ipaenabledflag=[u'TRUE'])
self.attrs.update(self.create_categories)
if self.description:
self.attrs.update({u'description', [self.description]})
self.exists = True
def make_delete_command(self):
return self.make_command('caacl_del', self.name)
def check_delete(self, result):
assert_deepequal(dict(
value=[self.name],
summary=u'Deleted CA ACL "{}"'.format(self.name),
result=dict(failed=[])
), result)
def make_retrieve_command(self, all=False, raw=False):
return self.make_command('caacl_show', self.name, all=all, raw=raw)
def check_retrieve(self, result, all=False, raw=False):
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
value=self.name,
summary=None,
result=expected
), result)
def make_find_command(self, *args, **kwargs):
return self.make_command('caacl_find', *args, **kwargs)
def check_find(self, result, all=False, raw=False):
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 CA ACL matched',
result=[expected]
), result)
def make_update_command(self, updates):
return self.make_command('caacl_mod', self.name, **updates)
def update(self, updates, expected_updates=None, silent=False):
"""If removing a category, delete it from tracker as well"""
# filter out empty categories and track changes
filtered_updates = dict()
for key, value in updates.items():
if key in self.category_keys:
if not value:
try:
del self.attrs[key]
except IndexError:
if silent:
pass
else:
# if there is a value, prepare the pair for update
filtered_updates.update({key: value})
else:
filtered_updates.update({key: value})
if expected_updates is None:
expected_updates = {}
command = self.make_update_command(updates)
try:
result = command()
except errors.EmptyModlist:
if silent:
self.attrs.update(filtered_updates)
self.attrs.update(expected_updates)
self.check_update(result,
extra_keys=set(self.update_keys) |
set(expected_updates.keys()))
def check_update(self, result, extra_keys=()):
assert_deepequal(dict(
value=self.name,
summary=u'Modified CA ACL "{}"'.format(self.name),
result=self.filter_attrs(self.update_keys | set(extra_keys))
), result)
# Helper methods for caacl subcommands. The check methods will be
# implemented in standalone test
#
# The methods implemented here will be:
# caacl_{add,remove}_{host, service, certprofile, user [, subca]}
def _add_acl_component(self, command_name, keys, track):
""" Add a resource into ACL rule and track it.
command_name - the name in the API
keys = {
'tracker_attr': {
'api_key': 'value'
}
}
e.g.
keys = {
'memberhost_host': {
'host': 'hostname'
},
'memberhost_hostgroup': {
'hostgroup': 'hostgroup_name'
}
}
"""
if not self.exists:
raise errors.NotFound(reason="The tracked entry doesn't exist.")
command = self.make_command(command_name, self.name)
command_options = dict()
# track
for tracker_attr in keys:
api_options = keys[tracker_attr]
if track:
for option in api_options:
try:
if type(option) in (list, tuple):
self.attrs[tracker_attr].extend(api_options[option])
else:
self.attrs[tracker_attr].append(api_options[option])
except KeyError:
if type(option) in (list, tuple):
self.attrs[tracker_attr] = api_options[option]
else:
self.attrs[tracker_attr] = [api_options[option]]
# prepare options for the command call
command_options.update(api_options)
return command(**command_options)
def _remove_acl_component(self, command_name, keys, track):
""" Remove a resource from ACL rule and track it.
command_name - the name in the API
keys = {
'tracker_attr': {
'api_key': 'value'
}
}
e.g.
keys = {
'memberhost_host': {
'host': 'hostname'
},
'memberhost_hostgroup': {
'hostgroup': 'hostgroup_name'
}
}
"""
command = self.make_command(command_name, self.name)
command_options = dict()
for tracker_attr in keys:
api_options = keys[tracker_attr]
if track:
for option in api_options:
if type(option) in (list, tuple):
for item in option:
self.attrs[tracker_attr].remove(item)
else:
self.attrs[tracker_attr].remove(api_options[option])
if len(self.attrs[tracker_attr]) == 0:
del self.attrs[tracker_attr]
command_options.update(api_options)
return command(**command_options)
def add_host(self, host=None, hostgroup=None, track=True):
"""Associates an host or hostgroup entry with the ACL.
The command takes an unicode string with the name
of the entry (RDN).
It is the responsibility of a test writer to provide
the correct value, object type as the method does not
verify whether the entry exists.
The method can add only one entry of each type
in one call.
"""
options = {
u'memberhost_host': {u'host': host},
u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
return self._add_acl_component(u'caacl_add_host', options, track)
def remove_host(self, host=None, hostgroup=None, track=True):
options = {
u'memberhost_host': {u'host': host},
u'memberhost_hostgroup': {u'hostgroup': hostgroup}}
return self._remove_acl_component(u'caacl_remove_host', options, track)
def add_user(self, user=None, group=None, track=True):
options = {
u'memberuser_user': {u'user': user},
u'memberuser_group': {u'group': group}}
return self._add_acl_component(u'caacl_add_user', options, track)
def remove_user(self, user=None, group=None, track=True):
options = {
u'memberuser_user': {u'user': user},
u'memberuser_group': {u'group': group}}
return self._remove_acl_component(u'caacl_remove_user', options, track)
def add_service(self, service=None, track=True):
options = {
u'memberservice_service': {u'service': service}}
return self._add_acl_component(u'caacl_add_service', options, track)
def remove_service(self, service=None, track=True):
options = {
u'memberservice_service': {u'service': service}}
return self._remove_acl_component(u'caacl_remove_service', options, track)
def add_profile(self, certprofile=None, track=True):
options = {
u'ipamembercertprofile_certprofile':
{u'certprofile': certprofile}}
return self._add_acl_component(u'caacl_add_profile', options, track)
def remove_profile(self, certprofile=None, track=True):
options = {
u'ipamembercertprofile_certprofile':
{u'certprofile': certprofile}}
return self._remove_acl_component(u'caacl_remove_profile', options, track)
def enable(self):
command = self.make_command(u'caacl_enable', self.name)
self.attrs.update({u'ipaenabledflag': [u'TRUE']})
command()
def disable(self):
command = self.make_command(u'caacl_disable', self.name)
self.attrs.update({u'ipaenabledflag': [u'FALSE']})
command()

View File

@@ -1,137 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import os
import six
from ipapython.dn import DN
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.test_xmlrpc import objectclasses
from ipatests.util import assert_deepequal
if six.PY3:
unicode = str
class CertprofileTracker(Tracker):
"""Tracker class for certprofile plugin.
"""
retrieve_keys = {
'dn', 'cn', 'description', 'ipacertprofilestoreissued'
}
retrieve_all_keys = retrieve_keys | {'objectclass'}
create_keys = retrieve_keys | {'objectclass'}
update_keys = retrieve_keys - {'dn'}
managedby_keys = retrieve_keys
allowedto_keys = retrieve_keys
def __init__(self, name, store=False, desc='dummy description',
profile=None, default_version=None):
super(CertprofileTracker, self).__init__(
default_version=default_version
)
self.store = store
self.description = desc
self._profile_path = profile
self.dn = DN(('cn', name), 'cn=certprofiles', 'cn=ca',
self.api.env.basedn)
@property
def profile(self):
if not self._profile_path:
return None
if os.path.isabs(self._profile_path):
path = self._profile_path
else:
path = os.path.join(os.path.dirname(__file__),
self._profile_path)
with open(path, 'r') as f:
content = f.read()
return unicode(content)
def make_create_command(self, force=True):
if not self.profile:
raise RuntimeError('Tracker object without path to profile '
'cannot be used to create profile entry.')
return self.make_command('certprofile_import', self.name,
description=self.description,
ipacertprofilestoreissued=self.store,
file=self.profile)
def check_create(self, result):
assert_deepequal(dict(
value=self.name,
summary=u'Imported profile "{}"'.format(self.name),
result=dict(self.filter_attrs(self.create_keys))
), result)
def track_create(self):
self.attrs = dict(
dn=unicode(self.dn),
cn=[self.name],
description=[self.description],
ipacertprofilestoreissued=[unicode(self.store).upper()],
objectclass=objectclasses.certprofile
)
self.exists = True
def make_delete_command(self):
return self.make_command('certprofile_del', self.name)
def check_delete(self, result):
assert_deepequal(dict(
value=[self.name], # correctly a list?
summary=u'Deleted profile "{}"'.format(self.name),
result=dict(failed=[]),
), result)
def make_retrieve_command(self, all=False, raw=False, **options):
return self.make_command('certprofile_show', self.name, all=all,
raw=raw, **options)
def check_retrieve(self, result, all=False, raw=False):
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
value=self.name,
summary=None,
result=expected,
), result)
def make_find_command(self, *args, **kwargs):
return self.make_command('certprofile_find', *args, **kwargs)
def check_find(self, result, all=False, raw=False):
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 profile matched',
result=[expected]
), result)
def make_update_command(self, updates):
return self.make_command('certprofile_mod', self.name, **updates)
def check_update(self, result, extra_keys=()):
assert_deepequal(dict(
value=self.name,
summary=u'Modified Certificate Profile "{}"'.format(self.name),
result=self.filter_attrs(self.update_keys | set(extra_keys))
), result)

View File

@@ -1,196 +0,0 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_digits, fuzzy_uuid
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.util import assert_deepequal, get_group_dn
class GroupTracker(Tracker):
""" Class for host plugin like tests """
retrieve_keys = {u'dn', u'cn', u'gidnumber', u'member_user',
u'member_group'}
retrieve_all_keys = retrieve_keys | {u'ipauniqueid', u'objectclass'}
create_keys = retrieve_all_keys
update_keys = retrieve_keys - {u'dn'}
add_member_keys = retrieve_keys | {u'description'}
def __init__(self, name):
super(GroupTracker, self).__init__(default_version=None)
self.cn = name
self.dn = get_group_dn(name)
def make_create_command(self, nonposix=False, external=False,
force=True):
""" Make function that creates a group using 'group-add' """
return self.make_command('group_add', self.cn,
nonposix=nonposix, external=external)
def make_delete_command(self):
""" Make function that deletes a group using 'group-del' """
return self.make_command('group_del', self.cn)
def make_retrieve_command(self, all=False, raw=False):
""" Make function that retrieves a group using 'group-show' """
return self.make_command('group_show', self.cn, all=all)
def make_find_command(self, *args, **kwargs):
""" Make function that searches for a group using 'group-find' """
return self.make_command('group_find', *args, **kwargs)
def make_update_command(self, updates):
""" Make function that updates a group using 'group-mod' """
return self.make_command('group_mod', self.cn, **updates)
def make_add_member_command(self, options={}):
""" Make function that adds a member to a group
Attention: only works for one user OR group! """
if u'user' in options:
self.attrs[u'member_user'] = [options[u'user']]
elif u'group' in options:
self.attrs[u'member_group'] = [options[u'group']]
self.adds = options
return self.make_command('group_add_member', self.cn, **options)
def make_remove_member_command(self, options={}):
""" Make function that removes a member from a group
Attention: only works for one user OR group! """
if u'user' in options:
del self.attrs[u'member_user']
elif u'group' in options:
del self.attrs[u'member_group']
return self.make_command('group_remove_member', self.cn, **options)
def make_detach_command(self):
""" Make function that detaches a managed group using
'group-detach' """
self.exists = True
return self.make_command('group_detach', self.cn)
def track_create(self):
""" Updates expected state for group creation"""
self.attrs = dict(
dn=get_group_dn(self.cn),
cn=[self.cn],
gidnumber=[fuzzy_digits],
ipauniqueid=[fuzzy_uuid],
objectclass=objectclasses.posixgroup,
)
self.exists = True
def check_create(self, result):
""" Checks 'group_add' command result """
assert_deepequal(dict(
value=self.cn,
summary=u'Added group "%s"' % self.cn,
result=self.filter_attrs(self.create_keys)
), result)
def check_delete(self, result):
""" Checks 'group_del' command result """
assert_deepequal(dict(
value=[self.cn],
summary=u'Deleted group "%s"' % self.cn,
result=dict(failed=[]),
), result)
def check_retrieve(self, result, all=False, raw=False):
""" Checks 'group_show' command result """
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
value=self.cn,
summary=None,
result=expected
), result)
def check_find(self, result, all=False, raw=False):
""" Checks 'group_find' command result """
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 group matched',
result=[expected],
), result)
def check_update(self, result, extra_keys={}):
""" Checks 'group_mod' command result """
assert_deepequal(dict(
value=self.cn,
summary=u'Modified group "%s"' % self.cn,
result=self.filter_attrs(self.update_keys | set(extra_keys))
), result)
def check_add_member(self, result):
""" Checks 'group_add_member' command result """
assert_deepequal(dict(
completed=1,
failed={u'member': {u'group': (), u'user': ()}},
result=self.filter_attrs(self.add_member_keys)
), result)
def check_add_member_negative(self, result):
""" Checks 'group_add_member' command result when expected result
is failure of the operation"""
if u'member_user' in self.attrs:
del self.attrs[u'member_user']
elif u'member_group' in self.attrs:
del self.attrs[u'member_group']
expected = dict(
completed=0,
failed={u'member': {u'group': (), u'user': ()}},
result=self.filter_attrs(self.add_member_keys)
)
if u'user' in self.adds:
expected[u'failed'][u'member'][u'user'] = [(
self.adds[u'user'], u'no such entry')]
elif u'group' in self.adds:
expected[u'failed'][u'member'][u'group'] = [(
self.adds[u'group'], u'no such entry')]
assert_deepequal(expected, result)
def check_remove_member(self, result):
""" Checks 'group_remove_member' command result """
assert_deepequal(dict(
completed=1,
failed={u'member': {u'group': (), u'user': ()}},
result=self.filter_attrs(self.add_member_keys)
), result)
def check_detach(self, result):
""" Checks 'group_detach' command result """
assert_deepequal(dict(
value=self.cn,
summary=u'Detached group "%s" from user "%s"' % (
self.cn, self.cn),
result=True
), result)
def make_fixture_detach(self, request):
"""Make a pytest fixture for this tracker
The fixture ensures the plugin entry does not exist before
and after the tests that use itself.
"""
def cleanup():
pass
request.addfinalizer(cleanup)
return self

View File

@@ -1,154 +0,0 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
from __future__ import print_function
from ipapython.dn import DN
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_uuid
from ipatests.test_xmlrpc import objectclasses
from ipatests.util import assert_deepequal
class HostTracker(Tracker):
"""Wraps and tracks modifications to a Host object
Implements the helper functions for host plugin.
The HostTracker object stores information about the host, e.g.
``fqdn`` and ``dn``.
"""
retrieve_keys = {
'dn', 'fqdn', 'description', 'l', 'krbprincipalname', 'managedby_host',
'has_keytab', 'has_password', 'issuer', 'md5_fingerprint',
'serial_number', 'serial_number_hex', 'sha1_fingerprint',
'subject', 'usercertificate', 'valid_not_after', 'valid_not_before',
'macaddress', 'sshpubkeyfp', 'ipaallowedtoperform_read_keys_user',
'ipaallowedtoperform_read_keys_group',
'ipaallowedtoperform_read_keys_host',
'ipaallowedtoperform_read_keys_hostgroup',
'ipaallowedtoperform_write_keys_user',
'ipaallowedtoperform_write_keys_group',
'ipaallowedtoperform_write_keys_host',
'ipaallowedtoperform_write_keys_hostgroup'}
retrieve_all_keys = retrieve_keys | {
u'cn', u'ipakrbokasdelegate', u'ipakrbrequirespreauth', u'ipauniqueid',
u'managing_host', u'objectclass', u'serverhostname'}
create_keys = retrieve_keys | {'objectclass', 'ipauniqueid',
'randompassword'}
update_keys = retrieve_keys - {'dn'}
managedby_keys = retrieve_keys - {'has_keytab', 'has_password'}
allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
def __init__(self, name, fqdn=None, default_version=None):
super(HostTracker, self).__init__(default_version=default_version)
self.shortname = name
if fqdn:
self.fqdn = fqdn
else:
self.fqdn = u'%s.%s' % (name, self.api.env.domain)
self.dn = DN(('fqdn', self.fqdn), 'cn=computers', 'cn=accounts',
self.api.env.basedn)
self.description = u'Test host <%s>' % name
self.location = u'Undisclosed location <%s>' % name
def make_create_command(self, force=True):
"""Make function that creates this host using host_add"""
return self.make_command('host_add', self.fqdn,
description=self.description,
l=self.location,
force=force)
def make_delete_command(self):
"""Make function that deletes the host using host_del"""
return self.make_command('host_del', self.fqdn)
def make_retrieve_command(self, all=False, raw=False):
"""Make function that retrieves the host using host_show"""
return self.make_command('host_show', self.fqdn, all=all, raw=raw)
def make_find_command(self, *args, **kwargs):
"""Make function that finds hosts using host_find
Note that the fqdn (or other search terms) needs to be specified
in arguments.
"""
return self.make_command('host_find', *args, **kwargs)
def make_update_command(self, updates):
"""Make function that modifies the host using host_mod"""
return self.make_command('host_mod', self.fqdn, **updates)
def track_create(self):
"""Update expected state for host creation"""
self.attrs = dict(
dn=self.dn,
fqdn=[self.fqdn],
description=[self.description],
l=[self.location],
krbprincipalname=[u'host/%s@%s' % (self.fqdn, self.api.env.realm)],
objectclass=objectclasses.host,
ipauniqueid=[fuzzy_uuid],
managedby_host=[self.fqdn],
has_keytab=False,
has_password=False,
cn=[self.fqdn],
ipakrbokasdelegate=False,
ipakrbrequirespreauth=True,
managing_host=[self.fqdn],
serverhostname=[self.shortname],
)
self.exists = True
def check_create(self, result):
"""Check `host_add` command result"""
assert_deepequal(dict(
value=self.fqdn,
summary=u'Added host "%s"' % self.fqdn,
result=self.filter_attrs(self.create_keys),
), result)
def check_delete(self, result):
"""Check `host_del` command result"""
assert_deepequal(dict(
value=[self.fqdn],
summary=u'Deleted host "%s"' % self.fqdn,
result=dict(failed=[]),
), result)
def check_retrieve(self, result, all=False, raw=False):
"""Check `host_show` command result"""
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
value=self.fqdn,
summary=None,
result=expected,
), result)
def check_find(self, result, all=False, raw=False):
"""Check `host_find` command result"""
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 host matched',
result=[expected],
), result)
def check_update(self, result, extra_keys=()):
"""Check `host_update` command result"""
assert_deepequal(dict(
value=self.fqdn,
summary=u'Modified host "%s"' % self.fqdn,
result=self.filter_attrs(self.update_keys | set(extra_keys))
), result)

View File

@@ -1,267 +0,0 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import six
from ipalib import api, errors
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.xmlrpc_test import (
fuzzy_string, fuzzy_dergeneralizedtime, raises_exact)
from ipatests.util import assert_deepequal, get_user_dn
from ipapython.dn import DN
if six.PY3:
unicode = str
sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X'
'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI'
'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2'
'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm'
'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01'
'9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF'
'0L public key test')
sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B '
'public key test (ssh-rsa)')
class StageUserTracker(Tracker):
""" Tracker class for staged user LDAP object
Implements helper functions for host plugin.
StageUserTracker object stores information about the user.
"""
retrieve_keys = {
u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
u'uidnumber', u'gidnumber', u'mail', u'ou', u'telephonenumber',
u'title', u'memberof', u'nsaccountlock', u'memberofindirect',
u'ipauserauthtype', u'userclass', u'ipatokenradiusconfiglink',
u'ipatokenradiususername', u'krbprincipalexpiration',
u'usercertificate', u'dn', u'has_keytab', u'has_password',
u'street', u'postalcode', u'facsimiletelephonenumber',
u'carlicense', u'ipasshpubkey', u'sshpubkeyfp', u'l',
u'st', u'mobile', u'pager', }
retrieve_all_keys = retrieve_keys | {
u'cn', u'ipauniqueid', u'objectclass', u'description',
u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
create_keys = retrieve_all_keys | {
u'objectclass', u'ipauniqueid', u'randompassword',
u'userpassword', u'krbextradata', u'krblastpwdchange',
u'krbpasswordexpiration', u'krbprincipalkey'}
update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
activate_keys = retrieve_keys | {
u'has_keytab', u'has_password', u'nsaccountlock'}
def __init__(self, name, givenname, sn, **kwargs):
super(StageUserTracker, self).__init__(default_version=None)
self.uid = name
self.givenname = givenname
self.sn = sn
self.dn = DN(
('uid', self.uid), api.env.container_stageuser, api.env.basedn)
self.kwargs = kwargs
def make_create_command(self, options=None, force=None):
""" Make function that creates a staged user using stageuser-add """
if options is not None:
self.kwargs = options
return self.make_command('stageuser_add', self.uid,
givenname=self.givenname,
sn=self.sn, **self.kwargs)
def make_delete_command(self):
""" Make function that deletes a staged user using stageuser-del """
return self.make_command('stageuser_del', self.uid)
def make_retrieve_command(self, all=False, raw=False):
""" Make function that retrieves a staged user using stageuser-show """
return self.make_command('stageuser_show', self.uid, all=all)
def make_find_command(self, *args, **kwargs):
""" Make function that finds staged user using stageuser-find """
return self.make_command('stageuser_find', *args, **kwargs)
def make_update_command(self, updates):
""" Make function that updates staged user using stageuser-mod """
return self.make_command('stageuser_mod', self.uid, **updates)
def make_activate_command(self):
""" Make function that activates staged user
using stageuser-activate """
return self.make_command('stageuser_activate', self.uid)
def track_create(self):
""" Update expected state for staged user creation """
self.attrs = dict(
dn=self.dn,
uid=[self.uid],
givenname=[self.givenname],
sn=[self.sn],
homedirectory=[u'/home/%s' % self.uid],
displayname=[u'%s %s' % (self.givenname, self.sn)],
cn=[u'%s %s' % (self.givenname, self.sn)],
initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
objectclass=objectclasses.user_base,
description=[u'__no_upg__'],
ipauniqueid=[u'autogenerate'],
uidnumber=[u'-1'],
gidnumber=[u'-1'],
krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
gecos=[u'%s %s' % (self.givenname, self.sn)],
loginshell=[u'/bin/sh'],
has_keytab=False,
has_password=False,
nsaccountlock=[u'true'],
)
for key in self.kwargs:
if key == u'krbprincipalname':
self.attrs[key] = [u'%s@%s' % (
(self.kwargs[key].split('@'))[0].lower(),
(self.kwargs[key].split('@'))[1])]
elif key == u'manager':
self.attrs[key] = [self.kwargs[key]]
elif key == u'ipasshpubkey':
self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
self.attrs[key] = [self.kwargs[key]]
elif key == u'random' or key == u'userpassword':
self.attrs[u'krbextradata'] = [fuzzy_string]
self.attrs[u'krbpasswordexpiration'] = [
fuzzy_dergeneralizedtime]
self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
self.attrs[u'krbprincipalkey'] = [fuzzy_string]
self.attrs[u'userpassword'] = [fuzzy_string]
self.attrs[u'has_keytab'] = True
self.attrs[u'has_password'] = True
if key == u'random':
self.attrs[u'randompassword'] = fuzzy_string
else:
self.attrs[key] = [self.kwargs[key]]
self.exists = True
def check_create(self, result):
""" Check 'stageuser-add' command result """
assert_deepequal(dict(
value=self.uid,
summary=u'Added stage user "%s"' % self.uid,
result=self.filter_attrs(self.create_keys),
), result)
def check_delete(self, result):
""" Check 'stageuser-del' command result """
assert_deepequal(dict(
value=[self.uid],
summary=u'Deleted stage user "%s"' % self.uid,
result=dict(failed=[]),
), result)
def check_retrieve(self, result, all=False, raw=False):
""" Check 'stageuser-show' command result """
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
# small override because stageuser-find returns different
# type of nsaccountlock value than DS, but overall the value
# fits expected result
if expected[u'nsaccountlock'] == [u'true']:
expected[u'nsaccountlock'] = True
elif expected[u'nsaccountlock'] == [u'false']:
expected[u'nsaccountlock'] = False
assert_deepequal(dict(
value=self.uid,
summary=None,
result=expected,
), result)
def check_find(self, result, all=False, raw=False):
""" Check 'stageuser-find' command result """
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
# small override because stageuser-find returns different
# type of nsaccountlock value than DS, but overall the value
# fits expected result
if expected[u'nsaccountlock'] == [u'true']:
expected[u'nsaccountlock'] = True
elif expected[u'nsaccountlock'] == [u'false']:
expected[u'nsaccountlock'] = False
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 user matched',
result=[expected],
), result)
def check_find_nomatch(self, result):
""" Check 'stageuser-find' command result when no match is expected """
assert_deepequal(dict(
count=0,
truncated=False,
summary=u'0 users matched',
result=[],
), result)
def check_update(self, result, extra_keys=()):
""" Check 'stageuser-mod' command result """
assert_deepequal(dict(
value=self.uid,
summary=u'Modified stage user "%s"' % self.uid,
result=self.filter_attrs(self.update_keys | set(extra_keys))
), result)
def check_restore_preserved(self, result):
assert_deepequal(dict(
value=[self.uid],
summary=u'Staged user account "%s"' % self.uid,
result=dict(failed=[]),
), result)
def make_fixture_activate(self, request):
"""Make a pytest fixture for a staged user that is to be activated
The fixture ensures the plugin entry does not exist before
and after the tests that use it. It takes into account
that the staged user no longer exists after activation,
therefore the fixture verifies after the tests
that the staged user doesn't exist instead of deleting it.
"""
del_command = self.make_delete_command()
try:
del_command()
except errors.NotFound:
pass
def finish():
with raises_exact(errors.NotFound(
reason=u'%s: stage user not found' % self.uid)):
del_command()
request.addfinalizer(finish)
return self
def create_from_preserved(self, user):
""" Copies values from preserved user - helper function for
restoration tests """
self.attrs = user.attrs
self.uid = user.uid
self.givenname = user.givenname
self.sn = user.sn
self.dn = DN(
('uid', self.uid), api.env.container_stageuser, api.env.basedn)
self.attrs[u'dn'] = self.dn

View File

@@ -1,479 +0,0 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
from ipalib import api, errors
from ipapython.dn import DN
import six
from ipatests.util import assert_deepequal, get_group_dn
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.xmlrpc_test import (
fuzzy_digits, fuzzy_uuid, raises_exact)
from ipatests.test_xmlrpc.tracker.base import Tracker
if six.PY3:
unicode = str
class UserTracker(Tracker):
""" Class for host plugin like tests """
retrieve_keys = {
u'dn', u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
u'uidnumber', u'gidnumber', u'mail', u'ou',
u'telephonenumber', u'title', u'memberof', u'nsaccountlock',
u'memberofindirect', u'ipauserauthtype', u'userclass',
u'ipatokenradiusconfiglink', u'ipatokenradiususername',
u'krbprincipalexpiration', u'usercertificate;binary',
u'has_keytab', u'has_password', u'memberof_group', u'sshpubkeyfp',
}
retrieve_all_keys = retrieve_keys | {
u'usercertificate', u'street', u'postalcode',
u'facsimiletelephonenumber', u'carlicense', u'ipasshpubkey',
u'l', u'mobile', u'krbextradata', u'krblastpwdchange',
u'krbpasswordexpiration', u'pager', u'st', u'manager', u'cn',
u'ipauniqueid', u'objectclass', u'mepmanagedentry',
u'displayname', u'gecos', u'initials', u'krbprincipalname',
u'preserved'}
retrieve_preserved_keys = (retrieve_keys - {u'memberof_group'}) | {
u'preserved'}
retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
create_keys = retrieve_all_keys | {
u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange',
u'krbprincipalkey', u'userpassword', u'randompassword'}
create_keys = create_keys - {u'nsaccountlock'}
update_keys = retrieve_keys - {u'dn'}
activate_keys = retrieve_keys
find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
find_all_keys = retrieve_all_keys
primary_keys = {u'uid', u'dn'}
def __init__(self, name, givenname, sn, **kwargs):
super(UserTracker, self).__init__(default_version=None)
self.uid = name
self.givenname = givenname
self.sn = sn
self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
self.kwargs = kwargs
def make_create_command(self, force=None):
""" Make function that crates a user using user-add """
return self.make_command(
'user_add', self.uid,
givenname=self.givenname,
sn=self.sn, **self.kwargs
)
def make_delete_command(self, no_preserve=True, preserve=False):
""" Make function that deletes a user using user-del """
if preserve and not no_preserve:
# necessary to change some user attributes due to moving
# to different container
self.attrs[u'dn'] = DN(
('uid', self.uid),
api.env.container_deleteuser,
api.env.basedn
)
self.attrs[u'objectclass'] = objectclasses.user_base
return self.make_command(
'user_del', self.uid,
no_preserve=no_preserve,
preserve=preserve
)
def make_retrieve_command(self, all=False, raw=False):
""" Make function that retrieves a user using user-show """
return self.make_command('user_show', self.uid, all=all)
def make_find_command(self, *args, **kwargs):
""" Make function that finds user using user-find """
return self.make_command('user_find', *args, **kwargs)
def make_update_command(self, updates):
""" Make function that updates user using user-mod """
return self.make_command('user_mod', self.uid, **updates)
def make_undelete_command(self):
""" Make function that activates preserved user using user-undel """
return self.make_command('user_undel', self.uid)
def make_enable_command(self):
""" Make function that enables user using user-enable """
return self.make_command('user_enable', self.uid)
def make_disable_command(self):
""" Make function that disables user using user-disable """
return self.make_command('user_disable', self.uid)
def make_stage_command(self):
""" Make function that restores preserved user by moving it to
staged container """
return self.make_command('user_stage', self.uid)
def make_group_add_member_command(self, *args, **kwargs):
return self.make_command('group_add_member', *args, **kwargs)
def track_create(self):
""" Update expected state for user creation """
self.attrs = dict(
dn=self.dn,
uid=[self.uid],
givenname=[self.givenname],
sn=[self.sn],
homedirectory=[u'/home/%s' % self.uid],
displayname=[u'%s %s' % (self.givenname, self.sn)],
cn=[u'%s %s' % (self.givenname, self.sn)],
initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
objectclass=objectclasses.user,
description=[u'__no_upg__'],
ipauniqueid=[fuzzy_uuid],
uidnumber=[fuzzy_digits],
gidnumber=[fuzzy_digits],
krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
gecos=[u'%s %s' % (self.givenname, self.sn)],
loginshell=[u'/bin/sh'],
has_keytab=False,
has_password=False,
mepmanagedentry=[get_group_dn(self.uid)],
memberof_group=[u'ipausers'],
nsaccountlock=[u'false'],
)
for key in self.kwargs:
if key == u'krbprincipalname':
try:
self.attrs[key] = [u'%s@%s' % (
(self.kwargs[key].split('@'))[0].lower(),
(self.kwargs[key].split('@'))[1]
)]
except IndexError as ex:
# we can provide just principal part
self.attrs[key] = [u'%s@%s' % (
(self.kwargs[key].lower(),
self.api.env.realm)
)]
else:
if not type(self.kwargs[key]) is list:
self.attrs[key] = [self.kwargs[key]]
else:
self.attrs[key] = self.kwargs[key]
self.exists = True
def update(self, updates, expected_updates=None):
"""Helper function to update this user and check the result
Overriding Tracker method for setting self.attrs correctly;
* most attributes stores its value in list
* the rest can be overridden by expected_updates
* allow deleting parametrs if update value is None
"""
if expected_updates is None:
expected_updates = {}
self.ensure_exists()
command = self.make_update_command(updates)
result = command()
for key, value in updates.items():
if value is None or value is '' or value is u'':
del self.attrs[key]
else:
if type(value) is list:
self.attrs[key] = value
else:
self.attrs[key] = [value]
for key, value in expected_updates.items():
if value is None or value is '' or value is u'':
del self.attrs[key]
else:
self.attrs[key] = value
self.check_update(
result,
extra_keys=set(updates.keys()) | set(expected_updates.keys())
)
def check_create(self, result, extra_keys=()):
""" Check 'user-add' command result """
expected = self.filter_attrs(self.create_keys | set(extra_keys))
assert_deepequal(dict(
value=self.uid,
summary=u'Added user "%s"' % self.uid,
result=self.filter_attrs(expected),
), result)
def check_delete(self, result):
""" Check 'user-del' command result """
assert_deepequal(dict(
value=[self.uid],
summary=u'Deleted user "%s"' % self.uid,
result=dict(failed=[]),
), result)
def check_retrieve(self, result, all=False, raw=False):
""" Check 'user-show' command result """
if u'preserved' in self.attrs and self.attrs[u'preserved']:
self.retrieve_all_keys = self.retrieve_preserved_all_keys
self.retrieve_keys = self.retrieve_preserved_keys
elif u'preserved' not in self.attrs and all:
self.attrs[u'preserved'] = False
if all:
expected = self.filter_attrs(self.retrieve_all_keys)
else:
expected = self.filter_attrs(self.retrieve_keys)
# small override because stageuser-find returns different type
# of nsaccountlock value than DS, but overall the value fits
# expected result
if u'nsaccountlock' in expected:
if expected[u'nsaccountlock'] == [u'true']:
expected[u'nsaccountlock'] = True
elif expected[u'nsaccountlock'] == [u'false']:
expected[u'nsaccountlock'] = False
assert_deepequal(dict(
value=self.uid,
summary=None,
result=expected,
), result)
def check_find(self, result, all=False, pkey_only=False, raw=False,
expected_override=None):
""" Check 'user-find' command result """
if all:
if u'preserved' not in self.attrs:
self.attrs.update(preserved=False)
expected = self.filter_attrs(self.find_all_keys)
elif pkey_only:
expected = self.filter_attrs(self.primary_keys)
else:
expected = self.filter_attrs(self.find_keys)
if all and self.attrs[u'preserved']:
del expected[u'mepmanagedentry']
if u'nsaccountlock' in expected:
if expected[u'nsaccountlock'] == [u'true']:
expected[u'nsaccountlock'] = True
elif expected[u'nsaccountlock'] == [u'false']:
expected[u'nsaccountlock'] = False
if expected_override:
assert isinstance(expected_override, dict)
expected.update(expected_override)
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 user matched',
result=[expected],
), result)
def check_find_nomatch(self, result):
""" Check 'user-find' command result when no user should be found """
assert_deepequal(dict(
count=0,
truncated=False,
summary=u'0 users matched',
result=[],
), result)
def check_update(self, result, extra_keys=()):
""" Check 'user-mod' command result """
expected = self.filter_attrs(self.update_keys | set(extra_keys))
if expected[u'nsaccountlock'] == [u'true']:
expected[u'nsaccountlock'] = True
elif expected[u'nsaccountlock'] == [u'false']:
expected[u'nsaccountlock'] = False
assert_deepequal(dict(
value=self.uid,
summary=u'Modified user "%s"' % self.uid,
result=expected
), result)
def check_enable(self, result):
""" Check result of enable user operation """
assert_deepequal(dict(
value=self.name,
summary=u'Enabled user account "%s"' % self.name,
result=True
), result)
def check_disable(self, result):
""" Check result of disable user operation """
assert_deepequal(dict(
value=self.name,
summary=u'Disabled user account "%s"' % self.name,
result=True
), result)
def create_from_staged(self, stageduser):
""" Copies attributes from staged user - helper function for
activation tests """
self.attrs = stageduser.attrs
self.uid = stageduser.uid
self.givenname = stageduser.givenname
self.sn = stageduser.sn
self.attrs[u'mepmanagedentry'] = None
self.attrs[u'dn'] = self.dn
self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
self.attrs[u'memberof_group'] = [u'ipausers']
self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (
self.uid, api.env.container_group, api.env.basedn
)]
self.attrs[u'objectclass'] = objectclasses.user
if self.attrs[u'gidnumber'] == [u'-1']:
self.attrs[u'gidnumber'] = [fuzzy_digits]
if self.attrs[u'uidnumber'] == [u'-1']:
self.attrs[u'uidnumber'] = [fuzzy_digits]
if u'ipasshpubkey' in self.kwargs:
self.attrs[u'ipasshpubkey'] = [str(
self.kwargs[u'ipasshpubkey']
)]
self.attrs[u'nsaccountlock'] = [u'false']
def check_activate(self, result):
""" Check 'stageuser-activate' command result """
expected = dict(
value=self.uid,
summary=u'Stage user %s activated' % self.uid,
result=self.filter_attrs(self.activate_keys))
# small override because stageuser-find returns different
# type of nsaccountlock value than DS, but overall the value
# fits expected result
if expected['result'][u'nsaccountlock'] == [u'true']:
expected['result'][u'nsaccountlock'] = True
elif expected['result'][u'nsaccountlock'] == [u'false']:
expected['result'][u'nsaccountlock'] = False
assert_deepequal(expected, result)
self.exists = True
def check_undel(self, result):
""" Check 'user-undel' command result """
assert_deepequal(dict(
value=self.uid,
summary=u'Undeleted user account "%s"' % self.uid,
result=True
), result)
def enable(self):
""" Enable user account if it was disabled """
if (self.attrs['nsaccountlock'] is True or
self.attrs['nsaccountlock'] == [u'true']):
self.attrs.update(nsaccountlock=False)
result = self.make_enable_command()()
self.check_enable(result)
def disable(self):
""" Disable user account if it was enabled """
if (self.attrs['nsaccountlock'] is False or
self.attrs['nsaccountlock'] == [u'false']):
self.attrs.update(nsaccountlock=True)
result = self.make_disable_command()()
self.check_disable(result)
def track_delete(self, preserve=False):
"""Update expected state for host deletion"""
if preserve:
self.exists = True
if u'memberof_group' in self.attrs:
del self.attrs[u'memberof_group']
self.attrs[u'nsaccountlock'] = True
self.attrs[u'preserved'] = True
else:
self.exists = False
self.attrs = {}
def make_preserved_user(self):
""" 'Creates' a preserved user necessary for some tests """
self.ensure_exists()
self.track_delete(preserve=True)
command = self.make_delete_command(no_preserve=False, preserve=True)
result = command()
self.check_delete(result)
def check_attr_preservation(self, expected):
""" Verifies that ipaUniqueID, uidNumber and gidNumber are
preserved upon reactivation. Also verifies that resulting
active user is a member of ipausers group only."""
command = self.make_retrieve_command(all=True)
result = command()
assert_deepequal(dict(
ipauniqueid=result[u'result'][u'ipauniqueid'],
uidnumber=result[u'result'][u'uidnumber'],
gidnumber=result[u'result'][u'gidnumber']
), expected)
if (u'memberof_group' not in result[u'result'] or
result[u'result'][u'memberof_group'] != (u'ipausers',)):
assert False
def make_fixture_restore(self, request):
"""Make a pytest fixture for a preserved user that is to be moved to
staged area.
The fixture ensures the plugin entry does not exist before
and after the tests that use it. It takes into account
that the preserved user no longer exists after restoring it,
therefore the fixture verifies after the tests
that the preserved user doesn't exist instead of deleting it.
"""
del_command = self.make_delete_command()
try:
del_command()
except errors.NotFound:
pass
def finish():
with raises_exact(errors.NotFound(
reason=u'%s: user not found' % self.uid)):
del_command()
request.addfinalizer(finish)
return self
def make_admin(self, admin_group=u'admins'):
""" Add user to the administrator's group """
result = self.run_command('group_show', admin_group)
admin_group_content = result[u'result'][u'member_user']
admin_group_expected = list(admin_group_content) + [self.name]
command = self.make_group_add_member_command(
admin_group, **dict(user=self.name)
)
result = command()
assert_deepequal(dict(
completed=1,
failed=dict(
member=dict(group=tuple(), user=tuple())
),
result={
'dn': get_group_dn(admin_group),
'member_user': admin_group_expected,
'gidnumber': [fuzzy_digits],
'cn': [admin_group],
'description': [u'Account administrators group'],
},
), result)