Skip to content

Commit

Permalink
[irods#7453,irods#7454,irods#7457] Fix msiRemoveUserFromGroup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alanking committed Mar 27, 2024
1 parent 70efc21 commit b098e1e
Showing 1 changed file with 94 additions and 30 deletions.
124 changes: 94 additions & 30 deletions scripts/irods/test/test_all_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1502,34 +1502,54 @@ def test_msiRemoveUserFromGroup__issue_7165(self):
# fed into the rules. This is necessary for testing unqualified usernames and qualified
# usernames.
rule_map = {
'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(f'''
msiRemoveUserFromGroup('{group}', '{self.user0.username}', '{{}}')
'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent('''
issue_7165 {{
msiRemoveUserFromGroup('{0}', '{1}', '{2}')
}}
INPUT null
OUTPUT ruleExecOut
'''),
'irods_rule_engine_plugin-python': textwrap.dedent(f'''
def issue_7165(_, callback, _):
callback.msiRemoveUserFromGroup('{group}', '{self.user0.username}', '{{}}')
'irods_rule_engine_plugin-python': textwrap.dedent('''
def main(rule_args, callback, rei):
try:
callback.msiRemoveUserFromGroup('{0}', '{1}', '{2}')
except Exception as e:
callback.writeLine('stderr', str(e))
INPUT null
OUTPUT ruleExecOut
'''),
}

plugin = IrodsConfig().default_rule_engine_plugin
rule_file_path = os.path.join(self.admin.local_session_dir, 'test_msiRemoveUserFromGroup__issue_7165.r')

try:
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, self.user0.username, ''))

# Create a new group and add a user to it.
self.admin.assert_icommand(['iadmin', 'mkgroup', group])
self.admin.assert_icommand(['iadmin', 'atg', group, self.user0.username])
self.admin.assert_icommand(['iadmin', 'lg', group], 'STDOUT', [self.user0.qualified_username])

# Remove the user from the group using the microservice.
rep_instance = plugin_name + '-instance'
self.admin.assert_icommand(['irule', '-r', rep_instance, rule_map[plugin_name].format(''), 'null', 'ruleExecOut'])
rep_instance = plugin + '-instance'
self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path])

# Show the user is no longer a member of the group.
out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group])
self.assertEqual(ec, 0)
self.assertNotIn(self.user0.qualified_username, out.strip())

with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, self.user0.username, self.user0.zone_name))

# Add the user back to the group and remove them again, except this time, include the user's zone.
self.admin.assert_icommand(['iadmin', 'atg', group, self.user0.username])
self.admin.assert_icommand(['iadmin', 'lg', group], 'STDOUT', [self.user0.qualified_username])
self.admin.assert_icommand(['irule', '-r', rep_instance, rule_map[plugin_name].format(self.user0.zone_name), 'null', 'ruleExecOut'])
self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path])

# Show the user was removed from the group successfully.
out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group])
Expand All @@ -1555,15 +1575,30 @@ def test_msiRemoveUserFromGroup_supports_remote_users__issue_7165(self):

# This map contains rule code key'd off of the the REP type.
rule_map = {
'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(f'''
msiRemoveUserFromGroup('{group}', '{{}}', '{{}}')
'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent('''
issue_7165 {{
msiRemoveUserFromGroup('{0}', '{1}', '{2}')
}}
INPUT null
OUTPUT ruleExecOut
'''),
'irods_rule_engine_plugin-python': textwrap.dedent(f'''
def issue_7165(_, callback, _):
callback.msiRemoveUserFromGroup('{group}', '{{}}', '{{}}')
'irods_rule_engine_plugin-python': textwrap.dedent('''
def main(rule_args, callback, rei):
try:
callback.msiRemoveUserFromGroup('{0}', '{1}', '{2}')
except Exception as e:
callback.writeLine('stderr', str(e))
INPUT null
OUTPUT ruleExecOut
'''),
}

plugin = IrodsConfig().default_rule_engine_plugin
rule_file_path = os.path.join(
self.admin.local_session_dir, 'test_msiRemoveUserFromGroup_supports_remote_users__issue_7165.r')

try:
# Create the remote zone and user.
self.admin.assert_icommand(['iadmin', 'mkzone', remote_zone, 'remote', 'localhost:1247'])
Expand All @@ -1579,8 +1614,9 @@ def issue_7165(_, callback, _):
# Remove the local user from the group using the microservice.
# Notice the zone is empty. This implies the local zone.
rep_instance = plugin_name + '-instance'
rule = rule_map[plugin_name].format(self.user0.username, '')
self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'])
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, self.user0.username, ''))
self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path])

