diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 0bc040a4d594..3c6146d0b9a5 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -306,7 +306,7 @@ set_headers(Headers, #amqp10_msg{header = Current} = Msg) -> H = maps:fold(fun(durable, V, Acc) -> Acc#'v1_0.header'{durable = V}; (priority, V, Acc) -> - Acc#'v1_0.header'{priority = {uint, V}}; + Acc#'v1_0.header'{priority = {ubyte, V}}; (first_acquirer, V, Acc) -> Acc#'v1_0.header'{first_acquirer = V}; (ttl, V, Acc) -> @@ -325,8 +325,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) -> P = maps:fold(fun(message_id, V, Acc) when is_binary(V) -> % message_id can be any type but we restrict it here Acc#'v1_0.properties'{message_id = utf8(V)}; - (user_id, V, Acc) -> - Acc#'v1_0.properties'{user_id = utf8(V)}; + (user_id, V, Acc) when is_binary(V) orelse is_list(V) -> + Acc#'v1_0.properties'{user_id = binary(V)}; (to, V, Acc) -> Acc#'v1_0.properties'{to = utf8(V)}; (subject, V, Acc) -> @@ -407,8 +407,12 @@ wrap_ap_value(true) -> {boolean, true}; wrap_ap_value(false) -> {boolean, false}; -wrap_ap_value(V) when is_integer(V) -> +wrap_ap_value(V) when is_integer(V) andalso V >= 0 -> {uint, V}; +wrap_ap_value(V) when is_integer(V) andalso V < 0 -> + {int, V}; +wrap_ap_value(F) when is_float(F) -> + {double, F}; wrap_ap_value(V) when is_binary(V) -> utf8(V); wrap_ap_value(V) when is_list(V) -> @@ -449,6 +453,8 @@ utf8(V) -> amqp10_client_types:utf8(V). sym(B) when is_list(B) -> {symbol, list_to_binary(B)}; sym(B) when is_binary(B) -> {symbol, B}. uint(B) -> {uint, B}. +binary(B) when is_binary(B) -> {binary, B}; +binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}. has_value(undefined) -> false; has_value(_) -> true. diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 16a603210866..4743f59408b3 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -178,7 +178,8 @@ dest_endpoint(#{shovel_type := dynamic, handle_source({amqp10_msg, _LinkRef, Msg}, State) -> Tag = amqp10_msg:delivery_id(Msg), Payload = amqp10_msg:body_bin(Msg), - rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State); + Props = props_to_map(Msg), + rabbit_shovel_behaviour:forward(Tag, Props, Payload, State); handle_source({amqp10_event, {connection, Conn, opened}}, State = #{source := #{current := #{conn := Conn}}}) -> State; @@ -382,7 +383,9 @@ add_forward_headers(_, Msg) -> Msg. set_message_properties(Props, Msg) -> %% this is effectively special handling properties from amqp 0.9.1 maps:fold( - fun(content_type, Ct, M) -> + fun(_Key, undefined, M) -> + M; + (content_type, Ct, M) -> amqp10_msg:set_properties( #{content_type => to_binary(Ct)}, M); (content_encoding, Ct, M) -> @@ -390,6 +393,8 @@ set_message_properties(Props, Msg) -> #{content_encoding => to_binary(Ct)}, M); (delivery_mode, 2, M) -> amqp10_msg:set_headers(#{durable => true}, M); + (priority, P, M) when is_integer(P) -> + amqp10_msg:set_headers(#{priority => P}, M); (correlation_id, Ct, M) -> amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M); (reply_to, Ct, M) -> @@ -397,8 +402,8 @@ set_message_properties(Props, Msg) -> (message_id, Ct, M) -> amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M); (timestamp, Ct, M) -> - amqp10_msg:set_properties(#{creation_time => Ct}, M); - (user_id, Ct, M) -> + amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M); + (user_id, Ct, M) when Ct =/= undefined -> amqp10_msg:set_properties(#{user_id => Ct}, M); (headers, Headers0, M) when is_list(Headers0) -> %% AMPQ 0.9.1 are added as applicatin properties @@ -440,3 +445,63 @@ is_amqp10_compat(T) -> %% TODO: not all lists are compatible is_list(T) orelse is_boolean(T). + +to_amqp091_compatible_value(Key, Value) when is_binary(Value) -> + {Key, longstr, Value}; +to_amqp091_compatible_value(Key, Value) when is_integer(Value) -> + {Key, long, Value}; +to_amqp091_compatible_value(Key, Value) when is_float(Value) -> + {Key, double, Value}; +to_amqp091_compatible_value(Key, true) -> + {Key, bool, true}; +to_amqp091_compatible_value(Key, false) -> + {Key, bool, false}; +to_amqp091_compatible_value(_Key, _Value) -> + undefined. + +delivery_mode(Headers) -> + case maps:get(durable, Headers, undefined) of + undefined -> undefined; + true -> 2; + false -> 1 + end. + +timestamp_10_to_091(T) when is_integer(T) -> + trunc(T / 1000); +timestamp_10_to_091(_) -> + undefined. + +timestamp_091_to_10(T) when is_integer(T) -> + T * 1000; +timestamp_091_to_10(_Value) -> + undefined. + +ttl(T) when is_integer(T) -> + erlang:integer_to_binary(T); +ttl(_T) -> undefined. + +props_to_map(Msg) -> + AppProps = amqp10_msg:application_properties(Msg), + AppProps091Headers = lists:filtermap(fun({K, V}) -> + case to_amqp091_compatible_value(K, V) of + undefined -> + false; + Value -> + {true, Value} + end + end, maps:to_list(AppProps)), + InProps = amqp10_msg:properties(Msg), + Headers = amqp10_msg:headers(Msg), + #{ + headers => AppProps091Headers, + content_type => maps:get(content_type, InProps, undefined), + content_encoding => maps:get(content_encoding, InProps, undefined), + delivery_mode => delivery_mode(Headers), + priority => maps:get(priority, Headers, undefined), + correlation_id => maps:get(correlation_id, InProps, undefined), + reply_to => maps:get(reply_to, InProps, undefined), + expiration => ttl(maps:get(ttl, Headers, undefined)), + message_id => maps:get(message_id, InProps, undefined), + timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)), + user_id => maps:get(user_id, InProps, undefined) + }. diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index d6c855745c02..8a31d3aedf30 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -29,7 +29,9 @@ groups() -> autodelete_amqp091_dest_on_confirm, autodelete_amqp091_dest_on_publish, simple_amqp10_dest, - simple_amqp10_src + simple_amqp10_src, + message_prop_conversion, + message_prop_conversion_no_props ]}, {with_map_config, [], [ simple, @@ -168,6 +170,168 @@ simple_amqp10_src(Config) -> ok end). +message_prop_conversion(Config) -> + MapConfig = ?config(map_config, Config), + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), + LinkName = <<"dynamic-sender-", Dest/binary>>, + Tag = <<"tag1">>, + Payload = <<"payload">>, + {ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + Headers = #{durable => true, priority => 3, ttl => 180000}, + Msg = amqp10_msg:set_headers(Headers, + amqp10_msg:new(Tag, Payload, false)), + Msg2 = amqp10_msg:set_properties(#{ + message_id => <<"message-id">>, + user_id => <<"guest">>, + to => <<"to">>, + subject => <<"subject">>, + reply_to => <<"reply-to">>, + correlation_id => <<"correlation-id">>, + content_type => <<"content-type">>, + content_encoding => <<"content-encoding">>, + %absolute_expiry_time => 123456789, + creation_time => 123456789, + group_id => <<"group-id">>, + group_sequence => 123, + reply_to_group_id => <<"reply-to-group-id">> + }, Msg), + Msg3 = amqp10_msg:set_application_properties(#{ + <<"x-binary">> => <<"binary">>, + <<"x-int">> => 33, + <<"x-negative-int">> => -33, + <<"x-float">> => 1.3, + <<"x-true">> => true, + <<"x-false">> => false + }, Msg2), + ok = amqp10_client:send_msg(Sender, Msg3), + receive + {amqp10_disposition, {accepted, Tag}} -> ok + after 3000 -> + exit(publish_disposition_not_received) + end, + amqp10_client:detach_link(Sender), + Channel = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{ + content_type = ReceivedContentType, + content_encoding = ReceivedContentEncoding, + headers = Headers2, + delivery_mode = ReceivedDeliveryMode, + priority = ReceivedPriority, + correlation_id = ReceivedCorrelationId, + reply_to = ReceivedReplyTo, + expiration = ReceivedExpiration, + message_id = ReceivedMessageId, + timestamp = ReceivedTimestamp, + type = _ReceivedType, + user_id = ReceivedUserId, + app_id = _ReceivedAppId, + cluster_id = _ReceivedClusterId + }}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}), + + ?assertEqual(<<"payload">>, Payload), + ?assertEqual(2, ReceivedDeliveryMode), + ?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)), + ?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)), + ?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)), + ?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)), + ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)), + ?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)), + + ?assertEqual(<<"content-type">>, ReceivedContentType), + ?assertEqual(<<"content-encoding">>, ReceivedContentEncoding), + + ?assertEqual(3, ReceivedPriority), + ?assertEqual(<<"correlation-id">>, ReceivedCorrelationId), + ?assertEqual(<<"reply-to">>, ReceivedReplyTo), + ?assertEqual(<<"180000">>, ReceivedExpiration), + ?assertEqual(<<"message-id">>, ReceivedMessageId), + ?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000 + ?assertEqual(<<"guest">>, ReceivedUserId), + ok + end). + +message_prop_conversion_no_props(Config) -> + MapConfig = ?config(map_config, Config), + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), + LinkName = <<"dynamic-sender-", Dest/binary>>, + Tag = <<"tag1">>, + Payload = <<"payload">>, + {ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + Msg = amqp10_msg:new(Tag, Payload, false), + ok = amqp10_client:send_msg(Sender, Msg), + receive + {amqp10_disposition, {accepted, Tag}} -> ok + after 3000 -> + exit(publish_disposition_not_received) + end, + amqp10_client:detach_link(Sender), + Channel = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{ + content_type = undefined, + content_encoding = undefined, + headers = ReceivedHeaders, + delivery_mode = ReceivedDeliveryMode, + priority = ReceivedPriority, + correlation_id = undefined, + reply_to = undefined, + expiration = undefined, + message_id = undefined, + timestamp = undefined, + type = undefined, + user_id = undefined, + app_id = undefined, + cluster_id = ReceivedClusterId + }}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}), + + ?assertEqual(<<"payload">>, ReceivedPayload), + ?assertEqual(1, ReceivedDeliveryMode), + ?assertEqual(<<"x">>, ReceivedClusterId), + ?assertEqual(4, ReceivedPriority), + + ?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)), + + ok + end). + + change_definition(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config),