Skip to content

Commit

Permalink
Merge pull request #9 from emqx/1212-sync-upstream
Browse files Browse the repository at this point in the history
1212 sync upstream
  • Loading branch information
zmstone authored Dec 12, 2023
2 parents fd3d41a + 3ba7cd9 commit 563c801
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 27 deletions.
10 changes: 3 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ jobs:
postgis:
- "3"
otp:
- "25.2"
- "26.0"
- "25.3"
- "24.3"
rebar3:
- "3.20.0"
Expand All @@ -40,11 +41,6 @@ jobs:
rebar3: "3.15.2"
pg: "12"
postgis: "3"
- os: "ubuntu-20.04"
otp: "20.3"
rebar3: "3.15.2"
pg: "12"
postgis: "3"

# env:
# PATH: ".:/usr/lib/postgresql/12/bin:$PATH"
Expand All @@ -59,7 +55,7 @@ jobs:
rebar3-version: ${{ matrix.rebar3 }}

- name: Setup postgresql server with postgis
run: sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts
run: sudo apt update && sudo apt install postgresql-${{matrix.pg}} postgresql-contrib-${{matrix.pg}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}} postgresql-${{matrix.pg}}-postgis-${{matrix.postgis}}-scripts

- name: elvis
run: make elvis
Expand Down
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
In 4.7.1

* Handle `ReadyForQuery` after `Error` in replication mode #279
* Fix the issue with columns in multi-squery response #283
* Fix compatibility with OTP-26 #284

In 4.7.0

* Flow control `{socket_active, N}` option in streaming replication #271
Expand Down
5 changes: 3 additions & 2 deletions src/commands/epgsql_cmd_squery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
-record(squery,
{query :: iodata(),
columns = [],
decoder}).
decoder = undefined :: epgsql_wire:row_decoder() | undefined}).

init(Sql) ->
#squery{query = Sql}.
Expand Down Expand Up @@ -63,7 +63,8 @@ handle_message(?COMMAND_COMPLETE, Bin, Sock, #squery{columns = Cols} = St) ->
_ ->
{ok, Cols, Rows}
end,
{add_result, Result, {complete, Complete}, Sock, St};
{add_result, Result, {complete, Complete}, Sock, St#squery{columns = [],
decoder = undefined}};
handle_message(?EMPTY_QUERY, _, Sock, St) ->
{add_result, {ok, [], []}, {complete, empty}, Sock, St};
handle_message(?READY_FOR_QUERY, _Status, Sock, _State) ->
Expand Down
10 changes: 7 additions & 3 deletions src/commands/epgsql_cmd_start_replication.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

-type response() :: ok | {error, epgsql:query_error()}.

-include("epgsql.hrl").
-include("protocol.hrl").
-include("../epgsql_replication.hrl").

