-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added error handling, still need to work out cross-region reporting
Signed-off-by: Aaron Lippold <[email protected]>
- Loading branch information
1 parent
5826cac
commit a7ae1b7
Showing
3 changed files
with
238 additions
and
119 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
require "concurrent" | ||
require "aws-sdk-s3" | ||
|
||
module Aws::S3 | ||
class Bucket | ||
def objects(options = {}) | ||
options = options.merge(bucket: @name) | ||
resp = @client.list_objects_v2(options) | ||
|
||
# Check if the response contains any objects | ||
return ObjectSummary::Collection.new([]) if resp.contents.empty? | ||
|
||
pool = Concurrent::FixedThreadPool.new(32) # Increased pool size | ||
log_thread_pool_status(pool, "Initialized") | ||
|
||
batches = | ||
Enumerator.new do |y| | ||
resp.each_page do |page| | ||
batch = Concurrent::Array.new | ||
page.data.contents.each { |c| submit_task_to_pool(pool, c, batch) } | ||
y.yield(batch) | ||
end | ||
end | ||
|
||
# Ensure all tasks are completed before shutting down the pool | ||
pool.shutdown | ||
pool.wait_for_termination | ||
|
||
ObjectSummary::Collection.new(batches) | ||
ensure | ||
pool.shutdown if pool | ||
end | ||
|
||
private | ||
|
||
def submit_task_to_pool(pool, c, batch, retries = 3) | ||
pool.post { process_object(c, batch) } | ||
rescue Concurrent::RejectedExecutionError => e | ||
if retries > 0 | ||
Inspec::Log.debug "Retrying task submission for object #{c.key}, retries left: #{retries}" | ||
sleep(0.1) # Small delay before retrying | ||
submit_task_to_pool(pool, c, batch, retries - 1) | ||
else | ||
handle_rejected_execution_error(e, c.key, pool) | ||
end | ||
end | ||
|
||
def process_object(c, batch) | ||
batch << ObjectSummary.new( | ||
bucket_name: @name, | ||
key: c.key, | ||
data: c, | ||
client: @client | ||
) | ||
rescue Aws::S3::Errors::PermanentRedirect => e | ||
Inspec::Log.debug "Permanent redirect for object #{c.key}: #{e.message}" | ||
rescue => e | ||
Inspec::Log.debug "Error processing object #{c.key}: #{e.message}" | ||
Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" | ||
end | ||
|
||
def handle_rejected_execution_error(e, key, pool) | ||
Inspec::Log.debug "Task submission rejected for object #{key}: #{e.message}" | ||
log_thread_pool_status(pool, "RejectedExecutionError") | ||
end | ||
|
||
def log_thread_pool_status(pool, context) | ||
if Inspec::Log.level == :debug | ||
Inspec::Log.debug "Thread pool status (#{context}):" | ||
Inspec::Log.debug " Pool size: #{pool.length}" | ||
Inspec::Log.debug " Queue length: #{pool.queue_length}" | ||
Inspec::Log.debug " Completed tasks: #{pool.completed_task_count}" | ||
end | ||
end | ||
end | ||
end | ||
|
||
def get_public_objects(myBucket) | ||
myPublicKeys = Concurrent::Array.new | ||
s3 = Aws::S3::Resource.new | ||
pool = Concurrent::FixedThreadPool.new(64) # Increased pool size | ||
log_thread_pool_status(pool, "Initialized") | ||
debug_mode = Inspec::Log.level == :debug | ||
|
||
begin | ||
bucket = s3.bucket(myBucket) | ||
object_count = bucket.objects.count | ||
|
||
if debug_mode | ||
Inspec::Log.debug "### Processing Bucket ### : #{myBucket} with #{object_count} objects" | ||
end | ||
|
||
# Check if the bucket has no objects | ||
return myPublicKeys if object_count.zero? | ||
|
||
bucket.objects.each do |object| | ||
Inspec::Log.debug " Examining Key: #{object.key}" if debug_mode | ||
submit_task_to_pool(pool, object, myPublicKeys) | ||
end | ||
|
||
# Ensure all tasks are completed before shutting down the pool | ||
pool.shutdown | ||
pool.wait_for_termination | ||
rescue Aws::S3::Errors::PermanentRedirect => e | ||
Inspec::Log.debug "Permanent redirect for bucket #{myBucket}: #{e.message}" | ||
rescue => e | ||
Inspec::Log.debug "Error accessing bucket #{myBucket}: #{e.message}" | ||
Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" | ||
ensure | ||
pool.shutdown if pool | ||
end | ||
|
||
myPublicKeys | ||
end | ||
|
||
def submit_task_to_pool(pool, object, myPublicKeys, retries = 3) | ||
pool.post { process_public_object(object, myPublicKeys) } | ||
rescue Concurrent::RejectedExecutionError => e | ||
if retries > 0 | ||
Inspec::Log.debug "Retrying task submission for object #{object.key}, retries left: #{retries}" | ||
sleep(0.1) # Small delay before retrying | ||
submit_task_to_pool(pool, object, myPublicKeys, retries - 1) | ||
else | ||
handle_rejected_execution_error(e, object.key, pool) | ||
end | ||
end | ||
|
||
def process_public_object(object, myPublicKeys) | ||
grants = object.acl.grants | ||
if grants.map { |x| x.grantee.type }.any? { |x| x =~ /Group/ } && | ||
grants | ||
.map { |x| x.grantee.uri } | ||
.any? { |x| x =~ /AllUsers|AuthenticatedUsers/ } | ||
myPublicKeys << object.key | ||
end | ||
rescue Aws::S3::Errors::AccessDenied => e | ||
Inspec::Log.debug "Access denied for object #{object.key}: #{e.message}" | ||
rescue Aws::S3::Errors::PermanentRedirect => e | ||
Inspec::Log.debug "Permanent redirect for object #{object.key}: #{e.message}" | ||
rescue => e | ||
Inspec::Log.debug "Error processing object #{object.key}: #{e.message}" | ||
Inspec::Log.debug "Backtrace: #{e.backtrace.join("\n")}" | ||
end | ||
|
||
def handle_rejected_execution_error(e, key, pool) | ||
Inspec::Log.debug "Task submission rejected for object #{key}: #{e.message}" | ||
log_thread_pool_status(pool, "RejectedExecutionError") | ||
end | ||
|
||
def log_thread_pool_status(pool, context) | ||
if Inspec::Log.level == :debug | ||
Inspec::Log.debug "Thread pool status (#{context}):" | ||
Inspec::Log.debug " Pool size: #{pool.length}" | ||
Inspec::Log.debug " Queue length: #{pool.queue_length}" | ||
Inspec::Log.debug " Completed tasks: #{pool.completed_task_count}" | ||
end | ||
end |
Oops, something went wrong.