-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #16 from christianherweg0807/s3_role_by_bucket
S3 role by bucket
- Loading branch information
Showing
15 changed files
with
777 additions
and
464 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# CodecFactory: | ||
# lazy-fetch codec plugins | ||
# (FIXME: is this thread-safe?) | ||
require "logstash/inputs/threadable" | ||
|
||
#module LogStash module Inputs class S3SNSSQS < LogStash::Inputs::Threadable | ||
class CodecFactory | ||
def initialize(logger, options) | ||
@logger = logger | ||
@default_codec = options[:default_codec] | ||
@codec_by_folder = options[:codec_by_folder] | ||
@codecs = { | ||
'default' => @default_codec | ||
} | ||
end | ||
|
||
def get_codec(record) | ||
codec = find_codec(record) | ||
if @codecs[codec].nil? | ||
@codecs[codec] = get_codec_plugin(codec) | ||
end | ||
@logger.debug("Switching to codec #{codec}") if codec != 'default' | ||
return @codecs[codec] | ||
end | ||
|
||
private | ||
|
||
def find_codec(record) | ||
bucket, key, folder = record[:bucket], record[:key], record[:folder] | ||
unless @codec_by_folder[bucket].nil? | ||
@logger.debug("trying to find codec for folder #{folder}", :codec => @codec_by_folder[bucket][folder]) | ||
return @codec_by_folder[bucket][folder] unless @codec_by_folder[bucket][folder].nil? | ||
end | ||
return 'default' | ||
end | ||
|
||
def get_codec_plugin(name, options = {}) | ||
LogStash::Plugin.lookup('codec', name).new(options) | ||
end | ||
end | ||
#end;end;end |
Binary file not shown.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
# not needed - Mutex is part of core lib: | ||
#require 'thread' | ||
require "logstash/inputs/threadable" | ||
|
||
|
||
#module LogStash module Inputs class S3SNSSQS < LogStash::Inputs::Threadable | ||
class S3ClientFactory | ||
|
||
def initialize(logger, options, aws_options_hash) | ||
@logger = logger | ||
@aws_options_hash = aws_options_hash | ||
# FIXME: region per bucket? | ||
@sts_client = Aws::STS::Client.new(region: options[:aws_region]) | ||
# FIXME: options are non-generic (...by_bucket mixes credentials with folder stuff) | ||
@credentials_by_bucket = options[:s3_credentials_by_bucket] | ||
@logger.debug("Credentials by Bucket", :credentials => @credentials_by_bucket) | ||
@default_session_name = options[:s3_role_session_name] | ||
@clients_by_bucket = {} | ||
#@mutexes_by_bucket = {} | ||
@creation_mutex = Mutex.new | ||
end | ||
|
||
def get_s3_client(bucket_name) | ||
bucket_symbol = bucket_name.to_sym | ||
@creation_mutex.synchronize do | ||
|
||
if @clients_by_bucket[bucket_symbol].nil? | ||
options = @aws_options_hash | ||
unless @credentials_by_bucket[bucket_name].nil? | ||
options.merge!(credentials: get_s3_auth(@credentials_by_bucket[bucket_name])) | ||
end | ||
@clients_by_bucket[bucket_symbol] = Aws::S3::Client.new(options) | ||
@logger.debug("Created a new S3 Client", :bucket_name => bucket_name, :client => @clients_by_bucket[bucket_symbol], :used_options => options) | ||
#@mutexes_by_bucket[bucket_symbol] = Mutex.new | ||
end | ||
end | ||
# to be thread-safe, one uses this method like this: | ||
# s3_client_factory.get_s3_client(my_s3_bucket) do | ||
# ... do stuff ... | ||
# end | ||
# FIXME: this does not allow concurrent downloads from the same bucket! | ||
# So we are testing this without this mutex. | ||
#@mutexes_by_bucket[bucket_symbol].synchronize do | ||
yield @clients_by_bucket[bucket_symbol] | ||
#end | ||
end | ||
|
||
private | ||
|
||
def get_s3_auth(credentials) | ||
# reminder: these are auto-refreshing! | ||
if credentials.key?('role') | ||
@logger.debug("Assume Role", :role => credentials["role"]) | ||
return Aws::AssumeRoleCredentials.new( | ||
client: @sts_client, | ||
role_arn: credentials['role'], | ||
role_session_name: @default_session_name | ||
) | ||
elsif credentials.key?('access_key_id') && credentials.key?('secret_access_key') | ||
@logger.debug("Fetch credentials", :access_key => credentials['access_key_id']) | ||
return Aws::Credentials.new(credentials) | ||
end | ||
end | ||
|
||
end # class | ||
#end;end;end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# encoding: utf-8 | ||
require 'fileutils' | ||
require 'thread' | ||
require "logstash/inputs/threadable" | ||
#require "logstash/inputs/s3/remote_file" | ||
|
||
#module LogStash module Inputs class S3SNSSQS < LogStash::Inputs::Threadable | ||
class S3Downloader | ||
|
||
def initialize(logger, stop_semaphore, options) | ||
@logger = logger | ||
@stopped = stop_semaphore | ||
@factory = options[:s3_client_factory] | ||
@delete_on_success = options[:delete_on_success] | ||
end | ||
|
||
def copy_s3object_to_disk(record) | ||
# (from docs) WARNING: | ||
# yielding data to a block disables retries of networking errors! | ||
begin | ||
@factory.get_s3_client(record[:bucket]) do |s3| | ||
response = s3.get_object( | ||
bucket: record[:bucket], | ||
key: record[:key], | ||
response_target: record[:local_file] | ||
) | ||
end | ||
rescue Aws::S3::Errors::ServiceError => e | ||
@logger.error("Unable to download file. Requeuing the message", :error => e, :record => record) | ||
# prevent sqs message deletion | ||
throw :skip_delete | ||
end | ||
throw :skip_delete if stop? | ||
return true | ||
end | ||
|
||
def cleanup_local_object(record) | ||
FileUtils.remove_entry_secure(record[:local_file], true) if ::File.exists?(record[:local_file]) | ||
rescue Exception => e | ||
@logger.warn("Could not delete file", :file => record[:local_file], :error => e) | ||
end | ||
|
||
def cleanup_s3object(record) | ||
return unless @delete_on_success | ||
begin | ||
@factory.get_s3_client(record[:bucket]) do |s3| | ||
s3.delete_object(bucket: record[:bucket], key: record[:key]) | ||
end | ||
rescue Exception => e | ||
@logger.warn("Failed to delete s3 object", :record => record, :error => e) | ||
end | ||
end | ||
|
||
def stop? | ||
@stopped.value | ||
end | ||
|
||
end # class | ||
#end;end;end |
Oops, something went wrong.