diff --git a/pgapp.config.sample b/pgapp.config.sample index bfdf587..f21e192 100644 --- a/pgapp.config.sample +++ b/pgapp.config.sample @@ -19,7 +19,19 @@ {database, "db2"}, {username, "user"}, {password, "pass"} - ]} + ]}, + {pool3, [ + {size, 10}, + {max_overflow, 20} + ], + [ + {host, "localhost"}, + {database, "db3"}, + {username, "user"}, + {password, "pass"} + ], + "priv/prepared.sql"}, + ]} ] }]. diff --git a/priv/empty.sql b/priv/empty.sql new file mode 100644 index 0000000..e69de29 diff --git a/priv/prepared.sql b/priv/prepared.sql new file mode 100644 index 0000000..0907f81 --- /dev/null +++ b/priv/prepared.sql @@ -0,0 +1,2 @@ +{"simple", "select 1+1"}. +{"increment", "select $1+1"}. \ No newline at end of file diff --git a/src/pgapp.erl b/src/pgapp.erl index fdc7989..69a726b 100644 --- a/src/pgapp.erl +++ b/src/pgapp.erl @@ -9,7 +9,7 @@ -module(pgapp). %% API --export([connect/1, connect/2, equery/2, equery/3, squery/1, squery/2]). +-export([connect/1, connect/2, equery/2, equery/3, squery/1, squery/2, prepared_query/3]). %%%=================================================================== %%% API @@ -41,6 +41,15 @@ equery(PoolName, Sql, Params) -> gen_server:call(Worker, {equery, Sql, Params}) end). +-spec prepared_query(PoolName::atom(), Name::string, + Params :: list(epgsql:bind_param())) -> epgsql:reply(epgsql:equery_row()). +prepared_query(PoolName, Name, Params) -> + poolboy:transaction(PoolName, + fun(Worker) -> + gen_server:call(Worker, {prepared_query, Name, Params}) + end). + + -spec squery(Sql::epgsql:sql_query()) -> epgsql:reply(epgsql:squery_row()) | [epgsql:reply(epgsql:squery_row())]. squery(Sql) -> diff --git a/src/pgapp_sup.erl b/src/pgapp_sup.erl index b135376..8d399e7 100644 --- a/src/pgapp_sup.erl +++ b/src/pgapp_sup.erl @@ -2,6 +2,8 @@ -behaviour(supervisor). +-include("worker_args.hrl"). + %% API -export([start_link/0, add_pool/3]). @@ -24,13 +26,30 @@ start_link() -> init([]) -> {ok, Pools} = application:get_env(pgapp, pools), - PoolSpec = lists:map(fun ({PoolName, SizeArgs, WorkerArgs}) -> - PoolArgs = [{name, {local, PoolName}}, - {worker_module, pgapp_worker}] ++ SizeArgs, - poolboy:child_spec(PoolName, PoolArgs, WorkerArgs) + PoolSpec = lists:map(fun ({PoolName, SizeArgs, ConnectionArgs}) -> + make_child_spec(PoolName, SizeArgs, ConnectionArgs, undefined); + ({PoolName, SizeArgs, ConnectionArgs, SQLFile}) -> + make_child_spec(PoolName, SizeArgs, ConnectionArgs, SQLFile) end, Pools), {ok, { {one_for_one, 10, 10}, PoolSpec} }. -add_pool(Name, PoolArgs, WorkerArgs) -> - ChildSpec = poolboy:child_spec(Name, PoolArgs, WorkerArgs), - supervisor:start_child(?MODULE, ChildSpec). +make_child_spec(PoolName, SizeArgs, ConnectionArgs, SQLFile) -> + PoolArgs = [{name, {local, PoolName}}, + {worker_module, pgapp_worker}] ++ SizeArgs, + poolboy:child_spec(PoolName, PoolArgs, make_worker_args(ConnectionArgs, SQLFile)). + +make_worker_args(ConnectionArgs, SQLFileName) -> + SQL = case SQLFileName of + undefined -> []; + File -> + {ok, PreparedSQL} = file:consult(File), + PreparedSQL + end, + #worker_args{connection_args = ConnectionArgs, prepared_sql = SQL}. + +add_pool(Name, PoolArgs, ConnectionArgs) -> + add_pool(Name, PoolArgs, ConnectionArgs, undefined). + +add_pool(Name, PoolArgs, ConnectionArgs, SQLFile) -> + ChildSpec = poolboy:child_spec(Name, PoolArgs, make_worker_args(ConnectionArgs, SQLFile)), + supervisor:start_child(?MODULE, ChildSpec). diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index 5a8412c..7c4b3f0 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -16,23 +16,47 @@ -record(state, {conn::pid(), delay::pos_integer(), timer::timer:tref(), - start_args::proplists:proplist()}). + start_args::proplists:proplist(), + sql_text::proplists:proplist()}). -define(INITIAL_DELAY, 500). % Half a second -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes + +-include_lib("epgsql/include/epgsql.hrl"). +-include("worker_args.hrl"). + start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -init(Args) -> +init(#worker_args{connection_args = Args, prepared_sql = PreparedSQL}) -> process_flag(trap_exit, true), - {ok, connect(#state{start_args = Args, delay = ?INITIAL_DELAY})}. + {ok, connect(#state{ + start_args = Args, + sql_text = PreparedSQL, + delay = ?INITIAL_DELAY + })}. handle_call({squery, Sql}, _From, #state{conn=Conn} = State) when Conn /= undefined -> {reply, epgsql:squery(Conn, Sql), State}; handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> - {reply, epgsql:equery(Conn, Sql, Params), State}. + {reply, epgsql:equery(Conn, Sql, Params), State}; + +handle_call({prepared_query, Name, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> + case epgsql:describe(Conn, statement, Name) of + {ok, Statement} -> + case epgsql:bind(Conn, Statement, Params) of + ok -> + {reply, epgsql:execute(Conn, Statement), State}; + Error -> + {reply, Error, State} + end; + Err -> + {reply, Err, State} + end. + + handle_cast(reconnect, State) -> {noreply, connect(State)}. @@ -65,6 +89,21 @@ terminate(_Reason, #state{conn=Conn}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +prepare_statements(Con, PreparedSQL) -> + lists:all( + fun({Name, Query}) -> + case epgsql:parse(Con, Name, Query, []) of + {ok, _Statement} -> + true; + {error, Reason} -> + error_logger:error_msg("Error ~p parsing SQL query ~p", [Reason, Query]), + false + end + end, + PreparedSQL + ). + connect(State) -> Args = State#state.start_args, Hostname = proplists:get_value(host, Args), @@ -77,19 +116,34 @@ connect(State) -> "~p Connected to ~s at ~s with user ~s: ~p~n", [self(), Database, Hostname, Username, Conn]), timer:cancel(State#state.timer), - State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; + case prepare_statements(Conn, State#state.sql_text) of + true -> + State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; + false -> + ok = epgsql:close(Conn), + NewState = handle_connection_error(State), + error_logger:error_msg( + "~p Unable to prepare statements on ~s at ~s with user ~s " + "- attempting reconnect in ~p ms~n", + [self(), Database, Hostname, Username, NewState#state.delay]), + NewState + end; Error -> - NewDelay = calculate_delay(State#state.delay), + NewState = handle_connection_error(State), error_logger:warning_msg( "~p Unable to connect to ~s at ~s with user ~s (~p) " "- attempting reconnect in ~p ms~n", - [self(), Database, Hostname, Username, Error, NewDelay]), - {ok, Tref} = - timer:apply_after( - State#state.delay, gen_server, cast, [self(), reconnect]), - State#state{conn=undefined, delay = NewDelay, timer = Tref} + [self(), Database, Hostname, Username, Error, NewState#state.delay]), + NewState end. +handle_connection_error(#state{delay = Delay} = State) -> + NewDelay = calculate_delay(Delay), + {ok, Tref} = + timer:apply_after( + Delay, gen_server, cast, [self(), reconnect]), + State#state{conn=undefined, delay = NewDelay, timer = Tref}. + calculate_delay(Delay) when (Delay * 2) >= ?MAXIMUM_DELAY -> ?MAXIMUM_DELAY; calculate_delay(Delay) -> diff --git a/src/worker_args.hrl b/src/worker_args.hrl new file mode 100644 index 0000000..40f81a7 --- /dev/null +++ b/src/worker_args.hrl @@ -0,0 +1,6 @@ +-record( + worker_args, + { + prepared_sql, + connection_args + }). \ No newline at end of file