Skip to content

Commit

Permalink
Out_file zstd addition
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 committed Nov 21, 2024
1 parent a631c21 commit f5f8361
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 22 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
config_param :queued_chunks_limit_size, :integer, default: nil

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text

desc 'If true, chunks are thrown away when unrecoverable error happens'
config_param :disable_chunk_backup, :bool, default: false
Expand Down
62 changes: 46 additions & 16 deletions lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ class FileOutput < Output

helpers :formatter, :inject, :compat_parameters

SUPPORTED_COMPRESS = [:text, :gz, :gzip]
SUPPORTED_COMPRESS = [:text, :gz, :gzip, :zstd]
SUPPORTED_COMPRESS_MAP = {
text: nil,
gz: :gzip,
gzip: :gzip,
zstd: :zstd,
}

DEFAULT_TIMEKEY = 60 * 60 * 24
Expand Down Expand Up @@ -212,17 +213,27 @@ def write(chunk)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

writer = case
when @compress_method.nil?
method(:write_without_compression)
when @compress_method == :gzip
if @buffer.compress != :gzip || @recompress
method(:write_gzip_with_compression)
else
method(:write_gzip_from_gzipped_chunk)
end
else
raise "BUG: unknown compression method #{@compress_method}"
end
when @compress_method.nil?
method(:write_without_compression)
when @compress_method == :gzip
if @buffer.compress == :text || @recompress
method(:write_with_compression).curry.call(@compress_method)
elsif @buffer.compress == :gzip
method(:write_from_compressed_chunk_same_format).curry.call(@compress_method)
else
method(:write_from_compressed_chunk_diff_format).curry.call(@compress_method)
end
when @compress_method == :zstd
if @buffer.compress == :text || @recompress
method(:write_with_compression).curry.call(@compress_method)
elsif @buffer.compress == :zstd
method(:write_from_compressed_chunk_same_format).curry.call(@compress_method)
else
method(:write_from_compressed_chunk_diff_format).curry.call(@compress_method)
end
else
raise "BUG: unknown compression method #{@compress_method}"
end

if @append
if @need_lock
Expand Down Expand Up @@ -253,17 +264,35 @@ def write_without_compression(path, chunk)
end
end

def write_gzip_with_compression(path, chunk)
def write_with_compression(type, path, chunk)
File.open(path, "ab", @file_perm) do |f|
gz = Zlib::GzipWriter.new(f)
gz = nil
if type == :gzip
gz = Zlib::GzipWriter.new(f)
elsif type == :zstd
gz = Zstd::StreamWriter.new(f)
end
chunk.write_to(gz, compressed: :text)
gz.close
end
end

def write_gzip_from_gzipped_chunk(path, chunk)
def write_from_compressed_chunk_same_format(type, path, chunk)
File.open(path, "ab", @file_perm) do |f|
chunk.write_to(f, compressed: type)
end
end

def write_from_compressed_chunk_diff_format(type, path, chunk)
File.open(path, "ab", @file_perm) do |f|
chunk.write_to(f, compressed: :gzip)
gz = nil
if type == :gzip
gz = Zlib::GzipWriter.new(f)
elsif type == :zstd
gz = Zstd::StreamWriter.new(f)
end
chunk.write_to(gz, compressed: :text)
gz.close
end
end

Expand All @@ -280,6 +309,7 @@ def timekey_to_timeformat(timekey)
def compression_suffix(compress)
case compress
when :gzip then '.gz'
when :zstd then '.zstd'
when nil then ''
else
raise ArgumentError, "unknown compression type #{compress}"
Expand Down
10 changes: 7 additions & 3 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1014,13 +1014,17 @@ def write_guard(&block)
end

FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) }

def generate_format_proc
if @buffer && @buffer.compress == :gzip
@time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
@time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP : FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP
elsif @buffer && @buffer.compress == :zstd
@time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD : FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD
else
@time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
end
Expand Down
16 changes: 14 additions & 2 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1010,13 +1010,25 @@ def waiting(seconds)
test 'when output has <buffer> and compress is gzip' do
i = create_output(:buffered)
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'gzip'})]))
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, i.generate_format_proc
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP, i.generate_format_proc
end

test 'when output has <buffer> and compress is gzip and time_as_integer is true' do
i = create_output(:buffered)
i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'gzip'})]))
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, i.generate_format_proc
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP, i.generate_format_proc
end

test 'when output has <buffer> and compress is zstd' do
i = create_output(:buffered)
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'zstd'})]))
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD, i.generate_format_proc
end

test 'when output has <buffer> and compress is zstd and time_as_integer is true' do
i = create_output(:buffered)
i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'zstd'})]))
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD, i.generate_format_proc
end

test 'when output has <buffer> and compress is text' do
Expand Down

0 comments on commit f5f8361

Please sign in to comment.