Skip to content

Commit

Permalink
Expose pgapp connected workers and fix dialyzer errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rymir committed Oct 25, 2015
1 parent 4e89f39 commit 83ecfc9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ ebin/
*~
deps/
*.config
*.plt
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{deps,
[
{epgsql, ".*", {git, "git://github.com/epgsql/epgsql.git", {tag, "3.1.0"}}},
{epgsql, ".*", {git, "git://github.com/epgsql/epgsql.git", "792a93c"}},
{poolboy, ".*", {git, "git://github.com/devinus/poolboy.git", {tag, "1.4.2"}}}
]}.
21 changes: 17 additions & 4 deletions src/pgapp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@
-module(pgapp).

%% API
-export([connect/1, connect/2,
equery/2, equery/3, equery/4,
squery/1, squery/2, squery/3,
with_transaction/1, with_transaction/2, with_transaction/3]).
-export([connect/1, connect/2]).
-export([equery/2, equery/3, equery/4]).
-export([squery/1, squery/2, squery/3]).
-export([with_transaction/1, with_transaction/2, with_transaction/3]).
-export([connected_workers/1]).

%%%===================================================================
%%% API
%%%===================================================================

-spec connect(Settings :: list()) -> {ok, WorkerPid} when
WorkerPid :: pid().
connect(Settings) ->
connect(epgsql_pool, Settings).

-spec connect(PoolName :: atom(),
Settings :: list()) -> {ok, WorkerPid} when
WorkerPid :: pid().
connect(PoolName, Settings) ->
PoolSize = proplists:get_value(size, Settings, 5),
MaxOverflow = proplists:get_value(max_overflow, Settings, 5),
Expand Down Expand Up @@ -89,6 +95,13 @@ with_transaction(PoolName, Fun) when is_function(Fun, 0) ->
with_transaction(PoolName, Fun, Timeout) when is_function(Fun, 0) ->
pgapp_worker:with_transaction(PoolName, Fun, Timeout).

-spec connected_workers(PoolName :: atom()) -> {ok, ConnectedWorkerPids} when
ConnectedWorkerPids :: list(pid()).
connected_workers(PoolName) ->
WorkerPids = gen_server:call(PoolName, get_avail_workers),
{ok, [WorkerPid || WorkerPid <- WorkerPids,
pgapp_worker:is_connected(WorkerPid)]}.

%%--------------------------------------------------------------------
%% @doc
%% @spec
Expand Down
37 changes: 25 additions & 12 deletions src/pgapp_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
-behaviour(gen_server).
-behaviour(poolboy_worker).

-export([squery/1, squery/2, squery/3,
equery/2, equery/3, equery/4,
with_transaction/2, with_transaction/3]).
-export([squery/1, squery/2, squery/3]).
-export([equery/2, equery/3, equery/4]).
-export([with_transaction/2, with_transaction/3]).
-export([is_connected/1]).

-export([start_link/1]).

Expand All @@ -29,34 +30,37 @@
-define(STATE_VAR, '$pgapp_state').

squery(Sql) ->
squery(Sql, ?TIMEOUT).

squery(PoolName, Sql) when is_atom(PoolName) ->
squery(PoolName, Sql, ?TIMEOUT);
squery(Sql, Timeout) ->
case get(?STATE_VAR) of
undefined ->
squery(epgsql_pool, Sql);
squery(epgsql_pool, Sql, Timeout);
Conn ->
epgsql:squery(Conn, Sql)
end.

squery(PoolName, Sql) ->
squery(PoolName, Sql, ?TIMEOUT).

squery(PoolName, Sql, Timeout) ->
poolboy:transaction(PoolName,
fun (Worker) ->
gen_server:call(Worker, {squery, Sql}, Timeout)
end, Timeout).


equery(Sql, Params) ->
equery(Sql, Params, ?TIMEOUT).

equery(PoolName, Sql, Params) when is_atom(PoolName) ->
equery(PoolName, Sql, Params, ?TIMEOUT);
equery(Sql, Params, Timeout) ->
case get(?STATE_VAR) of
undefined ->
equery(epgsql_pool, Sql, Params);
equery(epgsql_pool, Sql, Params, Timeout);
Conn ->
epgsql:equery(Conn, Sql, Params)
end.

equery(PoolName, Sql, Params) ->
equery(PoolName, Sql, Params, ?TIMEOUT).

equery(PoolName, Sql, Params, Timeout) ->
poolboy:transaction(PoolName,
fun (Worker) ->
Expand All @@ -74,13 +78,22 @@ with_transaction(PoolName, Fun, Timeout) ->
{transaction, Fun}, Timeout)
end, Timeout).

is_connected(WorkerPid) ->
gen_server:call(WorkerPid, {is_connected}, ?TIMEOUT).

start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).

init(Args) ->
process_flag(trap_exit, true),
{ok, connect(#state{start_args = Args, delay = ?INITIAL_DELAY})}.

handle_call({is_connected}, _From, #state{conn = undefined} = State) ->
{reply, false, State};
handle_call({is_connected}, _From, #state{conn = _Conn} = State) ->
{reply, true, State};
handle_call(_Query, _From, #state{conn = undefined} = State) ->
{reply, {error, disconnected}, State};
handle_call({squery, Sql}, _From,
#state{conn=Conn} = State) when Conn /= undefined ->
{reply, epgsql:squery(Conn, Sql), State};
Expand Down

0 comments on commit 83ecfc9

Please sign in to comment.