-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support for AWS Credentials profiles and enhanced Ctrl-C handling #8
base: master
Are you sure you want to change the base?
Changes from all commits
a3bd618
8cd7a3b
79b8a4b
9b4cfc2
21cbc5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,60 @@ | ||
#!/usr/bin/env python | ||
#!/usr/bin/env python3 | ||
import sys | ||
import json | ||
import logging | ||
import argparse | ||
|
||
from boto3 import Session | ||
from enumerate_iam.main import enumerate_iam | ||
|
||
from enumerate_iam.utils.json_utils import json_encoder | ||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description='Enumerate IAM permissions') | ||
|
||
parser.add_argument('--access-key', help='AWS access key', required=True) | ||
parser.add_argument('--secret-key', help='AWS secret key', required=True) | ||
parser.add_argument('--profile', help='AWS profile name fetched from credentials file. Specify this parameter or access-key and secret-key manually.') | ||
parser.add_argument('--access-key', help='AWS access key if profile was not used') | ||
parser.add_argument('--secret-key', help='AWS secret key if profile was not used') | ||
parser.add_argument('--session-token', help='STS session token') | ||
parser.add_argument('--region', help='AWS region to send API requests to', default='us-east-1') | ||
parser.add_argument('--output', help='File to write output JSON containing all of the collected permissions') | ||
parser.add_argument('--timeout', help='Timeout in minutes for permissions brute-forcing activity. Def: 15.', type=int, default=15) | ||
#parser.add_argument('--verbose', action='store_true', help='Enable verbose output.') | ||
parser.add_argument('--debug', action='store_true', help='Enable debug output.') | ||
|
||
args = parser.parse_args() | ||
|
||
enumerate_iam(args.access_key, | ||
args.secret_key, | ||
args.session_token, | ||
args.region) | ||
|
||
if args.profile and (args.access_key or args.secret_key or args.session_token): | ||
sys.stderr.write('error: Profile and raw AWS credential options are mutually exclusive.\n') | ||
sys.stderr.write(' Please specify either --profile or --access-key and --secret-key.\n\n') | ||
parser.print_help() | ||
sys.exit(2) | ||
|
||
access_key = args.access_key | ||
secret_key = args.secret_key | ||
session_token = args.session_token | ||
|
||
if args.profile: | ||
session = Session(profile_name = args.profile) | ||
credentials = session.get_credentials() | ||
currcreds = credentials.get_frozen_credentials() | ||
access_key = currcreds.access_key | ||
secret_key = currcreds.secret_key | ||
session_token = currcreds.token | ||
|
||
level = logging.INFO | ||
if args.debug: | ||
level = logging.DEBUG | ||
|
||
output = enumerate_iam(access_key, | ||
secret_key, | ||
session_token, | ||
args.region, | ||
args.timeout * 60, | ||
level) | ||
|
||
if args.output: | ||
with open(args.output, 'w') as f: | ||
f.write(json.dumps(output, indent=4, default=json_encoder)) | ||
|
||
if __name__ == '__main__': | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,20 +19,24 @@ | |
import re | ||
import json | ||
import logging | ||
import signal | ||
import boto3 | ||
import botocore | ||
import random | ||
|
||
from botocore.client import Config | ||
from botocore.endpoint import MAX_POOL_CONNECTIONS | ||
from multiprocessing.dummy import Pool as ThreadPool | ||
from multiprocessing import TimeoutError | ||
from multiprocessing.dummy import Pool as ThreadPool, Manager, Value | ||
|
||
from enumerate_iam.utils.remove_metadata import remove_metadata | ||
from enumerate_iam.utils.json_utils import json_encoder | ||
from enumerate_iam.bruteforce_tests import BRUTEFORCE_TESTS | ||
|
||
MAX_THREADS = 25 | ||
CLIENT_POOL = {} | ||
MANAGER = Manager() | ||
STOP_SIGNAL = MANAGER.Value('i', 0) | ||
|
||
|
||
def report_arn(candidate): | ||
|
@@ -58,21 +62,31 @@ def report_arn(candidate): | |
return None, None, None | ||
|
||
|
||
def enumerate_using_bruteforce(access_key, secret_key, session_token, region): | ||
def enumerate_using_bruteforce(access_key, secret_key, session_token, region, timeout): | ||
""" | ||
Attempt to brute-force common describe calls. | ||
""" | ||
global STOP_SIGNAL | ||
output = dict() | ||
|
||
logger = logging.getLogger() | ||
logger.info('Attempting common-service describe / list brute force.') | ||
|
||
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
pool = ThreadPool(MAX_THREADS) | ||
args_generator = generate_args(access_key, secret_key, session_token, region) | ||
signal.signal(signal.SIGINT, original_sigint_handler) | ||
|
||
args_generator = generate_args(access_key, secret_key, session_token, region, STOP_SIGNAL) | ||
|
||
try: | ||
results = pool.map(check_one_permission, args_generator) | ||
results = pool.map_async(check_one_permission, args_generator) | ||
results.get(timeout) | ||
except TimeoutError: | ||
logger.info('Brute-forcing permissions timed out.') | ||
STOP_SIGNAL.value = 1 | ||
|
||
except KeyboardInterrupt: | ||
STOP_SIGNAL.value = 1 | ||
print('') | ||
|
||
results = [] | ||
|
@@ -100,7 +114,7 @@ def enumerate_using_bruteforce(access_key, secret_key, session_token, region): | |
return output | ||
|
||
|
||
def generate_args(access_key, secret_key, session_token, region): | ||
def generate_args(access_key, secret_key, session_token, region, stop_signal): | ||
|
||
service_names = list(BRUTEFORCE_TESTS.keys()) | ||
|
||
|
@@ -111,7 +125,7 @@ def generate_args(access_key, secret_key, session_token, region): | |
random.shuffle(actions) | ||
|
||
for action in actions: | ||
yield access_key, secret_key, session_token, region, service_name, action | ||
yield access_key, secret_key, session_token, region, stop_signal, service_name, action | ||
|
||
|
||
def get_client(access_key, secret_key, session_token, service_name, region): | ||
|
@@ -149,9 +163,12 @@ def get_client(access_key, secret_key, session_token, service_name, region): | |
|
||
|
||
def check_one_permission(arg_tuple): | ||
access_key, secret_key, session_token, region, service_name, operation_name = arg_tuple | ||
access_key, secret_key, session_token, region, stop_signal, service_name, operation_name = arg_tuple | ||
logger = logging.getLogger() | ||
|
||
if stop_signal.value == 1: | ||
return | ||
|
||
service_client = get_client(access_key, secret_key, session_token, service_name, region) | ||
if service_client is None: | ||
return | ||
|
@@ -166,6 +183,8 @@ def check_one_permission(arg_tuple): | |
|
||
logger.debug('Testing %s.%s() in region %s' % (service_name, operation_name, region)) | ||
|
||
if stop_signal.value == 1: | ||
return | ||
try: | ||
action_response = action_function() | ||
except (botocore.exceptions.ClientError, | ||
|
@@ -186,9 +205,9 @@ def check_one_permission(arg_tuple): | |
return key, remove_metadata(action_response) | ||
|
||
|
||
def configure_logging(): | ||
def configure_logging(level): | ||
logging.basicConfig( | ||
level=logging.INFO, | ||
level=level, | ||
format='%(asctime)s - %(process)d - [%(levelname)s] %(message)s', | ||
) | ||
|
||
|
@@ -207,17 +226,17 @@ def configure_logging(): | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | ||
|
||
|
||
def enumerate_iam(access_key, secret_key, session_token, region): | ||
def enumerate_iam(access_key, secret_key, session_token, region, timeout, level = logging.INFO): | ||
"""IAM Account Enumerator. | ||
|
||
This code provides a mechanism to attempt to validate the permissions assigned | ||
to a given set of AWS tokens. | ||
""" | ||
output = dict() | ||
configure_logging() | ||
configure_logging(level) | ||
|
||
output['iam'] = enumerate_using_iam(access_key, secret_key, session_token, region) | ||
output['bruteforce'] = enumerate_using_bruteforce(access_key, secret_key, session_token, region) | ||
output['bruteforce'] = enumerate_using_bruteforce(access_key, secret_key, session_token, region, timeout) | ||
|
||
return output | ||
|
||
|
@@ -243,8 +262,8 @@ def enumerate_using_iam(access_key, secret_key, session_token, region): | |
botocore.exceptions.ReadTimeoutError): | ||
pass | ||
else: | ||
logger.info('Run for the hills, get_account_authorization_details worked!') | ||
logger.info('-- %s', json.dumps(everything, indent=4, default=json_encoder)) | ||
logger.debug('Run for the hills, get_account_authorization_details worked!') | ||
logger.debug('%s', json.dumps(everything, indent=4, default=json_encoder)) | ||
|
||
output['iam.get_account_authorization_details'] = remove_metadata(everything) | ||
|
||
|
@@ -293,6 +312,7 @@ def enumerate_role(iam_client, output): | |
pass | ||
else: | ||
output['iam.list_attached_role_policies'] = remove_metadata(role_policies) | ||
logger.debug('%s', json.dumps(role_policies, indent=4, default=json_encoder)) | ||
|
||
logger.info( | ||
'Role "%s" has %0d attached policies', | ||
|
@@ -304,6 +324,18 @@ def enumerate_role(iam_client, output): | |
for policy in role_policies['AttachedPolicies']: | ||
logger.info('-- Policy "%s" (%s)', policy['PolicyName'], policy['PolicyArn']) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the others changed from I'm not sure why you changed some of the calls from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So right, I've seen that running the tool by default on verbose output is fine to the general UX, but as soon as iam's get-account-authorization-details results gets dumped, they will generate such a big JSON that would effectively make reading program's output cumbersome. So I started using If we wish to have the user know what's going on during program's non-verbose execution, then we shall go with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Totally agree with your decision, lets keep it like that: Just one more thing: for the places where you removed the
Could still be |
||
|
||
try: | ||
get_policy = iam.get_role_policy(PolicyName=policy['PolicyName']) | ||
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=policy['DefaultVersionId']) | ||
logger.debug('Role attached policy: {}'.format(policy['PolicyName'])) | ||
logger.debug('%s', json.dumps(policy_version, indent=4, default=json_encoder)) | ||
|
||
key = 'iam.role_attached_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(policy_version)) | ||
except: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe log error debug log? Applies to all the new try/except that were added. |
||
|
||
# Attempt to get inline policies for this user. | ||
try: | ||
role_policies = iam_client.list_role_policies(RoleName=role_name) | ||
|
@@ -313,7 +345,7 @@ def enumerate_role(iam_client, output): | |
output['iam.list_role_policies'] = remove_metadata(role_policies) | ||
|
||
logger.info( | ||
'User "%s" has %0d inline policies', | ||
'Role "%s" has %0d inline policies', | ||
role['Role']['RoleName'], | ||
len(role_policies['PolicyNames']) | ||
) | ||
|
@@ -322,6 +354,16 @@ def enumerate_role(iam_client, output): | |
for policy in role_policies['PolicyNames']: | ||
logger.info('-- Policy "%s"', policy) | ||
|
||
try: | ||
get_policy = iam_client.get_user_policy(RoleName=role_name, PolicyName=policy) | ||
logger.debug('Role inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.role_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
except: | ||
pass | ||
|
||
return output | ||
|
||
|
||
|
@@ -375,6 +417,17 @@ def enumerate_user(iam_client, output): | |
for policy in user_policies['AttachedPolicies']: | ||
logger.info('-- Policy "%s" (%s)', policy['PolicyName'], policy['PolicyArn']) | ||
|
||
try: | ||
get_policy = iam_client.get_policy(PolicyArn=policy['PolicyArn']) | ||
policy_version = iam_client.get_policy_version(PolicyArn=policy['PolicyArn'], VersionId=get_policy['Policy']['DefaultVersionId']) | ||
logger.debug('User attached policy:\n%s', json.dumps(policy_version['PolicyVersion'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.user_attached_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(policy_version['PolicyVersion'])) | ||
except: | ||
pass | ||
|
||
# Attempt to get inline policies for this user. | ||
try: | ||
user_policies = iam_client.list_user_policies(UserName=user_name) | ||
|
@@ -393,6 +446,16 @@ def enumerate_user(iam_client, output): | |
for policy in user_policies['PolicyNames']: | ||
logger.info('-- Policy "%s"', policy) | ||
|
||
try: | ||
get_policy = iam_client.get_user_policy(UserName=user_name, PolicyName=policy) | ||
logger.debug('User inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.user_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
except: | ||
pass | ||
|
||
# Attempt to get the groups attached to this user. | ||
user_groups = dict() | ||
user_groups['Groups'] = [] | ||
|
@@ -428,6 +491,17 @@ def enumerate_user(iam_client, output): | |
# List all group policy names. | ||
for policy in group_policy['PolicyNames']: | ||
logger.info('---- Policy "%s"', policy) | ||
|
||
try: | ||
get_policy = iam_client.get_group_policy(GroupName=group['GroupName'], PolicyName=policy) | ||
logger.debug('Group inline policy:\n%s', json.dumps(get_policy['PolicyDocument'], indent=4, default=json_encoder)) | ||
|
||
key = 'iam.group_inline_policies' | ||
if key not in output.keys(): output[key] = [] | ||
output[key].append(remove_metadata(get_policy['PolicyDocument'])) | ||
except: | ||
pass | ||
|
||
except botocore.exceptions.ClientError as err: | ||
pass | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can move the
manager
andstop_signal
variables to a different scope? Using variables with global scope should be avoided as much as possible.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which particular scope would you think of?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where you wrote:
global STOP_SIGNAL
Replace with:
And remove the old
MANAGER
andSTOP_SIGNAL
. Did not check if that works, please confirm :-)