diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index b4cc02bb61..9c2ffbbf8e 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -62,9 +62,9 @@ def to_msgpack_stream(time_int: false, packer: nil) out.full_pack end - def to_compressed_msgpack_stream(time_int: false, packer: nil) + def to_compressed_msgpack_stream(time_int: false, packer: nil, type: :gzip) packed = to_msgpack_stream(time_int: time_int, packer: packer) - compress(packed) + compress(packed, type: type) end def to_msgpack_stream_forced_integer(packer: nil) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index c88b724bb8..1e93f05263 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -60,7 +60,7 @@ def initialize(metadata, compress: :text) @created_at = Fluent::Clock.real_now @modified_at = Fluent::Clock.real_now - extend Decompressable if compress == :gzip + extend Decompressable unless compress == :text end attr_reader :unique_id, :metadata, :state @@ -193,6 +193,14 @@ def append(data, **kwargs) end end concat(io.string, data.size) + elsif kwargs[:compress] == :zstd + io = StringIO.new + stream = Zstd::StreamWriter.new(io) + data.each do |d| + stream.write(d) + end + stream.finish + concat(io.string, data.size) else super end