Skip to content

Commit

Permalink
tests: housekeeping - test_cache.py
Browse files Browse the repository at this point in the history
housekeeping, the following is looked at and may have been done:

* fixed typos and standardized formatting
* renamed test cases to improve the clarity of what the test does
* improved docstring language, setup, steps and expected results
* synced code with the docstring order
* removed necessary configuration relevant to the test
* added pytest.mark.importance to test cases

noteable changes:

* created test_tools.py to move sss_cache test to
* renamed test_sss_cache.py to test_cache.py
* renamed test_memory_cache.py to test_memcache.py
  • Loading branch information
Dan Lavu committed Jul 11, 2024
1 parent c19cac2 commit 6ed7dd4
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 176 deletions.
102 changes: 49 additions & 53 deletions src/tests/multihost/alltests/test_sssctl_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
:upstream: yes
:status: approved
"""

from __future__ import print_function
import pytest
from sssd.testlib.common.utils import sssdTools
Expand All @@ -17,17 +18,18 @@ def client_version(multihost):
return True


@pytest.mark.usefixtures('default_sssd')
@pytest.mark.usefixtures("default_sssd")
@pytest.mark.sssctl
class Testsssctl(object):
"""
This is test case class for sssctl suite
"""
@pytest.mark.converted('test_sssctl.py', 'test_sssctl__user_show_cache_expiration_time')

@pytest.mark.converted(
"test_tools.py", "test_sssctl__user_show_cache_expiration_time"
)
@pytest.mark.tier1_2
def test_0001_bz1640576(self, multihost,
backupsssdconf,
localusers):
def test_0001_bz1640576(self, multihost, backupsssdconf, localusers):
"""
:title: IDM-SSSD-TC: sssctl: sssctl reports incorrect
information about local user's cache entry expiration time
Expand All @@ -37,27 +39,22 @@ def test_0001_bz1640576(self, multihost,
pytest.skip("Files Provider support isn't available, skipping")
users = localusers
tools = sssdTools(multihost.client[0])
multihost.client[0].service_sssd('stop')
tools.remove_sss_cache('/var/lib/sss/db')
tools.remove_sss_cache('/var/log/sssd')
sssd_param = {'domains': 'local'}
tools.sssd_conf('sssd', sssd_param)
param = {'id_provider': 'files',
'passwd_files': '/etc/passwd'}
tools.sssd_conf('domain/local', param)
multihost.client[0].service_sssd('start')
multihost.client[0].service_sssd("stop")
tools.remove_sss_cache("/var/lib/sss/db")
tools.remove_sss_cache("/var/log/sssd")
sssd_param = {"domains": "local"}
tools.sssd_conf("sssd", sssd_param)
param = {"id_provider": "files", "passwd_files": "/etc/passwd"}
tools.sssd_conf("domain/local", param)
multihost.client[0].service_sssd("start")
for user in users.keys():
sssctl_cmd = 'sssctl user-show %s' % user
cmd = multihost.client[0].run_command(sssctl_cmd,
raiseonerr=False)
assert 'Cache entry expiration time: Never'\
in cmd.stdout_text
sssctl_cmd = "sssctl user-show %s" % user
cmd = multihost.client[0].run_command(sssctl_cmd, raiseonerr=False)
assert "Cache entry expiration time: Never" in cmd.stdout_text

@pytest.mark.converted('test_sssctl.py', 'test_sssctl__handle_implicit_domain')
@pytest.mark.converted("test_sssctl.py", "test_sssctl__handle_implicit_domain")
@pytest.mark.tier1_2
def test_0002_bz1599207(self, multihost,
backupsssdconf,
localusers):
def test_0002_bz1599207(self, multihost, backupsssdconf, localusers):
"""
:title: IDM-SSSD-TC: sssctl: sssd tools do not handle the implicit
domain
Expand All @@ -67,28 +64,27 @@ def test_0002_bz1599207(self, multihost,
pytest.skip("Files Provider support isn't available, skipping")
users = localusers
tools = sssdTools(multihost.client[0])
multihost.client[0].service_sssd('stop')
tools.remove_sss_cache('/var/lib/sss/db')
tools.remove_sss_cache('/var/log/sssd')
tools.sssd_conf("sssd",
{'enable_files_domain': 'true'},
action='update')
multihost.client[0].service_sssd('start')
multihost.client[0].service_sssd("stop")
tools.remove_sss_cache("/var/lib/sss/db")
tools.remove_sss_cache("/var/log/sssd")
tools.sssd_conf("sssd", {"enable_files_domain": "true"}, action="update")
multihost.client[0].service_sssd("start")
for user in users.keys():
cmd = multihost.client[0].run_command('getent'
' -s sss'
' passwd %s '
'&& sssctl '
'user-show %s' %
(user, user),
raiseonerr=False)
assert 'Cache entry creation date' in \
cmd.stdout_text and cmd.returncode == 0
cmd = multihost.client[0].run_command(
"getent"
" -s sss"
" passwd %s "
"&& sssctl "
"user-show %s" % (user, user),
raiseonerr=False,
)
assert (
"Cache entry creation date" in cmd.stdout_text and cmd.returncode == 0
)

@pytest.mark.converted('test_sss_cache.py', 'test_sss_cache__cache_expire_message')
@pytest.mark.converted("test_cache.py", "test_sss_cache__cache_expire_message")
@pytest.mark.tier1_2
def test_0003_bz1661182(self, multihost,
backupsssdconf):
def test_0003_bz1661182(self, multihost, backupsssdconf):
"""
:title: sss_cache prints spurious error messages
when invoked from shadow-utils on package install
Expand All @@ -111,24 +107,24 @@ def test_0003_bz1661182(self, multihost,
if not multihost.client[0].detect_files_provider():
pytest.skip("Files Provider support isn't available, skipping")
tools = sssdTools(multihost.client[0])
ldap_params = {'enable_files_domain': 'false'}
tools.sssd_conf('sssd', ldap_params)
ldap_params = {"enable_files_domain": "false"}
tools.sssd_conf("sssd", ldap_params)
with pytest.raises(SSSDException):
multihost.client[0].service_sssd('restart')
multihost.client[0].service_sssd("restart")
ps_cmd = "> /var/log/sssd/sssd.log"
multihost.client[0].run_command(ps_cmd)
ps_cmd = "useradd user1_test"
multihost.client[0].run_command(ps_cmd, raiseonerr=False)
ps_cmd = "usermod -a -G wheel user1_test"
cmd = multihost.client[0].run_command(ps_cmd)
assert 'No domains configured, fatal error!' \
not in cmd.stdout_text
assert "No domains configured, fatal error!" not in cmd.stdout_text
ps_cmd = "userdel user1_test"
multihost.client[0].run_command(ps_cmd)
for ps_cmd in ('sss_cache -U',
'sss_cache -G',
'sss_cache -E',
'sss_cache -u non-existinguser'):
for ps_cmd in (
"sss_cache -U",
"sss_cache -G",
"sss_cache -E",
"sss_cache -u non-existinguser",
):
cmd = multihost.client[0].run_command(ps_cmd)
assert 'No domains configured, fatal error!' \
not in cmd.stdout_text
assert "No domains configured, fatal error!" not in cmd.stdout_text
Original file line number Diff line number Diff line change
@@ -1,72 +1,43 @@
"""
sss_cache tests.
SSSD Cache Tests.
:requirement: IDM-SSSD-REQ: Status utility
Tests pertaining SSSD caches, the following types are tested and some will be in other python files.
* Local cache (LDB)
* Negative cache (ncache)
* In-memory cache (memcache): test_memcache.py
:requirement: Cache
"""

from __future__ import annotations

import time

import pytest
from pytest_mh.ssh import SSHProcessError
from sssd_test_framework.roles.client import Client
from sssd_test_framework.roles.generic import GenericProvider
from sssd_test_framework.topology import KnownTopology, KnownTopologyGroup


@pytest.mark.ticket(bz=1661182)
@pytest.mark.topology(KnownTopology.Client)
def test_sss_cache__cache_expire_message(client: Client):
"""
:title: sss_cache do not print fake error messages
:setup:
1. Configure SSSD without any domain
2. Set to sssd section "enable_files_domain" to "false"
3. Create local user
:steps:
1. Restart SSSD
2. Modify existing local user
3. Expire cache with specific options
:expectedresults:
1. Error is raised, SSSD is not running
2. Modified successfully
3. Output did not contain wrong messages
:customerscenario: True
"""
client.sssd.sssd["enable_files_domain"] = "false"
client.local.user("user1").add()

with pytest.raises(SSHProcessError):
client.sssd.restart()

res = client.host.ssh.run("usermod -a -G wheel user1")
assert "No domains configured, fatal error!" not in res.stdout

for cmd in ("sss_cache -U", "sss_cache -G", "sss_cache -E", "sss_cache --user=nonexisting"):
res = client.host.ssh.run(cmd)
assert "No domains configured, fatal error!" not in res.stdout


@pytest.mark.importance("critical")
@pytest.mark.cache
@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_sss_cache__ldb_database_is_refreshed_as_configured(client: Client, provider: GenericProvider):
def test_cache__is_refreshed_as_configured(client: Client, provider: GenericProvider):
"""
:title: Ensuring ldb cache data is refreshed correctly
:title: Ensuring LDB cache refreshes at configured intervals
:setup:
1. Create provider user
2. Create provider group
3. Create provider netgroup
4. Configure SSSD and set 'entry_cache_timeout' to 1 and 'refresh_expired_interval' to 2
1. Create user
2. Create group
3. Create netgroup
4. Configure SSSD and set 'entry_cache_timeout to 1' and 'refresh_expired_interval to 2'
5. Restart SSSD
6. Populate the cache by performing 'getent' on the user, group and netgroup
6. Lookup user, group and netgroup
:steps:
1. Search for user, group and netgroup lastUpdate and dataExpireTimestamp in the ldb database
2. Wait 5 seconds and search for all timestamp in the cache again
1. Search for objects lastUpdate and dataExpireTimestamp in ldb database
2. Wait 5 seconds and repeat search
:expectedresults:
1. The 'dataExpireTimestamp' value equals the 'lastUpdate + entry_cache_timeout' value
2. User, group and netgroup 'lastUpdate' timestamp value has been refreshed
2. Objects 'lastUpdate' timestamp value has been refreshed
:customerscenario: False
"""
user = provider.user("test_user").add()
Expand Down Expand Up @@ -101,7 +72,9 @@ def test_sss_cache__ldb_database_is_refreshed_as_configured(client: Client, prov
expire_time = expire_time + [(int(y[1][0]))]

for m, n in enumerate(last_update):
assert last_update[m] + entry_cache_timeout == expire_time[m]
assert (
last_update[m] + entry_cache_timeout == expire_time[m]
), f"{expire_time[m]} != {last_update[m]} + {entry_cache_timeout}"

time.sleep(5)

Expand All @@ -110,31 +83,30 @@ def test_sss_cache__ldb_database_is_refreshed_as_configured(client: Client, prov
for k, v in result.items():
for y in v.items():
if y[0] == "lastUpdate":
assert last_update[s] <= (int(y[1][0]))
assert last_update[s] <= (int(y[1][0])), f"{s} lastUpdate value is greater than expected!"


@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_sss_cache__check_timestamp_value_in_ldb(client: Client, provider: GenericProvider):
def test_cache__search_for_user_in_ldb_databases(client: Client, provider: GenericProvider):
"""
:title: Verify the existence of timestamp cache and use lsbsearch on those files
:title: Search for user in the following ldb databases, cache_*.ldb and timestamp_*.ldb
:setup:
1. Add user
1. Create user
2. Start SSSD
:steps:
1. Execute getent passwd to fetch user details
2. Check if timestamps cache file exists
3. Get user information using ldbsearch on cache_test.ldb
4. Get user timestamp information using ldbsearch on timestamps_test.ldb
1. Lookup user
2. Check cache
3. Lookup user in cache ldb database
4. Lookup user in timestamp ldb database
:expectedresults:
1. User details should be successfully fetched
2. Cache file should be present
3. User information were successfully fetched
4. User information were successfully fetched
1. User is found
2. Cache file exists
3. User found
4. User found
:customerscenario: False
"""
provider.user("user1").add()
client.sssd.start()

client.tools.getent.passwd("user1")
cache = "/var/lib/sss/db/cache_test.ldb"
timestamps = "/var/lib/sss/db/timestamps_test.ldb"
Expand All @@ -147,22 +119,20 @@ def test_sss_cache__check_timestamp_value_in_ldb(client: Client, provider: Gener


@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_sss_cache__check_timestamp_value_in_ldb_when_fully_qualified_names_enabled(
client: Client, provider: GenericProvider
):
def test_cache__search_for_user_using_fully_qualified_name_in_ldb_databases(client: Client, provider: GenericProvider):
"""
:title: Set use fully qualified names to true and verify cache updates
:title: Search for user using fully qualified name in the following ldb databases, cache_*.ldb and timestamp_*.ldb
:setup:
1. Add user
2. Set use_fully_qualified_names to True in the sssd.conf
3. Start SSSD
4. Execute getent passwd user1@test
1. Create user
2. Start SSSD
:steps:
1. Get user information using ldbsearch on cache_test.ldb
2. Get user timestamp information using ldbsearch on timestamps_test.ldb
1. Lookup user
2. Lookup user in cache ldb database
3. Lookup user in timestamp ldb database
:expectedresults:
1. User information were successfully fetched
2. User information were successfully fetched
1. User found
2. User found
3. User found
:customerscenario: False
"""
provider.user("user1").add()
Expand All @@ -181,40 +151,42 @@ def test_sss_cache__check_timestamp_value_in_ldb_when_fully_qualified_names_enab


@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_sss_cache__check_ldb_updates_when_user_is_deleted_and_modified(client: Client, provider: GenericProvider):
def test_cache__check_ldb_database_for_latest_user_changes_when_modified_and_deleted(
client: Client, provider: GenericProvider
):
"""
:title: Modify user attribute and verify cache updates
:title: Check ldb database for latest user changes when modified and deleted
:setup:
1. Add users
3. Start SSSD
4. Execute getent passwd to fetch cache
5. Expire whole cache
6. Modify and delete user attribute
7. Execute getent passwd again
1. Add users 'user-modify' and 'user-delete'
2. Start SSSD
3. Lookup users
:steps:
1. Try to login
2. Check that modified user was modified
1. Login as users
2. Modify 'user-modify' shell and delete 'user-delete' and clear cache
3. Login as users
4. Lookup user 'user-modify'
:expectedresults:
1. Modified user can login, Deleted usec cannot login
2. Modified user has correct attributes
1. Users logged in
2. User is modified and user is deleted
3. User 'user-modify' logged in
4. User's shell was updated
:customerscenario: False
"""
provider.user("user-modify").add(shell="/bin/bash")
provider.user("user-delete").add(shell="/bin/bash")
client.sssd.start()
client.tools.getent.passwd("user-modify")
client.tools.getent.passwd("user-delete")
client.sssctl.cache_expire(everything=True)

assert client.auth.ssh.password("user-modify", "Secret123"), "Login failed!"
assert client.auth.ssh.password("user-delete", "Secret123"), "Login failed!"

provider.user("user-delete").delete()
provider.user("user-modify").modify(shell="/bin/sh")

client.tools.getent.passwd("user-delete")
client.tools.getent.passwd("user-modify")
client.sssctl.cache_expire(everything=True)

assert client.auth.ssh.password("user-modify", "Secret123")
assert not client.auth.ssh.password("user-delete", "Secret123")
assert client.auth.ssh.password("user-modify", "Secret123"), "Login failed!"
assert not client.auth.ssh.password("user-delete", "Secret123"), "Login successful!"

modify = client.tools.getent.passwd("user-modify")
assert modify is not None
assert modify.shell == "/bin/sh"
assert client.tools.getent.passwd("user-modify").shell == "/bin/sh", "User shell did not update!"
Loading

0 comments on commit 6ed7dd4

Please sign in to comment.