Skip to content

Commit

Permalink
Fix checking the current PoolName
Browse files Browse the repository at this point in the history
  • Loading branch information
lukyanov committed Aug 27, 2017
1 parent 347ee4e commit 8a82880
Showing 1 changed file with 36 additions and 28 deletions.
64 changes: 36 additions & 28 deletions src/pgapp_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,55 +26,70 @@
-define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes
-define(TIMEOUT, 5 * 1000).

-define(TX_CONNECTION_VAR(PoolName), {'$pgapp_tx_connection', PoolName}).
-define(STATE_VAR, '$pgapp_state').

squery(Sql) ->
squery(epgsql_pool, Sql).
case get(?STATE_VAR) of
undefined ->
squery(epgsql_pool, Sql);
{_PoolName, Conn} ->
epgsql:squery(Conn, Sql)
end.

squery(PoolName, Sql) when is_atom(PoolName) ->
squery(PoolName, Sql, ?TIMEOUT);
squery(Sql, Timeout) ->
squery(epgsql_pool, Sql, Timeout).

squery(PoolName, Sql, Timeout) ->
case get(?TX_CONNECTION_VAR(PoolName)) of
undefined ->
case get(?STATE_VAR) of
{PoolName, Conn} ->
epgsql:squery(Conn, Sql);
_ ->
middle_man_transaction(PoolName,
fun (W) ->
gen_server:call(W, {squery, Sql}, Timeout)
end, Timeout);
Conn ->
epgsql:squery(Conn, Sql)
end, Timeout)
end.

equery(Sql, Params) ->
equery(epgsql_pool, Sql, Params).
case get(?STATE_VAR) of
undefined ->
equery(epgsql_pool, Sql, Params);
{_PoolName, Conn} ->
epgsql:equery(Conn, Sql, Params)
end.

equery(PoolName, Sql, Params) when is_atom(PoolName) ->
equery(PoolName, Sql, Params, ?TIMEOUT);
equery(Sql, Params, Timeout) ->
equery(epgsql_pool, Sql, Params, Timeout).

equery(PoolName, Sql, Params, Timeout) ->
case get(?TX_CONNECTION_VAR(PoolName)) of
undefined ->
case get(?STATE_VAR) of
{PoolName, Conn} ->
epgsql:equery(Conn, Sql, Params);
_ ->
middle_man_transaction(PoolName,
fun (W) ->
gen_server:call(W, {equery, Sql, Params}, Timeout)
end, Timeout);
Conn ->
epgsql:equery(Conn, Sql, Params)
end, Timeout)
end.

with_transaction(PoolName, Fun) ->
with_transaction(PoolName, Fun, ?TIMEOUT).

with_transaction(PoolName, Fun, Timeout) ->
middle_man_transaction(PoolName,
fun (W) ->
gen_server:call(W, {transaction, PoolName, Fun},
Timeout)
end, Timeout).
case get(?STATE_VAR) of
{PoolName, _Conn} ->
Fun();
_ ->
middle_man_transaction(PoolName,
fun (W) ->
gen_server:call(W, {transaction, PoolName, Fun},
Timeout)
end, Timeout)
end.

middle_man_transaction(Pool, Fun, Timeout) ->
Tag = make_ref(),
Expand Down Expand Up @@ -111,16 +126,9 @@ handle_call({equery, Sql, Params}, _From,
{reply, epgsql:equery(Conn, Sql, Params), State};
handle_call({transaction, PoolName, Fun}, _From,
#state{conn = Conn} = State) ->
Result = case get(?TX_CONNECTION_VAR(PoolName)) of
undefined ->
put(?TX_CONNECTION_VAR(PoolName), Conn),
Res = epgsql:with_transaction(Conn, fun(_) -> Fun() end),
erase(?TX_CONNECTION_VAR(PoolName)),
Res;
_ ->
% transaction is already in progress
Fun()
end,
put(?STATE_VAR, {PoolName, Conn}),
Result = epgsql:with_transaction(Conn, fun(_) -> Fun() end),
erase(?STATE_VAR),
{reply, Result, State}.

handle_cast(reconnect, State) ->
Expand Down

0 comments on commit 8a82880

Please sign in to comment.