diff --git a/scripts/irods/test/test_all_rules.py b/scripts/irods/test/test_all_rules.py index c66361483f..5c316073c8 100644 --- a/scripts/irods/test/test_all_rules.py +++ b/scripts/irods/test/test_all_rules.py @@ -1502,34 +1502,54 @@ def test_msiRemoveUserFromGroup__issue_7165(self): # fed into the rules. This is necessary for testing unqualified usernames and qualified # usernames. rule_map = { - 'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(f''' - msiRemoveUserFromGroup('{group}', '{self.user0.username}', '{{}}') + 'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(''' + issue_7165 {{ + msiRemoveUserFromGroup('{0}', '{1}', '{2}') + }} + + INPUT null + OUTPUT ruleExecOut '''), - 'irods_rule_engine_plugin-python': textwrap.dedent(f''' - def issue_7165(_, callback, _): - callback.msiRemoveUserFromGroup('{group}', '{self.user0.username}', '{{}}') + 'irods_rule_engine_plugin-python': textwrap.dedent(''' + def main(rule_args, callback, rei): + try: + callback.msiRemoveUserFromGroup('{0}', '{1}', '{2}') + except Exception as e: + callback.writeLine('stderr', str(e)) + + INPUT null + OUTPUT ruleExecOut '''), } + plugin = IrodsConfig().default_rule_engine_plugin + rule_file_path = os.path.join(self.admin.local_session_dir, 'test_msiRemoveUserFromGroup__issue_7165.r') + try: + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, self.user0.username, '')) + # Create a new group and add a user to it. self.admin.assert_icommand(['iadmin', 'mkgroup', group]) self.admin.assert_icommand(['iadmin', 'atg', group, self.user0.username]) self.admin.assert_icommand(['iadmin', 'lg', group], 'STDOUT', [self.user0.qualified_username]) # Remove the user from the group using the microservice. - rep_instance = plugin_name + '-instance' - self.admin.assert_icommand(['irule', '-r', rep_instance, rule_map[plugin_name].format(''), 'null', 'ruleExecOut']) + rep_instance = plugin + '-instance' + self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path]) # Show the user is no longer a member of the group. out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group]) self.assertEqual(ec, 0) self.assertNotIn(self.user0.qualified_username, out.strip()) + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, self.user0.username, self.user0.zone_name)) + # Add the user back to the group and remove them again, except this time, include the user's zone. self.admin.assert_icommand(['iadmin', 'atg', group, self.user0.username]) self.admin.assert_icommand(['iadmin', 'lg', group], 'STDOUT', [self.user0.qualified_username]) - self.admin.assert_icommand(['irule', '-r', rep_instance, rule_map[plugin_name].format(self.user0.zone_name), 'null', 'ruleExecOut']) + self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path]) # Show the user was removed from the group successfully. out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group]) @@ -1555,15 +1575,30 @@ def test_msiRemoveUserFromGroup_supports_remote_users__issue_7165(self): # This map contains rule code key'd off of the the REP type. rule_map = { - 'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(f''' - msiRemoveUserFromGroup('{group}', '{{}}', '{{}}') + 'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(''' + issue_7165 {{ + msiRemoveUserFromGroup('{0}', '{1}', '{2}') + }} + + INPUT null + OUTPUT ruleExecOut '''), - 'irods_rule_engine_plugin-python': textwrap.dedent(f''' - def issue_7165(_, callback, _): - callback.msiRemoveUserFromGroup('{group}', '{{}}', '{{}}') + 'irods_rule_engine_plugin-python': textwrap.dedent(''' + def main(rule_args, callback, rei): + try: + callback.msiRemoveUserFromGroup('{0}', '{1}', '{2}') + except Exception as e: + callback.writeLine('stderr', str(e)) + + INPUT null + OUTPUT ruleExecOut '''), } + plugin = IrodsConfig().default_rule_engine_plugin + rule_file_path = os.path.join( + self.admin.local_session_dir, 'test_msiRemoveUserFromGroup_supports_remote_users__issue_7165.r') + try: # Create the remote zone and user. self.admin.assert_icommand(['iadmin', 'mkzone', remote_zone, 'remote', 'localhost:1247']) @@ -1579,8 +1614,9 @@ def issue_7165(_, callback, _): # Remove the local user from the group using the microservice. # Notice the zone is empty. This implies the local zone. rep_instance = plugin_name + '-instance' - rule = rule_map[plugin_name].format(self.user0.username, '') - self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut']) + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, self.user0.username, '')) + self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path]) # Show the local user is no longer a member of the group, but the remote user is. out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group]) @@ -1593,8 +1629,9 @@ def issue_7165(_, callback, _): # assume the user to remove originated in the local zone. self.admin.assert_icommand(['iadmin', 'atg', group, self.user0.username]) self.admin.assert_icommand(['iadmin', 'lg', group], 'STDOUT', [self.user0.qualified_username, remote_user_qualified_name]) - rule = rule_map[plugin_name].format(remote_user, remote_zone) - self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut']) + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, remote_user, remote_zone)) + self.admin.assert_icommand(['irule', '-r', rep_instance, '-F', rule_file_path]) # Show the remote user was removed from the group successfully. out, _, ec = self.admin.run_icommand(['iadmin', 'lg', group]) @@ -1611,39 +1648,66 @@ def test_msiRemoveUserFromGroup_correctly_handles_incorrect_arguments__issue_716 # This map contains rule code key'd off of the the REP type. rule_map = { 'irods_rule_engine_plugin-irods_rule_language': textwrap.dedent(''' - msiRemoveUserFromGroup('{}', '{}', '{}') + issue_7165 {{ + msiRemoveUserFromGroup('{0}', '{1}', '{2}') + }} + + INPUT null + OUTPUT ruleExecOut '''), 'irods_rule_engine_plugin-python': textwrap.dedent(''' - def issue_7165(_, callback, _): - callback.msiRemoveUserFromGroup('{}', '{}', '{}') + def main(rule_args, callback, rei): + try: + callback.msiRemoveUserFromGroup('{0}', '{1}', '{2}') + except Exception as e: + callback.writeLine('stderr', str(e)) + + INPUT null + OUTPUT ruleExecOut '''), } + plugin = IrodsConfig().default_rule_engine_plugin + rule_file_path = os.path.join( + self.admin.local_session_dir, + 'test_msiRemoveUserFromGroup_correctly_handles_incorrect_arguments__issue_7165.r') + rep_instance = plugin_name + '-instance' group = 'public' - # Show the MSI returns an error when the user is not a rodsadmin. - rule = rule_map[plugin_name].format(group, self.user0.username, self.user0.zone_name) # The returned error string changes depending on where the the rule is running. The API being invoked by the # microservice does not perform the permission check until after it has redirected to the appropriate server # so the API privilege level will trip for the general admin API before getting to the database plugin. - if test.settings.TOPOLOGY_FROM_RESOURCE_SERVER: + # The returned error string also changes with the python rule engine plugin because python rules can only be + # executed by rodsadmins, so it does not execute the rule at all due to the privilege level being insufficient + # for executing the rule itself rather than the microservice. + if test.settings.TOPOLOGY_FROM_RESOURCE_SERVER or plugin == 'irods_rule_engine_plugin-python': expected_error_str = '-13000 SYS_NO_API_PRIV' else: expected_error_str = '-830000 CAT_INSUFFICIENT_PRIVILEGE_LEVEL' - self.user0.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', [expected_error_str]) + # Show the MSI returns an error when the user is not a rodsadmin. + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, self.user0.username, self.user0.zone_name)) + self.user0.assert_icommand( + ['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR', [expected_error_str]) # Show the MSI returns an error when the group does not exist. - rule = rule_map[plugin_name].format('does_not_exist', self.user0.username, self.user0.zone_name) - self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', ['-829000 CAT_INVALID_GROUP']) + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format('does_not_exist', self.user0.username, self.user0.zone_name)) + self.admin.assert_icommand( + ['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR_MULTILINE', ['-829000', 'CAT_INVALID_GROUP']) # Show the MSI returns an error when the user does not exist. - rule = rule_map[plugin_name].format(group, 'does_not_exist', self.user0.zone_name) - self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', ['-827000 CAT_INVALID_USER']) + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, 'does_not_exist', self.user0.zone_name)) + self.admin.assert_icommand( + ['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR_MULTILINE', ['-827000', 'CAT_INVALID_USER']) # Show the MSI returns an error when the zone is incorrect. - rule = rule_map[plugin_name].format(group, self.user0.username, 'does_not_exist') - self.admin.assert_icommand(['irule', '-r', rep_instance, rule, 'null', 'ruleExecOut'], 'STDERR', ['-827000 CAT_INVALID_USER']) + with open(rule_file_path, 'w') as rule_file: + rule_file.write(rule_map[plugin].format(group, self.user0.username, 'does_not_exist')) + self.admin.assert_icommand( + ['irule', '-r', rep_instance, '-F', rule_file_path], 'STDERR_MULTILINE', ['-827000', 'CAT_INVALID_USER']) def test_msiRemoveUserFromGroup_returns_error_when_attempting_to_remove_user_from_group_which_they_are_not_a_member_of__issue_7165(self): # The name of the group that will be created.