From 91baf6af0c2e2ac0696ea65c0e9f7c966673dc77 Mon Sep 17 00:00:00 2001 From: Eugene Shubin Date: Tue, 11 Aug 2015 17:12:45 +0200 Subject: [PATCH 1/4] Prepare SQL from file on application start. New function for using prepared SQL --- pgapp.config.sample | 14 +++++++- priv/empty.sql | 0 priv/prepared.sql | 2 ++ src/pgapp.erl | 11 +++++- src/pgapp_sup.erl | 33 +++++++++++++---- src/pgapp_worker.erl | 85 +++++++++++++++++++++++++++++++++++++------- src/worker_args.hrl | 6 ++++ 7 files changed, 130 insertions(+), 21 deletions(-) create mode 100644 priv/empty.sql create mode 100644 priv/prepared.sql create mode 100644 src/worker_args.hrl diff --git a/pgapp.config.sample b/pgapp.config.sample index bfdf587..f21e192 100644 --- a/pgapp.config.sample +++ b/pgapp.config.sample @@ -19,7 +19,19 @@ {database, "db2"}, {username, "user"}, {password, "pass"} - ]} + ]}, + {pool3, [ + {size, 10}, + {max_overflow, 20} + ], + [ + {host, "localhost"}, + {database, "db3"}, + {username, "user"}, + {password, "pass"} + ], + "priv/prepared.sql"}, + ]} ] }]. diff --git a/priv/empty.sql b/priv/empty.sql new file mode 100644 index 0000000..e69de29 diff --git a/priv/prepared.sql b/priv/prepared.sql new file mode 100644 index 0000000..0907f81 --- /dev/null +++ b/priv/prepared.sql @@ -0,0 +1,2 @@ +{"simple", "select 1+1"}. +{"increment", "select $1+1"}. \ No newline at end of file diff --git a/src/pgapp.erl b/src/pgapp.erl index fdc7989..69a726b 100644 --- a/src/pgapp.erl +++ b/src/pgapp.erl @@ -9,7 +9,7 @@ -module(pgapp). %% API --export([connect/1, connect/2, equery/2, equery/3, squery/1, squery/2]). +-export([connect/1, connect/2, equery/2, equery/3, squery/1, squery/2, prepared_query/3]). %%%=================================================================== %%% API @@ -41,6 +41,15 @@ equery(PoolName, Sql, Params) -> gen_server:call(Worker, {equery, Sql, Params}) end). +-spec prepared_query(PoolName::atom(), Name::string, + Params :: list(epgsql:bind_param())) -> epgsql:reply(epgsql:equery_row()). +prepared_query(PoolName, Name, Params) -> + poolboy:transaction(PoolName, + fun(Worker) -> + gen_server:call(Worker, {prepared_query, Name, Params}) + end). + + -spec squery(Sql::epgsql:sql_query()) -> epgsql:reply(epgsql:squery_row()) | [epgsql:reply(epgsql:squery_row())]. squery(Sql) -> diff --git a/src/pgapp_sup.erl b/src/pgapp_sup.erl index b135376..8d399e7 100644 --- a/src/pgapp_sup.erl +++ b/src/pgapp_sup.erl @@ -2,6 +2,8 @@ -behaviour(supervisor). +-include("worker_args.hrl"). + %% API -export([start_link/0, add_pool/3]). @@ -24,13 +26,30 @@ start_link() -> init([]) -> {ok, Pools} = application:get_env(pgapp, pools), - PoolSpec = lists:map(fun ({PoolName, SizeArgs, WorkerArgs}) -> - PoolArgs = [{name, {local, PoolName}}, - {worker_module, pgapp_worker}] ++ SizeArgs, - poolboy:child_spec(PoolName, PoolArgs, WorkerArgs) + PoolSpec = lists:map(fun ({PoolName, SizeArgs, ConnectionArgs}) -> + make_child_spec(PoolName, SizeArgs, ConnectionArgs, undefined); + ({PoolName, SizeArgs, ConnectionArgs, SQLFile}) -> + make_child_spec(PoolName, SizeArgs, ConnectionArgs, SQLFile) end, Pools), {ok, { {one_for_one, 10, 10}, PoolSpec} }. -add_pool(Name, PoolArgs, WorkerArgs) -> - ChildSpec = poolboy:child_spec(Name, PoolArgs, WorkerArgs), - supervisor:start_child(?MODULE, ChildSpec). +make_child_spec(PoolName, SizeArgs, ConnectionArgs, SQLFile) -> + PoolArgs = [{name, {local, PoolName}}, + {worker_module, pgapp_worker}] ++ SizeArgs, + poolboy:child_spec(PoolName, PoolArgs, make_worker_args(ConnectionArgs, SQLFile)). + +make_worker_args(ConnectionArgs, SQLFileName) -> + SQL = case SQLFileName of + undefined -> []; + File -> + {ok, PreparedSQL} = file:consult(File), + PreparedSQL + end, + #worker_args{connection_args = ConnectionArgs, prepared_sql = SQL}. + +add_pool(Name, PoolArgs, ConnectionArgs) -> + add_pool(Name, PoolArgs, ConnectionArgs, undefined). + +add_pool(Name, PoolArgs, ConnectionArgs, SQLFile) -> + ChildSpec = poolboy:child_spec(Name, PoolArgs, make_worker_args(ConnectionArgs, SQLFile)), + supervisor:start_child(?MODULE, ChildSpec). diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index 5a8412c..a3ee3d1 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -16,23 +16,51 @@ -record(state, {conn::pid(), delay::pos_integer(), timer::timer:tref(), - start_args::proplists:proplist()}). + start_args::proplists:proplist(), + sql_text::proplists:proplist(), + sql_statements::ets:tid()}). -define(INITIAL_DELAY, 500). % Half a second -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes + +-include_lib("epgsql/include/epgsql.hrl"). +-include("worker_args.hrl"). + start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -init(Args) -> +init(#worker_args{connection_args = Args, prepared_sql = PreparedSQL}) -> process_flag(trap_exit, true), - {ok, connect(#state{start_args = Args, delay = ?INITIAL_DELAY})}. + {ok, connect(#state{ + start_args = Args, + sql_text = PreparedSQL, + delay = ?INITIAL_DELAY, + sql_statements = ets:new(prepared_sql, [private, {keypos, #statement.name}]) + })}. handle_call({squery, Sql}, _From, #state{conn=Conn} = State) when Conn /= undefined -> {reply, epgsql:squery(Conn, Sql), State}; handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> - {reply, epgsql:equery(Conn, Sql, Params), State}. + {reply, epgsql:equery(Conn, Sql, Params), State}; + +handle_call({prepared_query, Name, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> + Statements = State#state.sql_statements, + case ets:lookup(Statements, Name) of + [Statement] -> + case epgsql:bind(Conn, Statement, Params) of + ok -> + {reply, epgsql:execute(Conn, Statement), State}; + Error -> + {reply, Error, State} + end; + _ -> + error_logger:error_msg("SQL statement ~p is not found", [Name]), + {reply, {error, sql_not_found}, State} + end. + + handle_cast(reconnect, State) -> {noreply, connect(State)}. @@ -58,13 +86,31 @@ handle_info({'EXIT', From, Reason}, State) -> [self(), From, Reason, NewDelay]), {noreply, State#state{conn = undefined, delay = NewDelay, timer = Tref}}. -terminate(_Reason, #state{conn=Conn}) -> +terminate(_Reason, #state{conn=Conn, sql_statements = Statements}) -> + ets:delete(Statements), ok = epgsql:close(Conn), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. + +prepare_statements(Con, PreparedSQL, Statements) -> + ets:delete_all_objects(Statements), + Results = lists:map( + fun({Name, Query}) -> + case epgsql:parse(Con, Name, Query, []) of + {ok, Statement} -> + ets:insert(Statements, Statement); + {error, Reason} -> + error_logger:error_msg("Error ~p parsing SQL query ~p", [Reason, Query]), + false + end + end, + PreparedSQL + ), + lists:all(fun(E) -> E end, Results). + connect(State) -> Args = State#state.start_args, Hostname = proplists:get_value(host, Args), @@ -77,19 +123,34 @@ connect(State) -> "~p Connected to ~s at ~s with user ~s: ~p~n", [self(), Database, Hostname, Username, Conn]), timer:cancel(State#state.timer), - State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; + case prepare_statements(Conn, State#state.sql_text, State#state.sql_statements) of + true -> + State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; + false -> + ok = epgsql:close(Conn), + NewState = handle_connection_error(State), + error_logger:error_msg( + "~p Unable to prepare statements on ~s at ~s with user ~s " + "- attempting reconnect in ~p ms~n", + [self(), Database, Hostname, Username, NewState#state.delay]), + NewState + end; Error -> - NewDelay = calculate_delay(State#state.delay), + NewState = handle_connection_error(State), error_logger:warning_msg( "~p Unable to connect to ~s at ~s with user ~s (~p) " "- attempting reconnect in ~p ms~n", - [self(), Database, Hostname, Username, Error, NewDelay]), - {ok, Tref} = - timer:apply_after( - State#state.delay, gen_server, cast, [self(), reconnect]), - State#state{conn=undefined, delay = NewDelay, timer = Tref} + [self(), Database, Hostname, Username, Error, NewState#state.delay]), + NewState end. +handle_connection_error(#state{delay = Delay} = State) -> + NewDelay = calculate_delay(Delay), + {ok, Tref} = + timer:apply_after( + Delay, gen_server, cast, [self(), reconnect]), + State#state{conn=undefined, delay = NewDelay, timer = Tref}. + calculate_delay(Delay) when (Delay * 2) >= ?MAXIMUM_DELAY -> ?MAXIMUM_DELAY; calculate_delay(Delay) -> diff --git a/src/worker_args.hrl b/src/worker_args.hrl new file mode 100644 index 0000000..40f81a7 --- /dev/null +++ b/src/worker_args.hrl @@ -0,0 +1,6 @@ +-record( + worker_args, + { + prepared_sql, + connection_args + }). \ No newline at end of file From ba407713aaba2ce63db04162246391f8dc3d8b41 Mon Sep 17 00:00:00 2001 From: Eugene Shubin Date: Wed, 12 Aug 2015 09:51:31 +0200 Subject: [PATCH 2/4] use describe to obtain statement by name from SQL server --- src/pgapp_worker.erl | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index a3ee3d1..7c4b3f0 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -17,8 +17,7 @@ delay::pos_integer(), timer::timer:tref(), start_args::proplists:proplist(), - sql_text::proplists:proplist(), - sql_statements::ets:tid()}). + sql_text::proplists:proplist()}). -define(INITIAL_DELAY, 500). % Half a second -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes @@ -35,8 +34,7 @@ init(#worker_args{connection_args = Args, prepared_sql = PreparedSQL}) -> {ok, connect(#state{ start_args = Args, sql_text = PreparedSQL, - delay = ?INITIAL_DELAY, - sql_statements = ets:new(prepared_sql, [private, {keypos, #statement.name}]) + delay = ?INITIAL_DELAY })}. handle_call({squery, Sql}, _From, #state{conn=Conn} = State) when Conn /= undefined -> @@ -46,18 +44,16 @@ handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) when Conn {reply, epgsql:equery(Conn, Sql, Params), State}; handle_call({prepared_query, Name, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> - Statements = State#state.sql_statements, - case ets:lookup(Statements, Name) of - [Statement] -> + case epgsql:describe(Conn, statement, Name) of + {ok, Statement} -> case epgsql:bind(Conn, Statement, Params) of ok -> {reply, epgsql:execute(Conn, Statement), State}; Error -> {reply, Error, State} end; - _ -> - error_logger:error_msg("SQL statement ~p is not found", [Name]), - {reply, {error, sql_not_found}, State} + Err -> + {reply, Err, State} end. @@ -86,8 +82,7 @@ handle_info({'EXIT', From, Reason}, State) -> [self(), From, Reason, NewDelay]), {noreply, State#state{conn = undefined, delay = NewDelay, timer = Tref}}. -terminate(_Reason, #state{conn=Conn, sql_statements = Statements}) -> - ets:delete(Statements), +terminate(_Reason, #state{conn=Conn}) -> ok = epgsql:close(Conn), ok. @@ -95,21 +90,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -prepare_statements(Con, PreparedSQL, Statements) -> - ets:delete_all_objects(Statements), - Results = lists:map( +prepare_statements(Con, PreparedSQL) -> + lists:all( fun({Name, Query}) -> case epgsql:parse(Con, Name, Query, []) of - {ok, Statement} -> - ets:insert(Statements, Statement); + {ok, _Statement} -> + true; {error, Reason} -> error_logger:error_msg("Error ~p parsing SQL query ~p", [Reason, Query]), false end end, PreparedSQL - ), - lists:all(fun(E) -> E end, Results). + ). connect(State) -> Args = State#state.start_args, @@ -123,7 +116,7 @@ connect(State) -> "~p Connected to ~s at ~s with user ~s: ~p~n", [self(), Database, Hostname, Username, Conn]), timer:cancel(State#state.timer), - case prepare_statements(Conn, State#state.sql_text, State#state.sql_statements) of + case prepare_statements(Conn, State#state.sql_text) of true -> State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; false -> From 74b0ff4c5c0a84de0dbccdf941c2367a0273b3b8 Mon Sep 17 00:00:00 2001 From: Eugene Shubin Date: Tue, 25 Aug 2015 17:15:00 +0200 Subject: [PATCH 3/4] prepared_query function with default pool name --- src/pgapp.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/pgapp.erl b/src/pgapp.erl index 69a726b..108e936 100644 --- a/src/pgapp.erl +++ b/src/pgapp.erl @@ -9,7 +9,8 @@ -module(pgapp). %% API --export([connect/1, connect/2, equery/2, equery/3, squery/1, squery/2, prepared_query/3]). +-export([connect/1, connect/2, equery/2, equery/3, squery/1, squery/2, + prepared_query/3, prepared_query/2]). %%%=================================================================== %%% API @@ -41,6 +42,11 @@ equery(PoolName, Sql, Params) -> gen_server:call(Worker, {equery, Sql, Params}) end). +-spec prepared_query(Name::string,Params :: list(epgsql:bind_param())) -> + epgsql:reply(epgsql:equery_row()). +prepared_query(Name, Params) -> + prepared_query(epgsql_pool, Name, Params). + -spec prepared_query(PoolName::atom(), Name::string, Params :: list(epgsql:bind_param())) -> epgsql:reply(epgsql:equery_row()). prepared_query(PoolName, Name, Params) -> From dc1fdaf92ac3f86431ac5a133e2fda7b9f0375d1 Mon Sep 17 00:00:00 2001 From: Eugene Shubin Date: Wed, 2 Sep 2015 09:33:22 +0200 Subject: [PATCH 4/4] sync after execute statement. --- src/pgapp_worker.erl | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index 7c4b3f0..4b3b672 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -44,17 +44,25 @@ handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) when Conn {reply, epgsql:equery(Conn, Sql, Params), State}; handle_call({prepared_query, Name, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> - case epgsql:describe(Conn, statement, Name) of + FinRepl = case epgsql:describe(Conn, statement, Name) of {ok, Statement} -> case epgsql:bind(Conn, Statement, Params) of ok -> - {reply, epgsql:execute(Conn, Statement), State}; - Error -> - {reply, Error, State} + case epgsql:execute(Conn, Statement) of + {ok, _} = R -> + case epgsql:sync(Conn) of + ok -> R; + E -> E + end; + Er -> Er + end; + Error -> Error end; - Err -> - {reply, Err, State} - end. + Err -> Err + end, + {reply, FinRepl, State}. + +