From 139ecadd58ba0b7441a6d1910e375dfe1b65cb89 Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Thu, 3 Oct 2024 23:53:20 +0530 Subject: [PATCH] Added zstd support for msgs Signed-off-by: Athish Pranav D --- fluentd.gemspec | 1 + lib/fluent/event.rb | 7 +-- lib/fluent/plugin/compressable.rb | 90 +++++++++++++++++++++++-------- lib/fluent/plugin/in_forward.rb | 12 +++-- test/plugin/test_in_forward.rb | 2 +- 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index ff225597d3..4d18e3562b 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -33,6 +33,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"]) gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"]) gem.add_runtime_dependency("webrick", ["~> 1.4"]) + gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"]) # gems that aren't default gems as of Ruby 3.4 gem.add_runtime_dependency("base64", ["~> 0.2"]) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index bcb06998ed..b4cc02bb61 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil) end class CompressedMessagePackEventStream < MessagePackEventStream - def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) - super + def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) + super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records) @decompressed_data = nil @compressed_data = data + @type = compress end def empty? @@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil) def ensure_decompressed! return if @decompressed_data - @data = @decompressed_data = decompress(@data) + @data = @decompressed_data = decompress(@data, type: @type) end end diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index aa242478fe..00aa915ba5 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -16,29 +16,35 @@ require 'stringio' require 'zlib' +require 'zstd-ruby' module Fluent module Plugin module Compressable - def compress(data, **kwargs) + def compress(data, type: :gzip, **kwargs) output_io = kwargs[:output_io] io = output_io || StringIO.new - Zlib::GzipWriter.wrap(io) do |gz| - gz.write data + if type == :gzip + writer = Zlib::GzipWriter.new(io) + elsif type == :zstd + writer = Zstd::StreamWriter.new(io) + else + raise ArgumentError, "Unknown compression type: #{type}" end - + writer.write(data) + writer.finish output_io || io.string end # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)` # https://www.ruby-forum.com/topic/971591#979503 - def decompress(compressed_data = nil, output_io: nil, input_io: nil) + def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip) case when input_io && output_io - io_decompress(input_io, output_io) + io_decompress(input_io, output_io, type) when input_io output_io = StringIO.new - io = io_decompress(input_io, output_io) + io = io_decompress(input_io, output_io, type) io.string when compressed_data.nil? || compressed_data.empty? # check compressed_data(String) is 0 length @@ -46,51 +52,91 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil) when output_io # execute after checking compressed_data is empty or not io = StringIO.new(compressed_data) - io_decompress(io, output_io) + io_decompress(io, output_io, type) else - string_decompress(compressed_data) + string_decompress(compressed_data, type) end end private - def string_decompress(compressed_data) + def string_decompress_gzip(compressed_data) io = StringIO.new(compressed_data) - out = '' loop do - gz = Zlib::GzipReader.new(io) - out << gz.read - unused = gz.unused - gz.finish - + reader = Zlib::GzipReader.new(io) + out << reader.read + unused = reader.unused + reader.finish unless unused.nil? adjust = unused.length io.pos -= adjust end break if io.eof? end + out + end + def string_decompress_zstd(compressed_data) + io = StringIO.new(compressed_data) + out = '' + loop do + reader = Zstd::StreamReader.new(io) + # Zstd::StreamReader needs to specify the size of the buffer + out << reader.read(1024) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if io.eof? + end out end - def io_decompress(input, output) + def string_decompress(compressed_data, type = :gzip) + if type == :gzip + string_decompress_gzip(compressed_data) + elsif type == :zstd + string_decompress_zstd(compressed_data) + else + raise ArgumentError, "Unknown compression type: #{type}" + end + end + + def io_decompress_gzip(input, output) loop do - gz = Zlib::GzipReader.new(input) - v = gz.read + reader = Zlib::GzipReader.new(input) + v = reader.read output.write(v) - unused = gz.unused - gz.finish - + unused = reader.unused + reader.finish unless unused.nil? adjust = unused.length input.pos -= adjust end break if input.eof? end + output + end + def io_decompress_zstd(input, output) + loop do + reader = Zstd::StreamReader.new(input) + # Zstd::StreamReader needs to specify the size of the buffer + v = reader.read(1024) + output.write(v) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if input.eof? + end output end + + def io_decompress(input, output, type = :gzip) + if type == :gzip + io_decompress_gzip(input, output) + elsif type == :zstd + io_decompress_zstd(input, output) + else + raise ArgumentError, "Unknown compression type: #{type}" + end + end end end end diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index eb8b3c629e..0c2216883c 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -307,10 +307,14 @@ def on_message(msg, chunk_size, conn) case entries when String # PackedForward - option = msg[2] - size = (option && option['size']) || 0 - es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream - es = es_class.new(entries, nil, size.to_i) + option = msg[2] || {} + size = option['size'] || 0 + + if option['compressed'] && option['compressed'] != 'text' + es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) + else + es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i) + end es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event if @enable_field_injection es = add_source_info(es, conn) diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 9a3b46fd9a..e32dbe8c1b 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -553,7 +553,7 @@ def create_driver(conf=base_config) chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack # check CompressedMessagePackEventStream is created - mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0) + mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip) d.run do Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|