diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index e32dbe8c1b..3d223f4e56 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -508,7 +508,7 @@ def create_driver(conf=base_config) end sub_test_case 'compressed packed forward' do - test 'set_compress_to_option' do + test 'set_compress_to_option_gzip' do @d = d = create_driver time_i = event_time("2011-01-02 13:14:15 UTC").to_i @@ -535,6 +535,33 @@ def create_driver(conf=base_config) assert_equal events, d.events end + test 'set_compress_to_option_zstd' do + @d = d = create_driver + + time_i = event_time("2011-01-02 13:14:15 UTC").to_i + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] + ] + + # create compressed entries + entries = '' + events.each do |_tag, _time, record| + v = [_time, record].to_msgpack + entries << compress(v, type: :zstd) + end + chunk = ["tag1", entries, { 'compressed' => 'zstd' }].to_msgpack + + d.run do + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| + option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) + assert_equal 'zstd', option['compressed'] + end + end + + assert_equal events, d.events + end + test 'create_CompressedMessagePackEventStream_with_gzip_compress_option' do @d = d = create_driver @@ -562,6 +589,34 @@ def create_driver(conf=base_config) end end end + + test 'create_CompressedMessagePackEventStream_with_zstd_compress_option' do + @d = d = create_driver + + time_i = event_time("2011-01-02 13:14:15 UTC").to_i + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] + ] + + # create compressed entries + entries = '' + events.each do |_tag, _time, record| + v = [_time, record].to_msgpack + entries << compress(v) + end + chunk = ["tag1", entries, { 'compressed' => 'zstd' }].to_msgpack + + # check CompressedMessagePackEventStream is created + mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :zstd) + + d.run do + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| + option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) + assert_equal 'zstd', option['compressed'] + end + end + end end sub_test_case 'warning' do