# Show the local user is no longer a member of the group, but the remote user is.
out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group])
Expand All @@ -1593,8 +1629,9 @@ def issue_7165(_, callback, _):
# assume the user to remove originated in the local zone.
self.admin.assert_icommand(['iadmin', 'atg', group, self.user0.username])
self.admin.assert_icommand(['iadmin', 'lg', group], 'STDOUT', [self.user0.qualified_username, remote_user_qualified_name])
rule = rule_map[plugin_name].format(remote_user, remote_zone)
self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'])
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, remote_user, remote_zone))
self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path])

# Show the remote user was removed from the group successfully.
out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group])
Expand All @@ -1611,39 +1648,66 @@ def test_msiRemoveUserFromGroup_correctly_handles_incorrect_arguments__issue_716
# This map contains rule code key'd off of the the REP type.
rule_map = {
'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent('''
msiRemoveUserFromGroup('{}', '{}', '{}')
issue_7165 {{
msiRemoveUserFromGroup('{0}', '{1}', '{2}')
}}
INPUT null
OUTPUT ruleExecOut
'''),
'irods_rule_engine_plugin-python': textwrap.dedent('''
def issue_7165(_, callback, _):
callback.msiRemoveUserFromGroup('{}', '{}', '{}')
def main(rule_args, callback, rei):
try:
callback.msiRemoveUserFromGroup('{0}', '{1}', '{2}')
except Exception as e:
callback.writeLine('stderr', str(e))
INPUT null
OUTPUT ruleExecOut
'''),
}

plugin = IrodsConfig().default_rule_engine_plugin
rule_file_path = os.path.join(
self.admin.local_session_dir,
'test_msiRemoveUserFromGroup_correctly_handles_incorrect_arguments__issue_7165.r')

rep_instance = plugin_name + '-instance'
group = 'public'

# Show the MSI returns an error when the user is not a rodsadmin.
rule = rule_map[plugin_name].format(group, self.user0.username, self.user0.zone_name)
# The returned error string changes depending on where the the rule is running. The API being invoked by the
# microservice does not perform the permission check until after it has redirected to the appropriate server
# so the API privilege level will trip for the general admin API before getting to the database plugin.
if test.settings.TOPOLOGY_FROM_RESOURCE_SERVER:
# The returned error string also changes with the python rule engine plugin because python rules can only be
# executed by rodsadmins, so it does not execute the rule at all due to the privilege level being insufficient
# for executing the rule itself rather than the microservice.
if test.settings.TOPOLOGY_FROM_RESOURCE_SERVER or plugin == 'irods_rule_engine_plugin-python':
expected_error_str = '-13000 SYS_NO_API_PRIV'
else:
expected_error_str = '-830000 CAT_INSUFFICIENT_PRIVILEGE_LEVEL'
self.user0.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', [expected_error_str])
# Show the MSI returns an error when the user is not a rodsadmin.
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, self.user0.username, self.user0.zone_name))
self.user0.assert_icommand(
['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR', [expected_error_str])

# Show the MSI returns an error when the group does not exist.
rule = rule_map[plugin_name].format('does_not_exist', self.user0.username, self.user0.zone_name)
self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', ['-829000 CAT_INVALID_GROUP'])
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format('does_not_exist', self.user0.username, self.user0.zone_name))
self.admin.assert_icommand(
['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR_MULTILINE', ['-829000', 'CAT_INVALID_GROUP'])

# Show the MSI returns an error when the user does not exist.
rule = rule_map[plugin_name].format(group, 'does_not_exist', self.user0.zone_name)
self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', ['-827000 CAT_INVALID_USER'])
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, 'does_not_exist', self.user0.zone_name))
self.admin.assert_icommand(
['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR_MULTILINE', ['-827000', 'CAT_INVALID_USER'])

# Show the MSI returns an error when the zone is incorrect.
rule = rule_map[plugin_name].format(group, self.user0.username, 'does_not_exist')
self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', ['-827000 CAT_INVALID_USER'])
with open(rule_file_path, 'w') as rule_file:
rule_file.write(rule_map[plugin].format(group, self.user0.username, 'does_not_exist'))
self.admin.assert_icommand(
['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR_MULTILINE', ['-827000', 'CAT_INVALID_USER'])

def test_msiRemoveUserFromGroup_returns_error_when_attempting_to_remove_user_from_group_which_they_are_not_a_member_of__issue_7165(self):
# The name of the group that will be created.
Expand Down

0 comments on commit b098e1e

Please sign in to comment.