From 5430d376844efb288fe6bcc6e4d9cfa3499d0a57 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Tue, 17 Mar 2015 16:49:46 +0100 Subject: [PATCH] Fix a bug where Sync message was not always sent when an error occurs --- src/pgsql_connection.erl | 35 ++++++++++++++++++++++------------ test/pgsql_connection_test.erl | 11 +++++++++++ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/pgsql_connection.erl b/src/pgsql_connection.erl index 8d514ca..d3f8431 100644 --- a/src/pgsql_connection.erl +++ b/src/pgsql_connection.erl @@ -174,7 +174,9 @@ % expect command_complete | no_data % expect ready_for_query - | {result, any()}. + | {result, any()} + % expect copy_data or copy_done + | {copy, [pgsql_format()]}. -define(binary_to_integer(Bin), list_to_integer(binary_to_list(Bin))). @@ -790,8 +792,7 @@ pgsql_simple_query_loop(Result0, Acc, AsyncT, #state{socket = Socket, subscriber pgsql_simple_query_loop({rows, Fields, AccRows1}, Acc, AsyncT, State0); {ok, #copy_out_response{format = Format}} when Result0 =:= [] -> Fields = [Format], - State1 = oob_update_oid_map_from_fields_if_required(Fields, State0), - pgsql_simple_query_loop({copy, Fields, []}, Acc, AsyncT, State1); + pgsql_simple_query_loop({copy, Fields, []}, Acc, AsyncT, State0); {ok, #copy_data{data = Data}} when is_tuple(Result0) andalso element(1, Result0) =:= copy -> {copy, Fields, AccData0} = Result0, AccData1 = [Data | AccData0], @@ -997,8 +998,7 @@ pgsql_extended_query_receive_loop0(#data_row{values = Values}, {rows, Fields} = pgsql_extended_query_receive_loop(LoopState, Fun, Acc1, FinalizeFun, MaxRowsStep, AsyncT, State0); pgsql_extended_query_receive_loop0(#copy_out_response{format = Format}, _LoopState, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0) -> Fields = [Format], - State1 = oob_update_oid_map_from_fields_if_required(Fields, State0), - pgsql_extended_query_receive_loop({copy, Fields}, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State1); + pgsql_extended_query_receive_loop({copy, Fields}, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0); pgsql_extended_query_receive_loop0(#copy_data{data = Data}, {copy, _Fields} = LoopState, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0) -> Acc1 = Fun(Data, Acc0), pgsql_extended_query_receive_loop(LoopState, Fun, Acc1, FinalizeFun, MaxRowsStep, AsyncT, State0); @@ -1032,14 +1032,25 @@ pgsql_extended_query_receive_loop0(#copy_in_response{}, LoopState, Fun, Acc0, Fi ok -> pgsql_extended_query_receive_loop(LoopState, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0); {error,_} = SendError -> return_async(SendError, AsyncT, State0) end; -pgsql_extended_query_receive_loop0(#error_response{fields = Fields}, _LoopState, _Fun, _Acc0, _FinalizeFun, 0, AsyncT, State0) -> - Error = {error, {pgsql_error, Fields}}, - flush_until_ready_for_query(Error, AsyncT, State0); -pgsql_extended_query_receive_loop0(#error_response{fields = Fields}, _LoopState, _Fun, _Acc0, _FinalizeFun, _MaxRowsStep, AsyncT, #state{socket = {SockModule, Sock}} = State0) -> +pgsql_extended_query_receive_loop0(#error_response{fields = Fields}, LoopState, _Fun, _Acc0, _FinalizeFun, MaxRowsStep, AsyncT, #state{socket = {SockModule, Sock}} = State0) -> Error = {error, {pgsql_error, Fields}}, - case SockModule:send(Sock, pgsql_protocol:encode_sync_message()) of - ok -> flush_until_ready_for_query(Error, AsyncT, State0); - {error, _} = SendSyncPacketError -> return_async(SendSyncPacketError, AsyncT, State0) + % We already sent a Sync except when we sent a Flush :-) + % - when we asked for the statement description + % - when MaxRowsStep > 0 + NeedSync = case LoopState of + {parse_complete_with_params, _Mode, _Args} -> true; + {parameter_description_with_params, _Mode, _Parameters} -> true; + _ when MaxRowsStep > 0 -> true; + _ -> false + end, + case NeedSync of + true -> + case SockModule:send(Sock, pgsql_protocol:encode_sync_message()) of + ok -> flush_until_ready_for_query(Error, AsyncT, State0); + {error, _} = SendSyncPacketError -> return_async(SendSyncPacketError, AsyncT, State0) + end; + false -> + flush_until_ready_for_query(Error, AsyncT, State0) end; pgsql_extended_query_receive_loop0(#ready_for_query{} = Message, _LoopState, _Fun, _Acc0, _FinalizeFun, _MaxRowsStep, AsyncT, State0) -> Result = {error, {unexpected_message, Message}}, diff --git a/test/pgsql_connection_test.erl b/test/pgsql_connection_test.erl index 12fc0a8..7bde708 100644 --- a/test/pgsql_connection_test.erl +++ b/test/pgsql_connection_test.erl @@ -1343,6 +1343,17 @@ invalid_query_test_() -> {'rollback',[]} = pgsql_connection:simple_query("ROLLBACK", [], 5000, Conn), R1 = pgsql_connection:extended_query("insert into tmp(id, other) values (6, $1)", ["toto"], Conn), ?assertEqual({{insert, 0, 1}, []}, R1) + end), + ?_test(begin + ?assertMatch({error, {pgsql_error, _Error}}, pgsql_connection:extended_query("FOO", [], Conn)), + % Empty array forces a Describe command, thus we end the normal sequence with Flush and not with Sync + % Error recovery therefore requires a Sync to get the ReadyForQuery message. + ?assertMatch({error, {pgsql_error, _Error}}, pgsql_connection:extended_query("FOO", [{array, [<<>>]}], Conn)), + % Likewise, cursor mode does send a Flush instead of a Sync after Bind + ?assertMatch({error, {pgsql_error, _Error}}, pgsql_connection:foreach(fun(_Row) -> ok end, "FOO", Conn)), + % connection still usable + R = pgsql_connection:extended_query("insert into tmp(id, other) values (7, $1)", ["toto"], Conn), + ?assertEqual({{insert, 0, 1}, []}, R) end) ] end