diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 5b97ae7..6110f4b 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -23,6 +23,6 @@ jobs: env: CI: true run: | - gem install bundler rake + gem install rake bundle install --jobs 4 --retry 3 bundle exec rake test diff --git a/fluent-plugin-s3.gemspec b/fluent-plugin-s3.gemspec index 8088a41..10ec13b 100644 --- a/fluent-plugin-s3.gemspec +++ b/fluent-plugin-s3.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.14.22", "< 2"] gem.add_dependency "aws-sdk-s3", "~> 1.60" gem.add_dependency "aws-sdk-sqs", "~> 1.23" + gem.add_dependency 'zstd-ruby' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", ">= 1.0.3" diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb new file mode 100644 index 0000000..cc8540d --- /dev/null +++ b/lib/fluent/plugin/s3_compressor_zstd.rb @@ -0,0 +1,30 @@ +require 'zstd-ruby' + +module Fluent::Plugin + class S3Output + class ZstdCompressor < Compressor + S3Output.register_compressor('zstd', self) + + config_section :compress, param_name: :compress_config, init: true, multi: false do + desc "Compression level for zstd (1-22)" + config_param :level, :integer, default: 3 + end + + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zst'.freeze + end + + def compress(chunk, tmp) + compressed = Zstd.compress(chunk.read, level: @compress_config.level) + tmp.write(compressed) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise + end + end + end +end \ No newline at end of file diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index cbf7860..2d7941d 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -109,6 +109,18 @@ def test_configure_with_mime_type_lzo assert(e.is_a?(Fluent::ConfigError)) end + data('level default' => nil, + 'level 1' => 1) + def test_configure_with_mime_type_zstd(level) + conf = CONFIG.clone + conf << "\nstore_as zstd\n" + conf << "\n\nlevel #{level}\n\n" if level + d = create_driver(conf) + assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext + assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type + assert_equal (level || 3), d.instance.instance_variable_get(:@compressor).instance_variable_get(:@compress_config).level + end + def test_configure_with_path_style conf = CONFIG.clone conf << "\nforce_path_style true\n" @@ -456,6 +468,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde FileUtils.rm_f(s3_local_file_path) end + def test_write_with_zstd + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.zst" + + expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst" + + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path) + + config = CONFIG_TIME_SLICE + "\nstore_as zstd\n" + d = create_time_sliced_driver(config) + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, { "a" => 1 }) + d.feed(time, { "a" => 2 }) + end + + File.open(s3_local_file_path, 'rb') do |file| + compressed_data = file.read + uncompressed_data = Zstd.decompress(compressed_data) + expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + assert_equal expected_data, uncompressed_data + end + FileUtils.rm_f(s3_local_file_path) + end + class MockResponse attr_reader :data