From a7ae1b7914a1e48135b72a69c9330ec4a4ddb266 Mon Sep 17 00:00:00 2001 From: Aaron Lippold Date: Fri, 6 Sep 2024 09:48:43 -0400 Subject: [PATCH] added error handling, still need to work out cross-region reporting Signed-off-by: Aaron Lippold --- controls/aws_s3_bucket_objects.rb | 51 ++++++---- libraries/concurrent_s3-cop | 157 ++++++++++++++++++++++++++++++ libraries/concurrent_s3.rb | 149 ++++++++++------------------ 3 files changed, 238 insertions(+), 119 deletions(-) create mode 100644 libraries/concurrent_s3-cop diff --git a/controls/aws_s3_bucket_objects.rb b/controls/aws_s3_bucket_objects.rb index a31497b..5bf6e6d 100644 --- a/controls/aws_s3_bucket_objects.rb +++ b/controls/aws_s3_bucket_objects.rb @@ -1,35 +1,44 @@ -require_relative '../libraries/concurrent_s3' +require_relative "../libraries/concurrent_s3" -control 'Public_S3_Objects' do +control "Public_S3_Objects" do impact 0.7 - title 'Ensure there are no publicly accessible S3 objects' - desc 'Ensure there are no publicly accessible S3 objects' - tag "nist": %w[AC-6] - tag "severity": 'high' + title "Ensure there are no publicly accessible S3 objects" + desc "Ensure there are no publicly accessible S3 objects" + tag nist: %w[AC-6] + tag severity: "high" - tag "check": "Review your AWS console and note if any S3 bucket objects are set to + tag check: + "Review your AWS console and note if any S3 bucket objects are set to 'Public'. If any objects are listed as 'Public', then this is a finding." - tag "fix": "Log into your AWS console and select the S3 buckets section. Select + tag fix: + "Log into your AWS console and select the S3 buckets section. Select the buckets found in your review. For each object in the bucket select the permissions tab for the object and remove the Public Access permission." - exception_bucket_list = input('exception_bucket_list') + exception_bucket_list = input("exception_bucket_list") if aws_s3_buckets.bucket_names.empty? impact 0.0 - desc 'This control is Non Applicable since no S3 buckets were found.' + desc "This control is Non Applicable since no S3 buckets were found." - describe 'This control is Non Applicable since no S3 buckets were found.' do - skip 'This control is Non Applicable since no S3 buckets were found.' + describe "This control is Non Applicable since no S3 buckets were found." do + skip "This control is Non Applicable since no S3 buckets were found." end - elsif input('single_bucket').present? - public_objects = get_public_objects(input('single_bucket').to_s) - describe input('single_bucket').to_s do - it 'should not have any public objects' do - failure_message = public_objects.count > 1 ? "#{public_objects.join(', ')} are public" : "#{public_objects.join(', ')} is public" + elsif input("single_bucket").present? + public_objects = get_public_objects(input("single_bucket").to_s) + describe input("single_bucket").to_s do + it "should not have any public objects" do + failure_message = + ( + if public_objects.count > 1 + "#{public_objects.join(", ")} are public" + else + "#{public_objects.join(", ")} is public" + end + ) expect(public_objects).to be_empty, failure_message end end @@ -39,8 +48,12 @@ public_objects_multi = get_public_objects(bucket.to_s) describe bucket.to_s do - it 'should not have any public objects' do - failure_message = "#{public_objects_multi.join(', ')} is public" + it "should not have any public objects" do + failure_message = + "\t- #{public_objects_multi.join("\n\t- ")} \n\tis public" + failure_message = + "\t- #{public_objects_multi.join("\n\t- ")} \n\tare public" if public_objects_multi.count > + 1 expect(public_objects_multi).to be_empty, failure_message end end diff --git a/libraries/concurrent_s3-cop b/libraries/concurrent_s3-cop new file mode 100644 index 0000000..2cd2e97 --- /dev/null +++ b/libraries/concurrent_s3-cop @@ -0,0 +1,157 @@ +require "concurrent" +require "aws-sdk-s3" + +module Aws::S3 + class Bucket + def objects(options = {}) + options = options.merge(bucket: @name) + resp = @client.list_objects_v2(options) + + # Check if the response contains any objects + return ObjectSummary::Collection.new([]) if resp.contents.empty? + + pool = Concurrent::FixedThreadPool.new(32) # Increased pool size + log_thread_pool_status(pool, "Initialized") + + batches = + Enumerator.new do |y| + resp.each_page do |page| + batch = Concurrent::Array.new + page.data.contents.each { |c| submit_task_to_pool(pool, c, batch) } + y.yield(batch) + end + end + + # Ensure all tasks are completed before shutting down the pool + pool.shutdown + pool.wait_for_termination + + ObjectSummary::Collection.new(batches) + ensure + pool.shutdown if pool + end + + private + + def submit_task_to_pool(pool, c, batch, retries = 3) + pool.post { process_object(c, batch) } + rescue Concurrent::RejectedExecutionError => e + if retries > 0 + Inspec::Log.debug "Retrying task submission for object #{c.key}, retries left: #{retries}" + sleep(0.1) # Small delay before retrying + submit_task_to_pool(pool, c, batch, retries - 1) + else + handle_rejected_execution_error(e, c.key, pool) + end + end + + def process_object(c, batch) + batch << ObjectSummary.new( + bucket_name: @name, + key: c.key, + data: c, + client: @client + ) + rescue Aws::S3::Errors::PermanentRedirect => e + Inspec::Log.debug "Permanent redirect for object #{c.key}: #{e.message}" + rescue => e + Inspec::Log.debug "Error processing object #{c.key}: #{e.message}" + Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" + end + + def handle_rejected_execution_error(e, key, pool) + Inspec::Log.debug "Task submission rejected for object #{key}: #{e.message}" + log_thread_pool_status(pool, "RejectedExecutionError") + end + + def log_thread_pool_status(pool, context) + if Inspec::Log.level == :debug + Inspec::Log.debug "Thread pool status (#{context}):" + Inspec::Log.debug " Pool size: #{pool.length}" + Inspec::Log.debug " Queue length: #{pool.queue_length}" + Inspec::Log.debug " Completed tasks: #{pool.completed_task_count}" + end + end + end +end + +def get_public_objects(myBucket) + myPublicKeys = Concurrent::Array.new + s3 = Aws::S3::Resource.new + pool = Concurrent::FixedThreadPool.new(64) # Increased pool size + log_thread_pool_status(pool, "Initialized") + debug_mode = Inspec::Log.level == :debug + + begin + bucket = s3.bucket(myBucket) + object_count = bucket.objects.count + + if debug_mode + Inspec::Log.debug "### Processing Bucket ### : #{myBucket} with #{object_count} objects" + end + + # Check if the bucket has no objects + return myPublicKeys if object_count.zero? + + bucket.objects.each do |object| + Inspec::Log.debug " Examining Key: #{object.key}" if debug_mode + submit_task_to_pool(pool, object, myPublicKeys) + end + + # Ensure all tasks are completed before shutting down the pool + pool.shutdown + pool.wait_for_termination + rescue Aws::S3::Errors::PermanentRedirect => e + Inspec::Log.debug "Permanent redirect for bucket #{myBucket}: #{e.message}" + rescue => e + Inspec::Log.debug "Error accessing bucket #{myBucket}: #{e.message}" + Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" + ensure + pool.shutdown if pool + end + + myPublicKeys +end + +def submit_task_to_pool(pool, object, myPublicKeys, retries = 3) + pool.post { process_public_object(object, myPublicKeys) } +rescue Concurrent::RejectedExecutionError => e + if retries > 0 + Inspec::Log.debug "Retrying task submission for object #{object.key}, retries left: #{retries}" + sleep(0.1) # Small delay before retrying + submit_task_to_pool(pool, object, myPublicKeys, retries - 1) + else + handle_rejected_execution_error(e, object.key, pool) + end +end + +def process_public_object(object, myPublicKeys) + grants = object.acl.grants + if grants.map { |x| x.grantee.type }.any? { |x| x =~ /Group/ } && + grants + .map { |x| x.grantee.uri } + .any? { |x| x =~ /AllUsers|AuthenticatedUsers/ } + myPublicKeys << object.key + end +rescue Aws::S3::Errors::AccessDenied => e + Inspec::Log.debug "Access denied for object #{object.key}: #{e.message}" +rescue Aws::S3::Errors::PermanentRedirect => e + Inspec::Log.debug "Permanent redirect for object #{object.key}: #{e.message}" +rescue => e + Inspec::Log.debug "Error processing object #{object.key}: #{e.message}" + Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" +end + +def handle_rejected_execution_error(e, key, pool) + Inspec::Log.debug "Task submission rejected for object #{key}: #{e.message}" + log_thread_pool_status(pool, "RejectedExecutionError") +end + +def log_thread_pool_status(pool, context) + if Inspec::Log.level == :debug + Inspec::Log.debug "Thread pool status (#{context}):" + Inspec::Log.debug " Pool size: #{pool.length}" + Inspec::Log.debug " Queue length: #{pool.queue_length}" + Inspec::Log.debug " Completed tasks: #{pool.completed_task_count}" + end +end diff --git a/libraries/concurrent_s3.rb b/libraries/concurrent_s3.rb index c7db250..8a36551 100644 --- a/libraries/concurrent_s3.rb +++ b/libraries/concurrent_s3.rb @@ -1,95 +1,64 @@ -## -# The above functions utilize multithreading with a thread pool to efficiently process objects in an -# AWS S3 bucket and identify public objects based on their ACL grants. -# -# Args: -# myBucket: The code you provided defines methods to retrieve public objects from an AWS S3 bucket -# using multi-threading for improved performance. The `get_public_objects` method iterates over the -# objects in the specified S3 bucket, checks their ACL permissions, and collects the keys of objects -# that are publicly accessible. -# -# Returns: -# The `get_public_objects` method returns an array of keys for objects in a specified S3 bucket that -# have public access permissions for all users or authenticated users. require "concurrent" require "aws-sdk-s3" module Aws::S3 class Bucket def objects(options = {}) - options = options.merge(bucket: @name) - resp = @client.list_objects_v2(options) - - # Check if the response contains any objects - return ObjectSummary::Collection.new([]) if resp.contents.empty? - - pool = Concurrent::FixedThreadPool.new(16) - log_thread_pool_status(pool, "Initialized") - batches = Enumerator.new do |y| - resp.each_page do |page| - batch = Concurrent::Array.new - page.data.contents.each do |c| - begin + options = options.merge(bucket: @name) + begin + resp = @client.list_objects_v2(options) + resp.each_page do |page| + batch = [] + pool = Concurrent::FixedThreadPool.new(16) + mutex = Mutex.new + page.data.contents.each do |c| pool.post do begin - batch << ObjectSummary.new( - bucket_name: @name, - key: c.key, - data: c, - client: @client - ) + mutex.synchronize do + batch << ObjectSummary.new( + bucket_name: @name, + key: c.key, + data: c, + client: @client + ) + end rescue Aws::S3::Errors::PermanentRedirect => e - # Handle endpoint redirection error - Inspec::Log.debug "Permanent redirect for object #{c.key}: #{e.message}" + Inspec::Log.warn "Permanent redirect for object #{c.key}: #{e.message}" rescue => e - # Handle or log other errors - Inspec::Log.debug "Error processing object #{c.key}: #{e.message}" - Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" + Inspec::Log.warn "Error processing object #{c.key}: #{e.message}" + Inspec::Log.warn "Backtrace: #{e.backtrace.join("\n")}" end end - rescue Concurrent::RejectedExecutionError => e - # Handle the rejected execution error - Inspec::Log.debug "Task submission rejected for object #{c.key}: #{e.message}" - log_thread_pool_status(pool, "RejectedExecutionError") end + pool.shutdown + pool.wait_for_termination + y.yield(batch) end - pool.shutdown - pool.wait_for_termination - y.yield(batch) + rescue Aws::S3::Errors::PermanentRedirect => e + Inspec::Log.warn "Permanent redirect for bucket #{@name}: #{e.message}" + rescue => e + Inspec::Log.warn "Error accessing bucket #{@name}: #{e.message}" + Inspec::Log.warn "Backtrace: #{e.backtrace.join("\n")}" end end ObjectSummary::Collection.new(batches) - ensure - pool.shutdown if pool - end - - private - - def log_thread_pool_status(pool, context) - if Inspec::Log.level == :debug - Inspec::Log.debug "Thread pool status (#{context}):" - Inspec::Log.debug " Pool size: #{pool.length}" - Inspec::Log.debug " Queue length: #{pool.queue_length}" - Inspec::Log.debug " Completed tasks: #{pool.completed_task_count}" - end end end end def get_public_objects(myBucket) - myPublicKeys = Concurrent::Array.new + myPublicKeys = [] s3 = Aws::S3::Resource.new pool = Concurrent::FixedThreadPool.new(56) - log_thread_pool_status(pool, "Initialized") - debug_mode = Inspec::Log.level == :debug + mutex = Mutex.new begin bucket = s3.bucket(myBucket) object_count = bucket.objects.count - if debug_mode + if Inspec::Log.level == :debug Inspec::Log.debug "### Processing Bucket ### : #{myBucket} with #{object_count} objects" end @@ -97,33 +66,24 @@ def get_public_objects(myBucket) return myPublicKeys if object_count.zero? bucket.objects.each do |object| - Inspec::Log.debug " Examining Key: #{object.key}" if debug_mode - begin - pool.post do - begin - grants = object.acl.grants - if grants.map { |x| x.grantee.type }.any? { |x| x =~ /Group/ } && - grants - .map { |x| x.grantee.uri } - .any? { |x| x =~ /AllUsers|AuthenticatedUsers/ } - myPublicKeys << object.key - end - rescue Aws::S3::Errors::AccessDenied => e - # Handle access denied error - Inspec::Log.debug "Access denied for object #{object.key}: #{e.message}" - rescue Aws::S3::Errors::PermanentRedirect => e - # Handle endpoint redirection error - Inspec::Log.debug "Permanent redirect for object #{object.key}: #{e.message}" - rescue => e - # Handle or log other errors - Inspec::Log.debug "Error processing object #{object.key}: #{e.message}" - Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" + if Inspec::Log.level == :debug + Inspec::Log.debug " Examining Key: #{object.key}" + end + pool.post do + begin + grants = object.acl.grants + if grants.map { |x| x.grantee.type }.any? { |x| x =~ /Group/ } && + grants + .map { |x| x.grantee.uri } + .any? { |x| x =~ /AllUsers|AuthenticatedUsers/ } + mutex.synchronize { myPublicKeys << object.key } end + rescue Aws::S3::Errors::PermanentRedirect => e + Inspec::Log.warn "Permanent redirect for object #{object.key}: #{e.message}" + rescue => e + Inspec::Log.warn "Error processing object #{object.key}: #{e.message}" + Inspec::Log.warn "Backtrace: #{e.backtrace.join("\n")}" end - rescue Concurrent::RejectedExecutionError => e - # Handle the rejected execution error - Inspec::Log.debug "Task submission rejected for object #{object.key}: #{e.message}" - log_thread_pool_status(pool, "RejectedExecutionError") end end @@ -131,24 +91,13 @@ def get_public_objects(myBucket) pool.shutdown pool.wait_for_termination rescue Aws::S3::Errors::PermanentRedirect => e - # Handle endpoint redirection error for the bucket - Inspec::Log.debug "Permanent redirect for bucket #{myBucket}: #{e.message}" + Inspec::Log.warn "Permanent redirect for bucket #{myBucket}: #{e.message}" rescue => e - # Handle or log other errors - Inspec::Log.debug "Error accessing bucket #{myBucket}: #{e.message}" - Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" + Inspec::Log.warn "Error accessing bucket #{myBucket}: #{e.message}" + Inspec::Log.warn "Backtrace: #{e.backtrace.join("\n")}" ensure pool.shutdown if pool end myPublicKeys end - -def log_thread_pool_status(pool, context) - if Inspec::Log.level == :debug - Inspec::Log.debug "Thread pool status (#{context}):" - Inspec::Log.debug " Pool size: #{pool.length}" - Inspec::Log.debug " Queue length: #{pool.queue_length}" - Inspec::Log.debug " Completed tasks: #{pool.completed_task_count}" - end -end