Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for AWS Credentials profiles and enhanced Ctrl-C handling #8

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 46 additions & 32 deletions enumerate_iam/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from enumerate_iam.utils.json_utils import json_encoder
from enumerate_iam.bruteforce_tests import BRUTEFORCE_TESTS

MAX_THREADS = 1
MAX_THREADS = 25
CLIENT_POOL = {}
MANAGER = Manager()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can move the manager and stop_signal variables to a different scope? Using variables with global scope should be avoided as much as possible.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which particular scope would you think of?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where you wrote: global STOP_SIGNAL

Replace with:

manager = Manager()
stop_signal = manager.Value('i', 0)

And remove the old MANAGER and STOP_SIGNAL. Did not check if that works, please confirm :-)

STOP_SIGNAL = MANAGER.Value('i', 0)
Expand Down Expand Up @@ -72,7 +72,6 @@ def enumerate_using_bruteforce(access_key, secret_key, session_token, region, ti
logger = logging.getLogger()
logger.info('Attempting common-service describe / list brute force.')

# Ignore SIGINT signals so that child processes inherit SIGINT handler
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = ThreadPool(MAX_THREADS)
signal.signal(signal.SIGINT, original_sigint_handler)
Expand Down Expand Up @@ -184,9 +183,9 @@ def check_one_permission(arg_tuple):

logger.debug('Testing %s.%s() in region %s' % (service_name, operation_name, region))

if stop_signal.value == 1:
return
try:
if stop_signal.value == 1:
return
action_response = action_function()
except (botocore.exceptions.ClientError,
botocore.exceptions.EndpointConnectionError,
Expand Down Expand Up @@ -325,14 +324,17 @@ def enumerate_role(iam_client, output):
for policy in role_policies['AttachedPolicies']:
logger.info('-- Policy "%s" (%s)', policy['PolicyName'], policy['PolicyArn'])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the others changed from info to debug, maybe this one should be debug too?

I'm not sure why you changed some of the calls from logger.info to logger.debug but I'll trust you on those changes and that the whole tool will use the same "rules" to decide what is info and what is debug.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So right, I've seen that running the tool by default on verbose output is fine to the general UX, but as soon as iam's get-account-authorization-details results gets dumped, they will generate such a big JSON that would effectively make reading program's output cumbersome. So I started using logger.debug anywhere we've been dumping JSON contents. Whereas policy's name does not affect programs output to the point of making it being of a debug level.

If we wish to have the user know what's going on during program's non-verbose execution, then we shall go with logger.info only for essential or short enough outputs. Should a user want to learn more what the program learns as it goes, debug would provide him with all we've got to say there.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree with your decision, lets keep it like that: debug contains JSON, info contains short messages.

Just one more thing: for the places where you removed the logger.info(), is there a short message we could use? For example:

logger.info('Run for the hills, get_account_authorization_details worked!')

Could still be info.


get_policy = iam.get_role_policy(PolicyName=policy['PolicyName'])
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=policy['DefaultVersionId'])
logger.debug('Role attached policy: {}'.format(policy['PolicyName']))
logger.debug('%s', json.dumps(policy_version, indent=4, default=json_encoder))
try:
get_policy = iam.get_role_policy(PolicyName=policy['PolicyName'])
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=policy['DefaultVersionId'])
logger.debug('Role attached policy: {}'.format(policy['PolicyName']))
logger.debug('%s', json.dumps(policy_version, indent=4, default=json_encoder))

key = 'iam.role_attached_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(policy_version))
key = 'iam.role_attached_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(policy_version))
except:
pass
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log error debug log? Applies to all the new try/except that were added.


# Attempt to get inline policies for this user.
try:
Expand All @@ -352,12 +354,15 @@ def enumerate_role(iam_client, output):
for policy in role_policies['PolicyNames']:
logger.info('-- Policy "%s"', policy)

get_policy = iam_client.get_user_policy(RoleName=role_name, PolicyName=policy)
logger.debug('Role inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder))
try:
get_policy = iam_client.get_user_policy(RoleName=role_name, PolicyName=policy)
logger.debug('Role inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder))

key = 'iam.role_inline_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(get_policy['PolicyDocument']))
key = 'iam.role_inline_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(get_policy['PolicyDocument']))
except:
pass

return output

Expand Down Expand Up @@ -412,13 +417,16 @@ def enumerate_user(iam_client, output):
for policy in user_policies['AttachedPolicies']:
logger.info('-- Policy "%s" (%s)', policy['PolicyName'], policy['PolicyArn'])

get_policy = iam_client.get_policy(PolicyArn=policy['PolicyArn'])
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=get_policy['Policy']['DefaultVersionId'])
logger.debug('User attached policy:\n%s', json.dumps(policy_version['PolicyVersion'], indent=4, default=json_encoder))
try:
get_policy = iam_client.get_policy(PolicyArn=policy['PolicyArn'])
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=get_policy['Policy']['DefaultVersionId'])
logger.debug('User attached policy:\n%s', json.dumps(policy_version['PolicyVersion'], indent=4, default=json_encoder))

key = 'iam.user_attached_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(policy_version['PolicyVersion']))
key = 'iam.user_attached_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(policy_version['PolicyVersion']))
except:
pass

# Attempt to get inline policies for this user.
try:
Expand All @@ -438,12 +446,15 @@ def enumerate_user(iam_client, output):
for policy in user_policies['PolicyNames']:
logger.info('-- Policy "%s"', policy)

get_policy = iam_client.get_user_policy(UserName=user_name, PolicyName=policy)
logger.debug('User inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder))
try:
get_policy = iam_client.get_user_policy(UserName=user_name, PolicyName=policy)
logger.debug('User inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder))

key = 'iam.user_inline_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(get_policy['PolicyDocument']))
key = 'iam.user_inline_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(get_policy['PolicyDocument']))
except:
pass

# Attempt to get the groups attached to this user.
user_groups = dict()
Expand Down Expand Up @@ -481,12 +492,15 @@ def enumerate_user(iam_client, output):
for policy in group_policy['PolicyNames']:
logger.info('---- Policy "%s"', policy)

get_policy = iam_client.get_group_policy(GroupName=group['GroupName'], PolicyName=policy)
logger.debug('Group inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder))
try:
get_policy = iam_client.get_group_policy(GroupName=group['GroupName'], PolicyName=policy)
logger.debug('Group inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder))

key = 'iam.group_inline_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(get_policy['PolicyDocument']))
key = 'iam.group_inline_policies'
if key not in output.keys(): output[key] = []
output[key].append(remove_metadata(get_policy['PolicyDocument']))
except:
pass

except botocore.exceptions.ClientError as err:
pass
Expand Down