Skip to content

Commit

Permalink
Fix a bug where Sync message was not always sent when an error occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
pguyot committed Mar 17, 2015
1 parent c9d7418 commit 5430d37
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
35 changes: 23 additions & 12 deletions src/pgsql_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@
% expect command_complete
| no_data
% expect ready_for_query
| {result, any()}.
| {result, any()}
% expect copy_data or copy_done
| {copy, [pgsql_format()]}.

-define(binary_to_integer(Bin), list_to_integer(binary_to_list(Bin))).

Expand Down Expand Up @@ -790,8 +792,7 @@ pgsql_simple_query_loop(Result0, Acc, AsyncT, #state{socket = Socket, subscriber
pgsql_simple_query_loop({rows, Fields, AccRows1}, Acc, AsyncT, State0);
{ok, #copy_out_response{format = Format}} when Result0 =:= [] ->
Fields = [Format],
State1 = oob_update_oid_map_from_fields_if_required(Fields, State0),
pgsql_simple_query_loop({copy, Fields, []}, Acc, AsyncT, State1);
pgsql_simple_query_loop({copy, Fields, []}, Acc, AsyncT, State0);
{ok, #copy_data{data = Data}} when is_tuple(Result0) andalso element(1, Result0) =:= copy ->
{copy, Fields, AccData0} = Result0,
AccData1 = [Data | AccData0],
Expand Down Expand Up @@ -997,8 +998,7 @@ pgsql_extended_query_receive_loop0(#data_row{values = Values}, {rows, Fields} =
pgsql_extended_query_receive_loop(LoopState, Fun, Acc1, FinalizeFun, MaxRowsStep, AsyncT, State0);
pgsql_extended_query_receive_loop0(#copy_out_response{format = Format}, _LoopState, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0) ->
Fields = [Format],
State1 = oob_update_oid_map_from_fields_if_required(Fields, State0),
pgsql_extended_query_receive_loop({copy, Fields}, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State1);
pgsql_extended_query_receive_loop({copy, Fields}, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0);
pgsql_extended_query_receive_loop0(#copy_data{data = Data}, {copy, _Fields} = LoopState, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0) ->
Acc1 = Fun(Data, Acc0),
pgsql_extended_query_receive_loop(LoopState, Fun, Acc1, FinalizeFun, MaxRowsStep, AsyncT, State0);
Expand Down Expand Up @@ -1032,14 +1032,25 @@ pgsql_extended_query_receive_loop0(#copy_in_response{}, LoopState, Fun, Acc0, Fi
ok -> pgsql_extended_query_receive_loop(LoopState, Fun, Acc0, FinalizeFun, MaxRowsStep, AsyncT, State0);
{error,_} = SendError -> return_async(SendError, AsyncT, State0)
end;
pgsql_extended_query_receive_loop0(#error_response{fields = Fields}, _LoopState, _Fun, _Acc0, _FinalizeFun, 0, AsyncT, State0) ->
Error = {error, {pgsql_error, Fields}},
flush_until_ready_for_query(Error, AsyncT, State0);
pgsql_extended_query_receive_loop0(#error_response{fields = Fields}, _LoopState, _Fun, _Acc0, _FinalizeFun, _MaxRowsStep, AsyncT, #state{socket = {SockModule, Sock}} = State0) ->
pgsql_extended_query_receive_loop0(#error_response{fields = Fields}, LoopState, _Fun, _Acc0, _FinalizeFun, MaxRowsStep, AsyncT, #state{socket = {SockModule, Sock}} = State0) ->
Error = {error, {pgsql_error, Fields}},
case SockModule:send(Sock, pgsql_protocol:encode_sync_message()) of
ok -> flush_until_ready_for_query(Error, AsyncT, State0);
{error, _} = SendSyncPacketError -> return_async(SendSyncPacketError, AsyncT, State0)
% We already sent a Sync except when we sent a Flush :-)
% - when we asked for the statement description
% - when MaxRowsStep > 0
NeedSync = case LoopState of
{parse_complete_with_params, _Mode, _Args} -> true;
{parameter_description_with_params, _Mode, _Parameters} -> true;
_ when MaxRowsStep > 0 -> true;
_ -> false
end,
case NeedSync of
true ->
case SockModule:send(Sock, pgsql_protocol:encode_sync_message()) of
ok -> flush_until_ready_for_query(Error, AsyncT, State0);
{error, _} = SendSyncPacketError -> return_async(SendSyncPacketError, AsyncT, State0)
end;
false ->
flush_until_ready_for_query(Error, AsyncT, State0)
end;
pgsql_extended_query_receive_loop0(#ready_for_query{} = Message, _LoopState, _Fun, _Acc0, _FinalizeFun, _MaxRowsStep, AsyncT, State0) ->
Result = {error, {unexpected_message, Message}},
Expand Down
11 changes: 11 additions & 0 deletions test/pgsql_connection_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,17 @@ invalid_query_test_() ->
{'rollback',[]} = pgsql_connection:simple_query("ROLLBACK", [], 5000, Conn),
R1 = pgsql_connection:extended_query("insert into tmp(id, other) values (6, $1)", ["toto"], Conn),
?assertEqual({{insert, 0, 1}, []}, R1)
end),
?_test(begin
?assertMatch({error, {pgsql_error, _Error}}, pgsql_connection:extended_query("FOO", [], Conn)),
% Empty array forces a Describe command, thus we end the normal sequence with Flush and not with Sync
% Error recovery therefore requires a Sync to get the ReadyForQuery message.
?assertMatch({error, {pgsql_error, _Error}}, pgsql_connection:extended_query("FOO", [{array, [<<>>]}], Conn)),
% Likewise, cursor mode does send a Flush instead of a Sync after Bind
?assertMatch({error, {pgsql_error, _Error}}, pgsql_connection:foreach(fun(_Row) -> ok end, "FOO", Conn)),
% connection still usable
R = pgsql_connection:extended_query("insert into tmp(id, other) values (7, $1)", ["toto"], Conn),
?assertEqual({{insert, 0, 1}, []}, R)
end)
]
end
Expand Down

0 comments on commit 5430d37

Please sign in to comment.