Expand Down Expand Up @@ -65,8 +64,13 @@ execute(Sock, #start_repl{slot = ReplicationSlot, callback = Callback,
%% CopyBothResponse
handle_message(?COPY_BOTH_RESPONSE, _Data, Sock, _State) ->
{finish, ok, ok, epgsql_sock:set_packet_handler(on_replication, Sock)};
handle_message(?ERROR, Error, _Sock, _State) ->
handle_message(?ERROR, Error, Sock, State) ->
%% In the case of error, Postgresql replication protocol sends a ReadyForQuery message.
%% Adds an error to results to handle it later in the ?READY_FOR_QUERY branch.
Result = {error, Error},
{sync_required, Result};
{add_result, Result, Result, Sock, State};
handle_message(?READY_FOR_QUERY, _Data, Sock, _State) ->
[Error = {error, _}] = epgsql_sock:get_results(Sock), % assert a single error response
{finish, Error, done, Sock};
handle_message(_, _, _, _) ->
unknown.
2 changes: 1 addition & 1 deletion src/epgsql.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, epgsql,
[{description, "PostgreSQL Client"},
{vsn, "4.7.0.1"},
{vsn, "4.7.1.1"},
{modules, []},
{registered, []},
{applications, [kernel,
Expand Down
15 changes: 12 additions & 3 deletions src/epgsql_sock.erl
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,15 @@ send_multi(#state{mod = Mod, sock = Sock}, List) ->
end, List)).

do_send(gen_tcp, Sock, Bin) ->
gen_tcp_send(Sock, Bin);
do_send(ssl, Sock, Bin) ->
ssl:send(Sock, Bin).

-if(?OTP_RELEASE >= 26).
gen_tcp_send(Sock, Bin) ->
gen_tcp:send(Sock, Bin).
-else.
gen_tcp_send(Sock, Bin) ->
%% Why not gen_tcp:send/2?
%% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
%% Since `epgsql' uses `{active, true}' socket option by-default, it may potentially quickly
Expand All @@ -515,15 +524,15 @@ do_send(gen_tcp, Sock, Bin) ->
%% `{active, true}' is still the default.
%%
%% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...`
%% This `gen_tcp:send/2' problem have been solved in OTP-26, so this hack is no longer needed.
try erlang:port_command(Sock, Bin) of
true ->
ok
catch
error:_Error ->
{error, einval}
end;
do_send(ssl, Sock, Bin) ->
ssl:send(Sock, Bin).
end.
-endif.

loop(#state{data = Data, handler = Handler, subproto_state = Repl} = State) ->
case epgsql_wire:decode_message(Data) of
Expand Down
17 changes: 13 additions & 4 deletions test/epgsql_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,15 @@ connect_with_ssl(Config) ->
{ok, _Cols, [{true}]} = Module:equery(C, "select ssl_is_used()")
end,
"epgsql_test",
[{ssl, true}]).
[{ssl, true}, {ssl_opts, [{verify, verify_none}]}]).

cancel_query_for_connection_with_ssl(Config) ->
Module = ?config(module, Config),
{Host, Port} = epgsql_ct:connection_data(Config),
Module = ?config(module, Config),
Args2 = [ {port, Port}, {database, "epgsql_test_db1"}
| [ {ssl, true}
, {ssl_opts, [{verify, verify_none}]}
, {timeout, 1000} ]
],
{ok, C} = Module:connect(Host, "epgsql_test", Args2),
Expand Down Expand Up @@ -378,7 +379,8 @@ connect_with_client_cert(Config) ->
end,
"epgsql_test_cert",
[{ssl, true}, {ssl_opts, [{keyfile, File("client.key")},
{certfile, File("client.crt")}]}]).
{certfile, File("client.crt")},
{verify, verify_none}]}]).

connect_with_invalid_client_cert(Config) ->
{Host, Port} = epgsql_ct:connection_data(Config),
Expand Down Expand Up @@ -408,6 +410,7 @@ connect_with_invalid_client_cert(Config) ->
ssl_opts =>
[{keyfile, File("bad-client.key")},
{certfile, File("bad-client.crt")},
{verify, verify_none},
%% TLS-1.3 seems to connect fine, but then sends alert asynchronously
{versions, ['tlsv1.2']}
]}
Expand Down Expand Up @@ -542,8 +545,14 @@ cursor(Config) ->
multiple_result(Config) ->
Module = ?config(module, Config),
epgsql_ct:with_connection(Config, fun(C) ->
Module:squery(C, "delete test_table1 where id = 3;"),
[{ok, _, [{<<"1">>}]}, {ok, _, [{<<"2">>}]}] = Module:squery(C, "select 1; select 2"),
[{ok, _, [{<<"1">>}]}, {error, #error{}}] = Module:squery(C, "select 1; select foo;")
[{ok, _, [{<<"1">>}]}, {error, #error{}}] = Module:squery(C, "select 1; select foo;"),
[{ok, _, [{<<"one">>}]}, {ok, 1}, {ok, 1}] =
Module:squery(C,
"select value from test_table1 where id = 1; "
"insert into test_table1 (id, value) values (3, 'three');"
"delete from test_table1 where id = 3;")
end).

execute_batch(Config) ->
Expand Down Expand Up @@ -1661,7 +1670,7 @@ incremental_sock_active_n_ssl(Config) ->
?assertEqual(10241, length(Rows))
end,
"epgsql_test",
[{ssl, true}, {socket_active, 2}]).
[{ssl, true}, {ssl_opts, [{verify, verify_none}]}, {socket_active, 2}]).
-else.
%% {active, N} for SSL is only supported on OTP-21+
incremental_sock_active_n_ssl(_Config) ->
Expand Down
73 changes: 66 additions & 7 deletions test/epgsql_replication_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

connect_in_repl_mode/1,
create_drop_replication_slot/1,
no_replication_slot/1,
replication_sync/1,
replication_async/1,
replication_async_active_n_socket/1,
replication_sync_active_n_socket/1,
replication_async_active_n_ssl/1,
two_replications_on_same_slot/1,

%% Callbacks
handle_x_log_data/4
Expand All @@ -29,11 +31,13 @@ end_per_suite(_Config) ->
all() ->
[connect_in_repl_mode,
create_drop_replication_slot,
no_replication_slot,
replication_async,
replication_sync,
replication_async_active_n_socket,
replication_sync_active_n_socket,
replication_async_active_n_ssl
replication_async_active_n_ssl,
two_replications_on_same_slot
].

connect_in_repl_mode(Config) ->
Expand Down Expand Up @@ -68,13 +72,65 @@ replication_sync_active_n_socket(Config) ->

-ifdef(OTP_RELEASE).
replication_async_active_n_ssl(Config) ->
replication_test_run(Config, self(), [{socket_active, 1}, {ssl, require}]).
replication_test_run(Config, self(), [{socket_active, 1},
{ssl, require},
{ssl_opts, [{verify, verify_none}]}]).
-else.
%% {active, N} for SSL is only supported on OTP-21+
replication_async_active_n_ssl(Config) ->
noop.
-endif.

two_replications_on_same_slot(Config) ->
Module = ?config(module, Config),
User = "epgsql_test_replication",
SlotName = "epgsql_test",
Parent = self(),
epgsql_ct:with_connection(
Config,
fun(C) ->
create_replication_slot(Config, C),
Res1 = Module:start_replication(C, SlotName, Parent, {C, Parent}, "0/0"),
?assertEqual(ok, Res1),
ErrorReceivedMsg = error_received,
spawn(
fun() ->
%% Test that the second connection receives the ReadyForQuery message from PG
%% synchronously after getting an error that the slot is occupied:
%% ReadyForQuery (B), Byte1('Z'), Int32(5), Byte1
%% https://www.postgresql.org/docs/current/protocol-message-formats.html
epgsql_ct:with_connection(
Config,
fun(C2) ->
Res2 = Module:start_replication(C2, SlotName, self(), {C2, self()}, "0/0"),
?assertMatch({error, #error{codename = object_in_use}}, Res2),
Parent ! ErrorReceivedMsg
end,
User,
[{replication, "database"}])
end),
receive
Result -> ?assertEqual(ErrorReceivedMsg, Result)
after
1000 -> ?assert(false, "Expected answer hasn't been received in 1000ms when "
"establishing a second connection to the same replication slot")
end
end,
User,
[{replication, "database"}]),
drop_replication_slot(Config).

no_replication_slot(Config) ->
Module = ?config(module, Config),
epgsql_ct:with_connection(
Config,
fun(C) ->
Res = Module:start_replication(C, "epgsql_test", self(), {C, self()}, "0/0"),
?assertMatch({error, #error{codename = undefined_object}}, Res)
end,
"epgsql_test_replication",
[{replication, "database"}]).

replication_test_run(Config, Callback) ->
replication_test_run(Config, Callback, []).

Expand All @@ -99,11 +155,7 @@ replication_test_run(Config, Callback, ExtOpts) ->
"epgsql_test_replication",
[{replication, "database"} | ExtOpts]),
%% cleanup
epgsql_ct:with_connection(
Config,
fun(C) -> drop_replication_slot(Config, C) end,
"epgsql_test_replication",
[{replication, "database"}]).
drop_replication_slot(Config).

create_replication_slot(Config, Connection) ->
Module = ?config(module, Config),
Expand All @@ -118,6 +170,13 @@ create_replication_slot(Config, Connection) ->
Cols),
?assertMatch([{<<"epgsql_test">>, _, _, <<"test_decoding">>}], Rows).

drop_replication_slot(Config) ->
epgsql_ct:with_connection(
Config,
fun(C) -> drop_replication_slot(Config, C) end,
"epgsql_test_replication",
[{replication, "database"}]).

drop_replication_slot(Config, Connection) ->
Module = ?config(module, Config),
Result = Module:squery(Connection, "DROP_REPLICATION_SLOT ""epgsql_test"""),
Expand Down

0 comments on commit 563c801

Please sign in to comment.