Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1212 sync upstream #9

Merged
merged 19 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ jobs:
postgis:
- "3"
otp:
- "25.2"
- "26.0"
- "25.3"
- "24.3"
rebar3:
- "3.20.0"
Expand All @@ -40,11 +41,6 @@ jobs:
rebar3: "3.15.2"
pg: "12"
postgis: "3"
- os: "ubuntu-20.04"
otp: "20.3"
rebar3: "3.15.2"
pg: "12"
postgis: "3"

# env:
# PATH: ".:/usr/lib/postgresql/12/bin:$PATH"
Expand All @@ -59,7 +55,7 @@ jobs:
rebar3-version: ${{ matrix.rebar3 }}

- name: Setup postgresql server with postgis
run: sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts
run: sudo apt update && sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts

- name: elvis
run: make elvis
Expand Down
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
In 4.7.1

* Handle `ReadyForQuery` after `Error` in replication mode #279
* Fix the issue with columns in multi-squery response #283
* Fix compatibility with OTP-26 #284

In 4.7.0

* Flow control `{socket_active, N}` option in streaming replication #271
Expand Down
5 changes: 3 additions & 2 deletions src/commands/epgsql_cmd_squery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
-record(squery,
{query :: iodata(),
columns = [],
decoder}).
decoder = undefined :: epgsql_wire:row_decoder() | undefined}).

init(Sql) ->
#squery{query = Sql}.
Expand Down Expand Up @@ -63,7 +63,8 @@ handle_message(?COMMAND_COMPLETE, Bin, Sock, #squery{columns = Cols} = St) ->
_ ->
{ok, Cols, Rows}
end,
{add_result, Result, {complete, Complete}, Sock, St};
{add_result, Result, {complete, Complete}, Sock, St#squery{columns = [],
decoder = undefined}};
handle_message(?EMPTY_QUERY, _, Sock, St) ->
{add_result, {ok, [], []}, {complete, empty}, Sock, St};
handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
Expand Down
10 changes: 7 additions & 3 deletions src/commands/epgsql_cmd_start_replication.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

-type response() :: ok | {error, epgsql:query_error()}.

-include("epgsql.hrl").
-include("protocol.hrl").
-include("../epgsql_replication.hrl").

