Skip to content

Commit

Permalink
Buffer chunk compression
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 committed Nov 18, 2024
1 parent 7d837b6 commit bb88d21
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
4 changes: 2 additions & 2 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ def to_msgpack_stream(time_int: false, packer: nil)
out.full_pack
end

def to_compressed_msgpack_stream(time_int: false, packer: nil)
def to_compressed_msgpack_stream(time_int: false, packer: nil, type: :gzip)
packed = to_msgpack_stream(time_int: time_int, packer: packer)
compress(packed)
compress(packed, type: type)
end

def to_msgpack_stream_forced_integer(packer: nil)
Expand Down
10 changes: 9 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def initialize(metadata, compress: :text)
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now

extend Decompressable if compress == :gzip
extend Decompressable unless compress == :text
end

attr_reader :unique_id, :metadata, :state
Expand Down Expand Up @@ -193,6 +193,14 @@ def append(data, **kwargs)
end
end
concat(io.string, data.size)
elsif kwargs[:compress] == :zstd
io = StringIO.new
stream = Zstd::StreamWriter.new(io)
data.each do |d|
stream.write(d)
end
stream.finish
concat(io.string, data.size)
else
super
end
Expand Down

0 comments on commit bb88d21

Please sign in to comment.