diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0d647cf1..f79b243c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,8 @@ jobs: postgis: - "3" otp: - - "25.2" + - "26.0" + - "25.3" - "24.3" rebar3: - "3.20.0" @@ -40,11 +41,6 @@ jobs: rebar3: "3.15.2" pg: "12" postgis: "3" - - os: "ubuntu-20.04" - otp: "20.3" - rebar3: "3.15.2" - pg: "12" - postgis: "3" # env: # PATH: ".:/usr/lib/postgresql/12/bin:$PATH" @@ -59,7 +55,7 @@ jobs: rebar3-version: ${{ matrix.rebar3 }} - name: Setup postgresql server with postgis - run: sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts + run: sudo apt update && sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts - name: elvis run: make elvis diff --git a/CHANGES b/CHANGES index 55b935ff..206a597c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,9 @@ +In 4.7.1 + +* Handle `ReadyForQuery` after `Error` in replication mode #279 +* Fix the issue with columns in multi-squery response #283 +* Fix compatibility with OTP-26 #284 + In 4.7.0 * Flow control `{socket_active, N}` option in streaming replication #271 diff --git a/src/commands/epgsql_cmd_squery.erl b/src/commands/epgsql_cmd_squery.erl index b324a2b5..17b01e6f 100644 --- a/src/commands/epgsql_cmd_squery.erl +++ b/src/commands/epgsql_cmd_squery.erl @@ -32,7 +32,7 @@ -record(squery, {query :: iodata(), columns = [], - decoder}). + decoder = undefined :: epgsql_wire:row_decoder() | undefined}). init(Sql) -> #squery{query = Sql}. @@ -63,7 +63,8 @@ handle_message(?COMMAND_COMPLETE, Bin, Sock, #squery{columns = Cols} = St) -> _ -> {ok, Cols, Rows} end, - {add_result, Result, {complete, Complete}, Sock, St}; + {add_result, Result, {complete, Complete}, Sock, St#squery{columns = [], + decoder = undefined}}; handle_message(?EMPTY_QUERY, _, Sock, St) -> {add_result, {ok, [], []}, {complete, empty}, Sock, St}; handle_message(?READY_FOR_QUERY, _Status, Sock, _State) -> diff --git a/src/commands/epgsql_cmd_start_replication.erl b/src/commands/epgsql_cmd_start_replication.erl index 92f10d50..a35bb8b7 100644 --- a/src/commands/epgsql_cmd_start_replication.erl +++ b/src/commands/epgsql_cmd_start_replication.erl @@ -12,7 +12,6 @@ -type response() :: ok | {error, epgsql:query_error()}. --include("epgsql.hrl"). -include("protocol.hrl"). -include("../epgsql_replication.hrl"). @@ -65,8 +64,13 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback, %% CopyBothResponse handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) -> {finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)}; -handle_message(?ERROR, Error, _Sock, _State) -> +handle_message(?ERROR, Error, Sock, State) -> + %% In the case of error, Postgresql replication protocol sends a ReadyForQuery message. + %% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch. Result = {error, Error}, - {sync_required, Result}; + {add_result, Result, Result, Sock, State}; +handle_message(?READY_FOR_QUERY, _Data, Sock, _State) -> + [Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response + {finish, Error, done, Sock}; handle_message(_, _, _, _) -> unknown. diff --git a/src/epgsql.app.src b/src/epgsql.app.src index e289a8db..74d84d8d 100644 --- a/src/epgsql.app.src +++ b/src/epgsql.app.src @@ -1,6 +1,6 @@ {application, epgsql, [{description, "PostgreSQL Client"}, - {vsn, "4.7.0.1"}, + {vsn, "4.7.1.1"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/src/epgsql_sock.erl b/src/epgsql_sock.erl index 2e1b89c3..a7862fcf 100644 --- a/src/epgsql_sock.erl +++ b/src/epgsql_sock.erl @@ -507,6 +507,15 @@ send_multi(#state{mod = Mod, sock = Sock}, List) -> end, List)). do_send(gen_tcp, Sock, Bin) -> + gen_tcp_send(Sock, Bin); +do_send(ssl, Sock, Bin) -> + ssl:send(Sock, Bin). + +-if(?OTP_RELEASE >= 26). +gen_tcp_send(Sock, Bin) -> + gen_tcp:send(Sock, Bin). +-else. +gen_tcp_send(Sock, Bin) -> %% Why not gen_tcp:send/2? %% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384 %% Since `epgsql' uses `{active, true}' socket option by-default, it may potentially quickly @@ -515,15 +524,15 @@ do_send(gen_tcp, Sock, Bin) -> %% `{active, true}' is still the default. %% %% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...` + %% This `gen_tcp:send/2' problem have been solved in OTP-26, so this hack is no longer needed. try erlang:port_command(Sock, Bin) of true -> ok catch error:_Error -> {error, einval} - end; -do_send(ssl, Sock, Bin) -> - ssl:send(Sock, Bin). + end. +-endif. loop(#state{data = Data, handler = Handler, subproto_state = Repl} = State) -> case epgsql_wire:decode_message(Data) of diff --git a/test/epgsql_SUITE.erl b/test/epgsql_SUITE.erl index 92589ab1..ae381975 100644 --- a/test/epgsql_SUITE.erl +++ b/test/epgsql_SUITE.erl @@ -307,7 +307,7 @@ connect_with_ssl(Config) -> {ok, _Cols, [{true}]} = Module:equery(C, "select ssl_is_used()") end, "epgsql_test", - [{ssl, true}]). + [{ssl, true}, {ssl_opts, [{verify, verify_none}]}]). cancel_query_for_connection_with_ssl(Config) -> Module = ?config(module, Config), @@ -315,6 +315,7 @@ cancel_query_for_connection_with_ssl(Config) -> Module = ?config(module, Config), Args2 = [ {port, Port}, {database, "epgsql_test_db1"} | [ {ssl, true} + , {ssl_opts, [{verify, verify_none}]} , {timeout, 1000} ] ], {ok, C} = Module:connect(Host, "epgsql_test", Args2), @@ -378,7 +379,8 @@ connect_with_client_cert(Config) -> end, "epgsql_test_cert", [{ssl, true}, {ssl_opts, [{keyfile, File("client.key")}, - {certfile, File("client.crt")}]}]). + {certfile, File("client.crt")}, + {verify, verify_none}]}]). connect_with_invalid_client_cert(Config) -> {Host, Port} = epgsql_ct:connection_data(Config), @@ -408,6 +410,7 @@ connect_with_invalid_client_cert(Config) -> ssl_opts => [{keyfile, File("bad-client.key")}, {certfile, File("bad-client.crt")}, + {verify, verify_none}, %% TLS-1.3 seems to connect fine, but then sends alert asynchronously {versions, ['tlsv1.2']} ]} @@ -542,8 +545,14 @@ cursor(Config) -> multiple_result(Config) -> Module = ?config(module, Config), epgsql_ct:with_connection(Config, fun(C) -> + Module:squery(C, "delete test_table1 where id = 3;"), [{ok, _, [{<<"1">>}]}, {ok, _, [{<<"2">>}]}] = Module:squery(C, "select 1; select 2"), - [{ok, _, [{<<"1">>}]}, {error, #error{}}] = Module:squery(C, "select 1; select foo;") + [{ok, _, [{<<"1">>}]}, {error, #error{}}] = Module:squery(C, "select 1; select foo;"), + [{ok, _, [{<<"one">>}]}, {ok, 1}, {ok, 1}] = + Module:squery(C, + "select value from test_table1 where id = 1; " + "insert into test_table1 (id, value) values (3, 'three');" + "delete from test_table1 where id = 3;") end). execute_batch(Config) -> @@ -1661,7 +1670,7 @@ incremental_sock_active_n_ssl(Config) -> ?assertEqual(10241, length(Rows)) end, "epgsql_test", - [{ssl, true}, {socket_active, 2}]). + [{ssl, true}, {ssl_opts, [{verify, verify_none}]}, {socket_active, 2}]). -else. %% {active, N} for SSL is only supported on OTP-21+ incremental_sock_active_n_ssl(_Config) -> diff --git a/test/epgsql_replication_SUITE.erl b/test/epgsql_replication_SUITE.erl index d8ff1eef..d0510d4c 100644 --- a/test/epgsql_replication_SUITE.erl +++ b/test/epgsql_replication_SUITE.erl @@ -10,11 +10,13 @@ connect_in_repl_mode/1, create_drop_replication_slot/1, + no_replication_slot/1, replication_sync/1, replication_async/1, replication_async_active_n_socket/1, replication_sync_active_n_socket/1, replication_async_active_n_ssl/1, + two_replications_on_same_slot/1, %% Callbacks handle_x_log_data/4 @@ -29,11 +31,13 @@ end_per_suite(_Config) -> all() -> [connect_in_repl_mode, create_drop_replication_slot, + no_replication_slot, replication_async, replication_sync, replication_async_active_n_socket, replication_sync_active_n_socket, - replication_async_active_n_ssl + replication_async_active_n_ssl, + two_replications_on_same_slot ]. connect_in_repl_mode(Config) -> @@ -68,13 +72,65 @@ replication_sync_active_n_socket(Config) -> -ifdef(OTP_RELEASE). replication_async_active_n_ssl(Config) -> - replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]). + replication_test_run(Config, self(), [{socket_active, 1}, + {ssl, require}, + {ssl_opts, [{verify, verify_none}]}]). -else. %% {active, N} for SSL is only supported on OTP-21+ replication_async_active_n_ssl(Config) -> noop. -endif. +two_replications_on_same_slot(Config) -> + Module = ?config(module, Config), + User = "epgsql_test_replication", + SlotName = "epgsql_test", + Parent = self(), + epgsql_ct:with_connection( + Config, + fun(C) -> + create_replication_slot(Config, C), + Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"), + ?assertEqual(ok, Res1), + ErrorReceivedMsg = error_received, + spawn( + fun() -> + %% Test that the second connection receives the ReadyForQuery message from PG + %% synchronously after getting an error that the slot is occupied: + %% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1 + %% https://www.postgresql.org/docs/current/protocol-message-formats.html + epgsql_ct:with_connection( + Config, + fun(C2) -> + Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"), + ?assertMatch({error, #error{codename = object_in_use}}, Res2), + Parent ! ErrorReceivedMsg + end, + User, + [{replication, "database"}]) + end), + receive + Result -> ?assertEqual(ErrorReceivedMsg, Result) + after + 1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when " + "establishing a second connection to the same replication slot") + end + end, + User, + [{replication, "database"}]), + drop_replication_slot(Config). + +no_replication_slot(Config) -> + Module = ?config(module, Config), + epgsql_ct:with_connection( + Config, + fun(C) -> + Res = Module:start_replication(C, "epgsql_test", self(), {C, self()}, "0/0"), + ?assertMatch({error, #error{codename = undefined_object}}, Res) + end, + "epgsql_test_replication", + [{replication, "database"}]). + replication_test_run(Config, Callback) -> replication_test_run(Config, Callback, []). @@ -99,11 +155,7 @@ replication_test_run(Config, Callback, ExtOpts) -> "epgsql_test_replication", [{replication, "database"} | ExtOpts]), %% cleanup - epgsql_ct:with_connection( - Config, - fun(C) -> drop_replication_slot(Config, C) end, - "epgsql_test_replication", - [{replication, "database"}]). + drop_replication_slot(Config). create_replication_slot(Config, Connection) -> Module = ?config(module, Config), @@ -118,6 +170,13 @@ create_replication_slot(Config, Connection) -> Cols), ?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows). +drop_replication_slot(Config) -> + epgsql_ct:with_connection( + Config, + fun(C) -> drop_replication_slot(Config, C) end, + "epgsql_test_replication", + [{replication, "database"}]). + drop_replication_slot(Config, Connection) -> Module = ?config(module, Config), Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),