146 lines
4.1 KiB
Python
146 lines
4.1 KiB
Python
#
|
|
# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
|
|
#
|
|
|
|
"""
|
|
Test the LDAPCache class.
|
|
"""
|
|
# pylint: disable=no-member
|
|
|
|
from ipalib import api, errors
|
|
from ipapython import ipaldap
|
|
from ipapython.dn import DN
|
|
|
|
import pytest
|
|
|
|
|
|
def hits_and_misses(cache, hits, misses):
|
|
assert cache._cache_hits == hits
|
|
assert cache._cache_misses == misses
|
|
|
|
|
|
@pytest.fixture(scope='class')
|
|
@pytest.mark.tier1
|
|
@pytest.mark.needs_ipaapi
|
|
def class_cache(request):
|
|
cache = ipaldap.LDAPCache(api.env.ldap_uri)
|
|
hits_and_misses(cache, 0, 0)
|
|
|
|
request.cls.cache = cache
|
|
request.cls.userdn = DN(
|
|
'uid=testuser', api.env.container_user, api.env.basedn
|
|
)
|
|
|
|
rpcclient = api.Backend.rpcclient
|
|
was_connected = rpcclient.isconnected()
|
|
|
|
if not was_connected:
|
|
rpcclient.connect()
|
|
|
|
api.Command.user_add('testuser', givenname=u'Test', sn=u'User')
|
|
|
|
yield
|
|
|
|
try:
|
|
api.Command.user_del('testuser')
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
if not was_connected:
|
|
rpcclient.disconnect()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@pytest.mark.usefixtures('class_cache')
|
|
@pytest.mark.skip_ipaclient_unittest
|
|
@pytest.mark.needs_ipaapi
|
|
class TestLDAPCache:
|
|
|
|
def test_one(self):
|
|
dn = DN('uid=notfound', api.env.container_user, api.env.basedn)
|
|
try:
|
|
self.cache.get_entry(dn)
|
|
except errors.EmptyResult:
|
|
pass
|
|
|
|
assert dn in self.cache.cache
|
|
exc = self.cache.cache[dn].exception
|
|
assert isinstance(exc, errors.EmptyResult)
|
|
|
|
hits_and_misses(self.cache, 0, 1)
|
|
|
|
def test_retrieve_exception(self):
|
|
dn = DN('uid=notfound', api.env.container_user, api.env.basedn)
|
|
try:
|
|
self.cache.get_entry(dn)
|
|
except errors.EmptyResult:
|
|
pass
|
|
assert dn in self.cache.cache
|
|
exc = self.cache.cache[dn].exception
|
|
assert isinstance(exc, errors.EmptyResult)
|
|
hits_and_misses(self.cache, 1, 1)
|
|
|
|
def test_get_testuser(self):
|
|
assert self.userdn not in self.cache.cache
|
|
self.cache.get_entry(self.userdn)
|
|
assert self.userdn in self.cache.cache
|
|
hits_and_misses(self.cache, 1, 2)
|
|
|
|
def test_get_testuser_again(self):
|
|
assert self.userdn in self.cache.cache
|
|
|
|
# get the user again with with no attributes requested (so all)
|
|
self.cache.get_entry(self.userdn)
|
|
hits_and_misses(self.cache, 2, 2)
|
|
|
|
# Now get the user with a subset of cached attributes
|
|
entry = self.cache.get_entry(self.userdn, ('givenname', 'sn', 'cn'))
|
|
# Make sure we only got three attributes, as requested
|
|
assert len(entry.items()) == 3
|
|
hits_and_misses(self.cache, 3, 2)
|
|
|
|
def test_update_testuser(self):
|
|
entry = self.cache.cache[self.userdn].entry
|
|
try:
|
|
self.cache.update_entry(entry)
|
|
except errors.EmptyModlist:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 2)
|
|
|
|
def test_modify_testuser(self):
|
|
self.cache.get_entry(self.userdn)
|
|
entry = self.cache.cache[self.userdn].entry
|
|
try:
|
|
self.cache.modify_s(entry.dn, [])
|
|
except errors.EmptyModlist:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 3)
|
|
|
|
def test_delete_entry(self):
|
|
# We don't care if this is successful or not, just that the
|
|
# cache doesn't retain the deleted entry
|
|
try:
|
|
self.cache.delete_entry(self.userdn)
|
|
except Exception:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 3)
|
|
|
|
def test_add_entry(self):
|
|
# We don't care if this is successful or not, just that the
|
|
# cache doesn't get the added entry
|
|
try:
|
|
self.cache.add_entry(self.userdn)
|
|
except Exception:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 3)
|
|
|
|
def test_clear_cache(self):
|
|
self.cache.clear_cache()
|
|
hits_and_misses(self.cache, 0, 0)
|