-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support for AWS Credentials profiles and enhanced Ctrl-C handling #8
base: master
Are you sure you want to change the base?
Changes from 1 commit
a3bd618
8cd7a3b
79b8a4b
9b4cfc2
21cbc5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ | |
from enumerate_iam.utils.json_utils import json_encoder | ||
from enumerate_iam.bruteforce_tests import BRUTEFORCE_TESTS | ||
|
||
MAX_THREADS = 1 | ||
MAX_THREADS = 25 | ||
CLIENT_POOL = {} | ||
MANAGER = Manager() | ||
STOP_SIGNAL = MANAGER.Value('i', 0) | ||
|
@@ -72,7 +72,6 @@ def enumerate_using_bruteforce(access_key, secret_key, session_token, region, ti | |
logger = logging.getLogger() | ||
logger.info('Attempting common-service describe / list brute force.') | ||
|
||
# Ignore SIGINT signals so that child processes inherit SIGINT handler | ||
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
pool = ThreadPool(MAX_THREADS) | ||
signal.signal(signal.SIGINT, original_sigint_handler) | ||
|
@@ -184,9 +183,9 @@ def check_one_permission(arg_tuple): | |
|
||
logger.debug('Testing %s.%s() in region %s' % (service_name, operation_name, region)) | ||
|
||
if stop_signal.value == 1: | ||
return | ||
try: | ||
if stop_signal.value == 1: | ||
return | ||
action_response = action_function() | ||
except (botocore.exceptions.ClientError, | ||
botocore.exceptions.EndpointConnectionError, | ||
|
@@ -325,14 +324,17 @@ def enumerate_role(iam_client, output): | |
for policy in role_policies['AttachedPolicies']: | ||
logger.info('-- Policy "%s" (%s)', policy['PolicyName'], policy['PolicyArn']) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the others changed from I'm not sure why you changed some of the calls from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So right, I've seen that running the tool by default on verbose output is fine to the general UX, but as soon as iam's get-account-authorization-details results gets dumped, they will generate such a big JSON that would effectively make reading program's output cumbersome. So I started using If we wish to have the user know what's going on during program's non-verbose execution, then we shall go with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Totally agree with your decision, lets keep it like that: Just one more thing: for the places where you removed the
Could still be |
||
|
||
get_policy = iam.get_role_policy(PolicyName=policy['PolicyName']) | ||
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=policy['DefaultVersionId']) | ||
logger.debug('Role attached policy: {}'.format(policy['PolicyName'])) | ||
logger.debug('%s', json.dumps(policy_version, indent=4, default=json_encoder)) | ||
try: | ||
get_policy = iam.get_role_policy(PolicyName=policy['PolicyName']) | ||
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=policy['DefaultVersionId']) | ||
logger.debug('Role attached policy: {}'.format(policy['PolicyName'])) | ||
logger.debug('%s', json.dumps(policy_version, indent=4, default=json_encoder)) | ||
|
||
key = 'iam.role_attached_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(policy_version)) | ||
key = 'iam.role_attached_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(policy_version)) | ||
except: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe log error debug log? Applies to all the new try/except that were added. |
||
|
||
# Attempt to get inline policies for this user. | ||
try: | ||
|
@@ -352,12 +354,15 @@ def enumerate_role(iam_client, output): | |
for policy in role_policies['PolicyNames']: | ||
logger.info('-- Policy "%s"', policy) | ||
|
||
get_policy = iam_client.get_user_policy(RoleName=role_name, PolicyName=policy) | ||
logger.debug('Role inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
try: | ||
get_policy = iam_client.get_user_policy(RoleName=role_name, PolicyName=policy) | ||
logger.debug('Role inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.role_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
key = 'iam.role_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
except: | ||
pass | ||
|
||
return output | ||
|
||
|
@@ -412,13 +417,16 @@ def enumerate_user(iam_client, output): | |
for policy in user_policies['AttachedPolicies']: | ||
logger.info('-- Policy "%s" (%s)', policy['PolicyName'], policy['PolicyArn']) | ||
|
||
get_policy = iam_client.get_policy(PolicyArn=policy['PolicyArn']) | ||
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=get_policy['Policy']['DefaultVersionId']) | ||
logger.debug('User attached policy:\n%s', json.dumps(policy_version['PolicyVersion'], indent=4, default=json_encoder)) | ||
try: | ||
get_policy = iam_client.get_policy(PolicyArn=policy['PolicyArn']) | ||
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=get_policy['Policy']['DefaultVersionId']) | ||
logger.debug('User attached policy:\n%s', json.dumps(policy_version['PolicyVersion'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.user_attached_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(policy_version['PolicyVersion'])) | ||
key = 'iam.user_attached_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(policy_version['PolicyVersion'])) | ||
except: | ||
pass | ||
|
||
# Attempt to get inline policies for this user. | ||
try: | ||
|
@@ -438,12 +446,15 @@ def enumerate_user(iam_client, output): | |
for policy in user_policies['PolicyNames']: | ||
logger.info('-- Policy "%s"', policy) | ||
|
||
get_policy = iam_client.get_user_policy(UserName=user_name, PolicyName=policy) | ||
logger.debug('User inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
try: | ||
get_policy = iam_client.get_user_policy(UserName=user_name, PolicyName=policy) | ||
logger.debug('User inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.user_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
key = 'iam.user_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
except: | ||
pass | ||
|
||
# Attempt to get the groups attached to this user. | ||
user_groups = dict() | ||
|
@@ -481,12 +492,15 @@ def enumerate_user(iam_client, output): | |
for policy in group_policy['PolicyNames']: | ||
logger.info('---- Policy "%s"', policy) | ||
|
||
get_policy = iam_client.get_group_policy(GroupName=group['GroupName'], PolicyName=policy) | ||
logger.debug('Group inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
try: | ||
get_policy = iam_client.get_group_policy(GroupName=group['GroupName'], PolicyName=policy) | ||
logger.debug('Group inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.group_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
key = 'iam.group_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
except: | ||
pass | ||
|
||
except botocore.exceptions.ClientError as err: | ||
pass | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can move the
manager
andstop_signal
variables to a different scope? Using variables with global scope should be avoided as much as possible.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which particular scope would you think of?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where you wrote:
global STOP_SIGNAL
Replace with:
And remove the old
MANAGER
andSTOP_SIGNAL
. Did not check if that works, please confirm :-)