Skip to content

Commit

Permalink
Added zstd support for msgs
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 committed Nov 18, 2024
1 parent 7f13c7a commit 139ecad
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 30 deletions.
1 change: 1 addition & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
gem.add_runtime_dependency("webrick", ["~> 1.4"])
gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])

# gems that aren't default gems as of Ruby 3.4
gem.add_runtime_dependency("base64", ["~> 0.2"])
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
end

class CompressedMessagePackEventStream < MessagePackEventStream
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
super
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
@decompressed_data = nil
@compressed_data = data
@type = compress
end

def empty?
Expand Down Expand Up @@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)

def ensure_decompressed!
return if @decompressed_data
@data = @decompressed_data = decompress(@data)
@data = @decompressed_data = decompress(@data, type: @type)
end
end

Expand Down
90 changes: 68 additions & 22 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,81 +16,127 @@

require 'stringio'
require 'zlib'
require 'zstd-ruby'

module Fluent
module Plugin
module Compressable
def compress(data, **kwargs)
def compress(data, type: :gzip, **kwargs)
output_io = kwargs[:output_io]
io = output_io || StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
gz.write data
if type == :gzip
writer = Zlib::GzipWriter.new(io)
elsif type == :zstd
writer = Zstd::StreamWriter.new(io)
else
raise ArgumentError, "Unknown compression type: #{type}"
end

writer.write(data)
writer.finish
output_io || io.string
end

# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
# https://www.ruby-forum.com/topic/971591#979503
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip)
case
when input_io && output_io
io_decompress(input_io, output_io)
io_decompress(input_io, output_io, type)
when input_io
output_io = StringIO.new
io = io_decompress(input_io, output_io)
io = io_decompress(input_io, output_io, type)
io.string
when compressed_data.nil? || compressed_data.empty?
# check compressed_data(String) is 0 length
compressed_data
when output_io
# execute after checking compressed_data is empty or not
io = StringIO.new(compressed_data)
io_decompress(io, output_io)
io_decompress(io, output_io, type)
else
string_decompress(compressed_data)
string_decompress(compressed_data, type)
end
end

private

def string_decompress(compressed_data)
def string_decompress_gzip(compressed_data)
io = StringIO.new(compressed_data)

out = ''
loop do
gz = Zlib::GzipReader.new(io)
out << gz.read
unused = gz.unused
gz.finish

reader = Zlib::GzipReader.new(io)
out << reader.read
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
io.pos -= adjust
end
break if io.eof?
end
out
end

def string_decompress_zstd(compressed_data)
io = StringIO.new(compressed_data)
out = ''
loop do
reader = Zstd::StreamReader.new(io)
# Zstd::StreamReader needs to specify the size of the buffer
out << reader.read(1024)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if io.eof?
end
out
end

def io_decompress(input, output)
def string_decompress(compressed_data, type = :gzip)
if type == :gzip
string_decompress_gzip(compressed_data)
elsif type == :zstd
string_decompress_zstd(compressed_data)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
end

def io_decompress_gzip(input, output)
loop do
gz = Zlib::GzipReader.new(input)
v = gz.read
reader = Zlib::GzipReader.new(input)
v = reader.read
output.write(v)
unused = gz.unused
gz.finish

unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
input.pos -= adjust
end
break if input.eof?
end
output
end

def io_decompress_zstd(input, output)
loop do
reader = Zstd::StreamReader.new(input)
# Zstd::StreamReader needs to specify the size of the buffer
v = reader.read(1024)
output.write(v)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if input.eof?
end
output
end

def io_decompress(input, output, type = :gzip)
if type == :gzip
io_decompress_gzip(input, output)
elsif type == :zstd
io_decompress_zstd(input, output)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
end
end
end
end
12 changes: 8 additions & 4 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,14 @@ def on_message(msg, chunk_size, conn)
case entries
when String
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
option = msg[2] || {}
size = option['size'] || 0

if option['compressed'] && option['compressed'] != 'text'
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
else
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @enable_field_injection
es = add_source_info(es, conn)
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ def create_driver(conf=base_config)
chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack

# check CompressedMessagePackEventStream is created
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0)
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip)

d.run do
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
Expand Down

0 comments on commit 139ecad

Please sign in to comment.