diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 80709c12bb..6ba36c4318 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than config_param :queued_chunks_limit_size, :integer, default: nil desc 'Compress buffered data.' - config_param :compress, :enum, list: [:text, :gzip], default: :text + config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text desc 'If true, chunks are thrown away when unrecoverable error happens' config_param :disable_chunk_backup, :bool, default: false diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 19c0afa8ef..7e142bdead 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -29,11 +29,12 @@ class FileOutput < Output helpers :formatter, :inject, :compat_parameters - SUPPORTED_COMPRESS = [:text, :gz, :gzip] + SUPPORTED_COMPRESS = [:text, :gz, :gzip, :zstd] SUPPORTED_COMPRESS_MAP = { text: nil, gz: :gzip, gzip: :gzip, + zstd: :zstd, } DEFAULT_TIMEKEY = 60 * 60 * 24 @@ -212,17 +213,27 @@ def write(chunk) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm writer = case - when @compress_method.nil? - method(:write_without_compression) - when @compress_method == :gzip - if @buffer.compress != :gzip || @recompress - method(:write_gzip_with_compression) - else - method(:write_gzip_from_gzipped_chunk) - end - else - raise "BUG: unknown compression method #{@compress_method}" - end + when @compress_method.nil? + method(:write_without_compression) + when @compress_method == :gzip + if @buffer.compress == :text || @recompress + method(:write_with_compression).curry.call(@compress_method) + elsif @buffer.compress == :gzip + method(:write_from_compressed_chunk_same_format).curry.call(@compress_method) + else + method(:write_from_compressed_chunk_diff_format).curry.call(@compress_method) + end + when @compress_method == :zstd + if @buffer.compress == :text || @recompress + method(:write_with_compression).curry.call(@compress_method) + elsif @buffer.compress == :zstd + method(:write_from_compressed_chunk_same_format).curry.call(@compress_method) + else + method(:write_from_compressed_chunk_diff_format).curry.call(@compress_method) + end + else + raise "BUG: unknown compression method #{@compress_method}" + end if @append if @need_lock @@ -253,17 +264,35 @@ def write_without_compression(path, chunk) end end - def write_gzip_with_compression(path, chunk) + def write_with_compression(type, path, chunk) File.open(path, "ab", @file_perm) do |f| - gz = Zlib::GzipWriter.new(f) + gz = nil + if type == :gzip + gz = Zlib::GzipWriter.new(f) + elsif type == :zstd + gz = Zstd::StreamWriter.new(f) + end chunk.write_to(gz, compressed: :text) gz.close end end - def write_gzip_from_gzipped_chunk(path, chunk) + def write_from_compressed_chunk_same_format(type, path, chunk) + File.open(path, "ab", @file_perm) do |f| + chunk.write_to(f, compressed: type) + end + end + + def write_from_compressed_chunk_diff_format(type, path, chunk) File.open(path, "ab", @file_perm) do |f| - chunk.write_to(f, compressed: :gzip) + gz = nil + if type == :gzip + gz = Zlib::GzipWriter.new(f) + elsif type == :zstd + gz = Zstd::StreamWriter.new(f) + end + chunk.write_to(gz, compressed: :text) + gz.close end end @@ -280,6 +309,7 @@ def timekey_to_timeformat(timekey) def compression_suffix(compress) case compress when :gzip then '.gz' + when :zstd then '.zstd' when nil then '' else raise ArgumentError, "unknown compression type #{compress}" diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0aed67db95..4e03d5ce54 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1014,13 +1014,17 @@ def write_guard(&block) end FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } - FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) } FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } - FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) } def generate_format_proc if @buffer && @buffer.compress == :gzip - @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM + @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP : FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP + elsif @buffer && @buffer.compress == :zstd + @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD : FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD else @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM end diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index c9cc649f0a..5923ee0937 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1010,13 +1010,25 @@ def waiting(seconds) test 'when output has and compress is gzip' do i = create_output(:buffered) i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'gzip'})])) - assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, i.generate_format_proc + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP, i.generate_format_proc end test 'when output has and compress is gzip and time_as_integer is true' do i = create_output(:buffered) i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'gzip'})])) - assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, i.generate_format_proc + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP, i.generate_format_proc + end + + test 'when output has and compress is zstd' do + i = create_output(:buffered) + i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'zstd'})])) + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD, i.generate_format_proc + end + + test 'when output has and compress is zstd and time_as_integer is true' do + i = create_output(:buffered) + i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'zstd'})])) + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD, i.generate_format_proc end test 'when output has and compress is text' do