Expand Down Expand Up @@ -65,8 +64,13 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
%% CopyBothResponse
handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
{finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
handle_message(?ERROR, Error, _Sock, _State) ->
handle_message(?ERROR, Error, Sock, State) ->
%% In the case of error, Postgresql replication protocol sends a ReadyForQuery message.
%% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch.
Result = {error, Error},
{sync_required, Result};
{add_result, Result, Result, Sock, State};
handle_message(?READY_FOR_QUERY, _Data, Sock, _State) ->
[Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response
{finish, Error, done, Sock};
handle_message(_, _, _, _) ->
unknown.
2 changes: 1 addition & 1 deletion src/epgsql.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, epgsql,
[{description, "PostgreSQL Client"},
{vsn, "4.7.0.1"},
{vsn, "4.7.1.1"},
{modules, []},
{registered, []},
{applications, [kernel,
Expand Down
15 changes: 12 additions & 3 deletions src/epgsql_sock.erl
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,15 @@ send_multi(#state{mod = Mod, sock = Sock}, List) ->
end, List)).

do_send(gen_tcp, Sock, Bin) ->
gen_tcp_send(Sock, Bin);
do_send(ssl, Sock, Bin) ->
ssl:send(Sock, Bin).

-if(?OTP_RELEASE >= 26).
gen_tcp_send(Sock, Bin) ->
gen_tcp:send(Sock, Bin).
-else.
gen_tcp_send(Sock, Bin) ->
%% Why not gen_tcp:send/2?
%% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
%% Since `epgsql' uses `{active, true}' socket option by-default, it may potentially quickly
Expand All @@ -515,15 +524,15 @@ do_send(gen_tcp, Sock, Bin) ->
%% `{active, true}' is still the default.
%%
%% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...`
%% This `gen_tcp:send/2' problem have been solved in OTP-26, so this hack is no longer needed.
try erlang:port_command(Sock, Bin) of
true ->
ok
catch
error:_Error ->
{error, einval}
end;
do_send(ssl, Sock, Bin) ->
ssl:send(Sock, Bin).
end.
-endif.

loop(#state{data = Data, handler = Handler, subproto_state = Repl} = State) ->
case epgsql_wire:decode_message(Data) of
Expand Down
17 changes: 13 additions & 4 deletions test/epgsql_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,15 @@ connect_with_ssl(Config) ->
{ok, _Cols, [{true}]} = Module:equery(C, "select ssl_is_used()")
end,
"epgsql_test",
[{ssl, true}]).
[{ssl, true}, {ssl_opts, [{verify, verify_none}]}]).

cancel_query_for_connection_with_ssl(Config) ->
Module = ?config(module, Config),
{Host, Port} = epgsql_ct:connection_data(Config),
Module = ?config(module, Config),
Args2 = [ {port, Port}, {database, "epgsql_test_db1"}
| [ {ssl, true}
, {ssl_opts, [{verify, verify_none}]}
, {timeout, 1000} ]
],
{ok, C} = Module:connect(Host, "epgsql_test", Args2),
Expand Down Expand Up @@ -378,7 +379,8 @@ connect_with_client_cert(Config) ->
end,
"epgsql_test_cert",
[{ssl, true}, {ssl_opts, [{keyfile, File("client.key")},
{certfile, File("client.crt")}]}]).
{certfile, File("client.crt")},
{verify, verify_none}]}]).

connect_with_invalid_client_cert(Config) ->
{Host, Port} = epgsql_ct:connection_data(Config),
Expand Down Expand Up @@ -408,6 +410,7 @@ connect_with_invalid_client_cert(Config) ->
ssl_opts =>
[{keyfile, File("bad-client.key")},
{certfile, File("bad-client.crt")},
{verify, verify_none},
%% TLS-1.3 seems to connect fine, but then sends alert asynchronously
{versions, ['tlsv1.2']}
]}
Expand Down Expand Up @@ -542,8 +545,14 @@ cursor(Config) ->
multiple_result(Config) ->
Module = ?config(module, Config),
epgsql_ct:with_connection(Config, fun(C) ->
Module:squery(C, "delete test_table1 where id = 3;"),
[{ok, _, [{<<"1">>}]}, {ok, _, [{<<"2">>}]}] = Module:squery(C, "select 1; select 2"),
[{ok, _, [{<<"1">>}]}, {error, #error{}}] = Module:squery(C, "select 1; select foo;")
[{ok, _, [{<<"1">>}]}, {error, #error{}}] = Module:squery(C, "select 1; select foo;"),
[{ok, _, [{<<"one">>}]}, {ok, 1}, {ok, 1}] =
Module:squery(C,
"select value from test_table1 where id = 1; "
"insert into test_table1 (id, value) values (3, 'three');"
"delete from test_table1 where id = 3;")
end).

execute_batch(Config) ->
Expand Down Expand Up @@ -1661,7 +1670,7 @@ incremental_sock_active_n_ssl(Config) ->
?assertEqual(10241, length(Rows))
end,
"epgsql_test",
[{ssl, true}, {socket_active, 2}]).
[{ssl, true}, {ssl_opts, [{verify, verify_none}]}, {socket_active, 2}]).
-else.
%% {active, N} for SSL is only supported on OTP-21+
incremental_sock_active_n_ssl(_Config) ->
Expand Down
73 changes: 66 additions & 7 deletions test/epgsql_replication_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

connect_in_repl_mode/1,
create_drop_replication_slot/1,
no_replication_slot/1,
replication_sync/1,
replication_async/1,
replication_async_active_n_socket/1,
replication_sync_active_n_socket/1,
replication_async_active_n_ssl/1,
two_replications_on_same_slot/1,

%% Callbacks
handle_x_log_data/4
Expand All @@ -29,11 +31,13 @@ end_per_suite(_Config) ->
all() ->
[connect_in_repl_mode,
create_drop_replication_slot,
no_replication_slot,
replication_async,
replication_sync,
replication_async_active_n_socket,
replication_sync_active_n_socket,
replication_async_active_n_ssl
replication_async_active_n_ssl,
two_replications_on_same_slot
].

connect_in_repl_mode(Config) ->
Expand Down Expand Up @@ -68,13 +72,65 @@ replication_sync_active_n_socket(Config) ->

-ifdef(OTP_RELEASE).
replication_async_active_n_ssl(Config) ->
replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
replication_test_run(Config, self(), [{socket_active, 1},
{ssl, require},
{ssl_opts, [{verify, verify_none}]}]).
-else.
%% {active, N} for SSL is only supported on OTP-21+
replication_async_active_n_ssl(Config) ->
noop.
-endif.

two_replications_on_same_slot(Config) ->
Module = ?config(module, Config),
User = "epgsql_test_replication",
SlotName = "epgsql_test",
Parent = self(),
epgsql_ct:with_connection(
Config,
fun(C) ->
create_replication_slot(Config, C),
Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
?assertEqual(ok, Res1),
ErrorReceivedMsg = error_received,
spawn(
fun() ->
%% Test that the second connection receives the ReadyForQuery message from PG
%% synchronously after getting an error that the slot is occupied:
%% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
%% https://www.postgresql.org/docs/current/protocol-message-formats.html
epgsql_ct:with_connection(
Config,
fun(C2) ->
Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
?assertMatch({error, #error{codename = object_in_use}}, Res2),
Parent ! ErrorReceivedMsg
end,
User,
[{replication, "database"}])
end),
receive
Result -> ?assertEqual(ErrorReceivedMsg, Result)
after
1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
"establishing a second connection to the same replication slot")
end
end,
User,
[{replication, "database"}]),
drop_replication_slot(Config).

no_replication_slot(Config) ->
Module = ?config(module, Config),
epgsql_ct:with_connection(
Config,
fun(C) ->
Res = Module:start_replication(C, "epgsql_test", self(), {C, self()}, "0/0"),
?assertMatch({error, #error{codename = undefined_object}}, Res)
end,
"epgsql_test_replication",
[{replication, "database"}]).

replication_test_run(Config, Callback) ->
replication_test_run(Config, Callback, []).

Expand All @@ -99,11 +155,7 @@ replication_test_run(Config, Callback, ExtOpts) ->
"epgsql_test_replication",
[{replication, "database"} | ExtOpts]),
%% cleanup
epgsql_ct:with_connection(
Config,
fun(C) -> drop_replication_slot(Config, C) end,
"epgsql_test_replication",
[{replication, "database"}]).
drop_replication_slot(Config).

create_replication_slot(Config, Connection) ->
Module = ?config(module, Config),
Expand All @@ -118,6 +170,13 @@ create_replication_slot(Config, Connection) ->
Cols),
?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).

drop_replication_slot(Config) ->
epgsql_ct:with_connection(
Config,
fun(C) -> drop_replication_slot(Config, C) end,
"epgsql_test_replication",
[{replication, "database"}]).

drop_replication_slot(Config, Connection) ->
Module = ?config(module, Config),
Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
Expand Down