diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index e7cd85d091..135a2eaefb 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -231,6 +231,12 @@ atts = [] }). +-record(couch_lru, { + count=0, + updates, + counts, + close_fun +}). -type doc() :: #doc{}. -type ddoc() :: #doc{}. diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl index b79286ee0f..91ffdccd62 100644 --- a/src/couch/src/couch_lru.erl +++ b/src/couch/src/couch_lru.erl @@ -11,34 +11,36 @@ % the License. -module(couch_lru). --export([new/0, insert/2, update/2, close/1]). +-export([new/1, insert/2, update/2, close/1]). -include_lib("couch/include/couch_db.hrl"). -new() -> +new(CloseFun) -> Updates = ets:new(couch_lru_updates, [ordered_set]), - Dbs = ets:new(couch_lru_dbs, [set]), - {0, Updates, Dbs}. + Counts = ets:new(couch_lru_counts, [set]), + #couch_lru{updates=Updates, counts=Counts, close_fun=CloseFun}. -insert(DbName, {Count, Updates, Dbs}) -> - update(DbName, {Count, Updates, Dbs}). +insert(Name, Lru) -> + update(Name, Lru). -update(DbName, {Count, Updates, Dbs}) -> - case ets:lookup(Dbs, DbName) of +update(Name, Lru) -> + #couch_lru{counts=Counts, updates=Updates, count=Count} = Lru, + case ets:lookup(Counts, Name) of [] -> - true = ets:insert(Dbs, {DbName, Count}); - [{DbName, OldCount}] -> - true = ets:update_element(Dbs, DbName, {2, Count}), - true = ets:delete(Updates, {OldCount, DbName}) + true = ets:insert(Counts, {Name, Count}); + [{Name, OldCount}] -> + true = ets:update_element(Counts, Name, {2, Count}), + true = ets:delete(Updates, {OldCount, Name}) end, - true = ets:insert(Updates, {{Count, DbName}}), - {Count + 1, Updates, Dbs}. + true = ets:insert(Updates, {{Count, Name}}), + Lru#couch_lru{count=Count+1}. -close({Count, Updates, Dbs}) -> - case close_int(ets:next(Updates, {-1, <<>>}), Updates, Dbs) of +close(Lru) -> + #couch_lru{updates=Updates} = Lru, + case close_int(ets:next(Updates, {-1, <<>>}), Lru) of true -> - {true, {Count, Updates, Dbs}}; + {true, Lru}; false -> false end. @@ -46,26 +48,19 @@ close({Count, Updates, Dbs}) -> %% internals -close_int('$end_of_table', _Updates, _Dbs) -> +close_int('$end_of_table', _Lru) -> false; -close_int({_Count, DbName} = Key, Updates, Dbs) -> - case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of - true -> - [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), - case couch_db:is_idle(Db) of true -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), - exit(Pid, kill), +close_int({_Count, Name} = Key, Lru) -> + #couch_lru{updates=Updates, counts=Counts, close_fun=CloseFun} = Lru, + {Stop, Remove} = CloseFun(Name), + case Remove of + true -> true = ets:delete(Updates, Key), - true = ets:delete(Dbs, DbName), - true; + true = ets:delete(Counts, Name); false -> - true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), - couch_stats:increment_counter([couchdb, couch_server, lru_skip]), - close_int(ets:next(Updates, Key), Updates, Dbs) - end; - false -> - true = ets:delete(Updates, Key), - true = ets:delete(Dbs, DbName), - close_int(ets:next(Updates, Key), Updates, Dbs) + ok + end, + case Stop of + false -> close_int(ets:next(Updates, Key), Lru); + true -> true end. diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 115230029e..53dc9e13a4 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -20,7 +20,7 @@ -export([init/1, handle_call/3,sup_start_link/0]). -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). --export([close_lru/0]). +-export([close_lru/0, maybe_close_db/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -36,7 +36,7 @@ dbs_open=0, start_time="", update_lru_on_read=true, - lru = couch_lru:new() + lru = couch_lru:new(fun maybe_close_db/1) }). dev_start() -> @@ -97,6 +97,26 @@ update_lru(DbName) -> close_lru() -> gen_server:call(couch_server, close_lru). +maybe_close_db(DbName) -> + % First element of return indicates if we should stop closing DBs from the + % LRU and the second element indicates if we closed a DB. + case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of + true -> + [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + case couch_db:is_idle(Db) of true -> + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_dbs_pid_to_name, Pid), + exit(Pid, kill), + {true, true}; + false -> + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + couch_stats:increment_counter([couchdb, couch_server, lru_skip]), + {false, false} + end; + false -> + {false, true} + end. + create(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of diff --git a/src/couch/test/couch_lru_tests.erl b/src/couch/test/couch_lru_tests.erl index 15598358f0..02a131ca89 100644 --- a/src/couch/test/couch_lru_tests.erl +++ b/src/couch/test/couch_lru_tests.erl @@ -20,7 +20,7 @@ setup() -> ok = meck:new(couch_db, [passthrough]), ets:new(couch_dbs, [set, public, named_table, {keypos, #db.name}]), ets:new(couch_dbs_pid_to_name, [set, public, named_table]), - couch_lru:new(). + couch_lru:new(fun couch_server:maybe_close_db/1). teardown(_) -> ets:delete(couch_dbs), @@ -29,18 +29,18 @@ teardown(_) -> new_test_() -> {setup, - fun() -> couch_lru:new() end, + fun() -> couch_lru:new(fun couch_server:maybe_close_db/1) end, fun(Lru) -> - ?_assertMatch({0, _, _}, Lru) + ?_assertEqual(0, Lru#couch_lru.count) end }. insert_test_() -> {setup, - fun() -> couch_lru:new() end, + fun() -> couch_lru:new(fun couch_server:maybe_close_db/1) end, fun(Lru) -> Key = <<"test">>, - {1, Updates, Dbs} = couch_lru:insert(Key, Lru), + #couch_lru{count=1, updates=Updates, counts=Dbs} = couch_lru:insert(Key, Lru), [ ?_assertEqual(1, ets_size(Dbs)), ?_assert(ets:member(Dbs, Key)), @@ -52,11 +52,11 @@ insert_test_() -> insert_same_test_() -> {setup, - fun() -> couch_lru:new() end, + fun() -> couch_lru:new(fun couch_server:maybe_close_db/1) end, fun(Lru) -> Key = <<"test">>, - Lru1 = {1, Updates, Dbs} = couch_lru:insert(Key, Lru), - {2, Updates, Dbs} = couch_lru:insert(Key, Lru1), + Lru1 = #couch_lru{count=1} = couch_lru:insert(Key, Lru), + #couch_lru{count=2, updates=Updates, counts=Dbs} = couch_lru:insert(Key, Lru1), [ ?_assertEqual(1, ets_size(Dbs)), ?_assert(ets:member(Dbs, Key)), @@ -68,11 +68,11 @@ insert_same_test_() -> update_test_() -> {setup, - fun() -> couch_lru:new() end, + fun() -> couch_lru:new(fun couch_server:maybe_close_db/1) end, fun(Lru) -> Key = <<"test">>, - Lru1 = {1, Updates, Dbs} = couch_lru:update(Key, Lru), - {2, Updates, Dbs} = couch_lru:update(Key, Lru1), + Lru1 = #couch_lru{count=1} = couch_lru:update(Key, Lru), + #couch_lru{count=2, updates=Updates, counts=Dbs} = couch_lru:update(Key, Lru1), [ ?_assertEqual(1, ets_size(Dbs)), ?_assert(ets:member(Dbs, Key)), @@ -90,7 +90,7 @@ close_test_() -> ok = meck:expect(couch_db, is_idle, 1, true), {ok, Lru1} = add_record(Lru, <<"test1">>, c:pid(0, 1001, 0)), {ok, Lru2} = add_record(Lru1, <<"test2">>, c:pid(0, 2001, 0)), - {true, {2, Updates, Dbs}} = couch_lru:close(Lru2), + {true, #couch_lru{count=2, updates=Updates, counts=Dbs}} = couch_lru:close(Lru2), [ ?_assertEqual(1, ets_size(Dbs)), ?_assert(ets:member(Dbs, <<"test2">>)), diff --git a/src/couch/test/couchdb_compaction_daemon_tests.erl b/src/couch/test/couchdb_compaction_daemon_tests.erl index 25d9b131ec..5af6a62c70 100644 --- a/src/couch/test/couchdb_compaction_daemon_tests.erl +++ b/src/couch/test/couchdb_compaction_daemon_tests.erl @@ -222,8 +222,9 @@ spawn_compaction_monitor(DbName) -> DbPid = couch_util:with_db(DbName, fun(Db) -> Db#db.main_pid end), - {ok, ViewPid} = couch_index_server:get_index(couch_mrview_index, + {ok, ViewPid, Mon} = couch_index_server:get_index(couch_mrview_index, DbName, <<"_design/foo">>), + couch_index_server:close(Mon), TestPid ! {self(), started}, receive {TestPid, go} -> ok diff --git a/src/couch/test/couchdb_views_tests.erl b/src/couch/test/couchdb_views_tests.erl index f1fddfc1b0..130be014cf 100644 --- a/src/couch/test/couchdb_views_tests.erl +++ b/src/couch/test/couchdb_views_tests.erl @@ -226,7 +226,7 @@ should_cleanup_all_index_files({DbName, {FooRev, BooRev}})-> couchdb_1138(DbName) -> ?_test(begin - {ok, IndexerPid} = couch_index_server:get_index( + {ok, IndexerPid, Mon} = couch_index_server:get_index( couch_mrview_index, DbName, <<"_design/foo">>), ?assert(is_pid(IndexerPid)), ?assert(is_process_alive(IndexerPid)), @@ -259,12 +259,14 @@ couchdb_1138(DbName) -> ?assertEqual(5, length(Rows2)), ?assertEqual(2, count_users(DbName)), - ?assert(is_process_alive(IndexerPid)) + ?assert(is_process_alive(IndexerPid)), + + couch_index_server:close(Mon) end). couchdb_1309(DbName) -> ?_test(begin - {ok, IndexerPid} = couch_index_server:get_index( + {ok, IndexerPid, Mon} = couch_index_server:get_index( couch_mrview_index, DbName, <<"_design/foo">>), ?assert(is_pid(IndexerPid)), ?assert(is_process_alive(IndexerPid)), @@ -281,7 +283,7 @@ couchdb_1309(DbName) -> ?assert(is_process_alive(IndexerPid)), update_design_doc(DbName, <<"_design/foo">>, <<"bar">>), - {ok, NewIndexerPid} = couch_index_server:get_index( + {ok, NewIndexerPid, NewMon} = couch_index_server:get_index( couch_mrview_index, DbName, <<"_design/foo">>), ?assert(is_pid(NewIndexerPid)), ?assert(is_process_alive(NewIndexerPid)), @@ -301,12 +303,15 @@ couchdb_1309(DbName) -> ?assertEqual(4, length(Rows2)), ok = stop_indexer( %% FIXME we need to grab monitor earlier - fun() -> ok end, + fun() -> couch_index_server:close(Mon) end, IndexerPid, ?LINE, "old view group is not dead after ddoc update"), ok = stop_indexer( - fun() -> couch_server:delete(DbName, [?ADMIN_USER]) end, + fun() -> + couch_index_server:close(NewMon), + couch_server:delete(DbName, [?ADMIN_USER]) + end, NewIndexerPid, ?LINE, "new view group did not die after DB deletion") end). @@ -363,9 +368,10 @@ couchdb_1283() -> %% because we need have access to compaction Pid, not a Ref. %% {ok, MonRef} = couch_mrview:compact(MDb1#db.name, <<"_design/foo">>, %% [monitor]), - {ok, Pid} = couch_index_server:get_index( + {ok, Pid, Mon} = couch_index_server:get_index( couch_mrview_index, MDb1#db.name, <<"_design/foo">>), {ok, CPid} = gen_server:call(Pid, compact), + couch_index_server:close(Mon), %% By suspending compaction process we ensure that compaction won't get %% finished too early to make get_writer_status assertion fail. erlang:suspend_process(CPid), diff --git a/src/couch_index/priv/stats_descriptions.cfg b/src/couch_index/priv/stats_descriptions.cfg new file mode 100644 index 0000000000..aac317fe62 --- /dev/null +++ b/src/couch_index/priv/stats_descriptions.cfg @@ -0,0 +1,20 @@ +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. + +% Style guide for descriptions: Start with a lowercase letter & do not add +% a trailing full-stop / period +% Please keep this in alphabetical order + +{[couchdb, couch_index_server, lru_skip], [ + {type, counter}, + {desc, <<"number of couch_index_server LRU operations skipped">>} +]}. diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl index c86f5e1222..99b5e8a5ac 100644 --- a/src/couch_index/src/couch_index.erl +++ b/src/couch_index/src/couch_index.erl @@ -240,8 +240,11 @@ handle_cast({new_state, NewIdxState}, State) -> couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args), Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState), case State#st.committed of - true -> erlang:send_after(commit_delay(), self(), commit); - false -> ok + true -> + ok = couch_index_server:set_committing(self(), true), + erlang:send_after(commit_delay(), self(), commit); + false -> + ok end, {noreply, State#st{ idx_state=NewIdxState, @@ -297,6 +300,7 @@ handle_info(commit, State) -> % Commit the updates ok = Mod:commit(IdxState), couch_event:notify(DbName, {index_commit, IdxName}), + ok = couch_index_server:set_committing(self(), false), {noreply, State#st{committed=true}}; _ -> % We can't commit the header because the database seq that's @@ -305,6 +309,7 @@ handle_info(commit, State) -> % forever out of sync with the database. But a crash before we % commit these changes, no big deal, we only lose incremental % changes since last committal. + ok = couch_index_server:set_committing(self(), true), erlang:send_after(commit_delay(), self(), commit), {noreply, State} end; @@ -385,8 +390,11 @@ commit_compacted(NewIdxState, State) -> false -> ok end, case State#st.committed of - true -> erlang:send_after(commit_delay(), self(), commit); - false -> ok + true -> + ok = couch_index_server:set_committing(self(), true), + erlang:send_after(commit_delay(), self(), commit); + false -> + ok end, State#st{ idx_state=NewIdxState1, diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl index 61f406c1a7..d224747167 100644 --- a/src/couch_index/src/couch_index_compactor.erl +++ b/src/couch_index/src/couch_index_compactor.erl @@ -64,12 +64,14 @@ handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) -> {reply, {ok, Pid}, State}; handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) -> Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end), + ok = couch_index_server:set_compacting(Idx, true), {reply, {ok, Pid}, State#st{pid=Pid}}; handle_call(cancel, _From, #st{pid=undefined}=State) -> {reply, ok, State}; handle_call(cancel, _From, #st{pid=Pid}=State) -> unlink(Pid), exit(Pid, kill), + ok = couch_index_server:set_compacting(State#st.idx, false), {reply, ok, State#st{pid=undefined}}; handle_call(get_compacting_pid, _From, #st{pid=Pid}=State) -> {reply, {ok, Pid}, State}; @@ -84,6 +86,7 @@ handle_cast(_Mesg, State) -> handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) -> + ok = couch_index_server:set_compacting(State#st.idx, false), {noreply, State#st{pid=undefined}}; handle_info({'EXIT', Pid, Reason}, #st{pid = Pid} = State) -> #st{idx = Idx, mod = Mod} = State, @@ -92,11 +95,10 @@ handle_info({'EXIT', Pid, Reason}, #st{pid = Pid} = State) -> IdxName = Mod:get(idx_name, IdxState), Args = [DbName, IdxName, Reason], couch_log:error("Compaction failed for db: ~s idx: ~s reason: ~p", Args), + ok = couch_index_server:set_compacting(State#st.idx, false), {noreply, State#st{pid = undefined}}; handle_info({'EXIT', _Pid, normal}, State) -> {noreply, State}; -handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) -> - {stop, normal, State}; handle_info(_Mesg, State) -> {stop, unknown_info, State}. diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 4e86f5e80d..7ec91c3b00 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -16,7 +16,8 @@ -vsn(2). --export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]). +-export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2, close/1]). +-export([set_committing/2, set_compacting/2]). -export([init/1, terminate/2, code_change/3]). -export([handle_call/3, handle_cast/2, handle_info/2]). @@ -34,8 +35,76 @@ -define(BY_PID, couchdb_indexes_by_pid). -define(BY_DB, couchdb_indexes_by_db). -define(RELISTEN_DELAY, 5000). +-define(MAX_INDICES_OPEN, 500). + +-record(st, { + lru=couch_lru:new(fun maybe_close_index/1), + open=0, + max_open=?MAX_INDICES_OPEN, + root_dir +}). + +-record(entry, { + name, + pid, + locked=false, + committing=false, + compacting=false, + waiters=undefined +}). + +close(Mon) -> + erlang:demonitor(Mon, [flush]), + ok. + +maybe_close_lru_view(#st{open=Open, max_open=Max}=State) when Open =< Max -> + {ok, State}; +maybe_close_lru_view(State) -> + #st{lru=Lru, open=Open} = State, + case couch_lru:close(Lru) of + false -> + {ok, State}; + {true, NewLru} -> + maybe_close_lru_view(State#st{lru=NewLru, open=Open-1}) + end. + +is_idle(Pid) -> + case erlang:process_info(Pid, monitored_by) of + undefined -> + true; + {monitored_by, Pids} -> + [] =:= Pids -- [whereis(couch_stats_process_tracker)] + end. --record(st, {root_dir}). +set_compacting(Idx, IsCompacting) -> + gen_server:call(?MODULE, {compacting, Idx, IsCompacting}, infinity). + +set_committing(Pid, IsCommitting) -> + gen_server:call(?MODULE, {committing, Pid, IsCommitting}, infinity). + +maybe_close_index({DbName, DDocId, Sig}) -> + case ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, true}) of + true -> + case ets:lookup(?BY_SIG, {DbName, Sig}) of + [#entry{pid=Pid, committing=false, compacting=false}] -> + case is_idle(Pid) of + true -> + rem_from_ets(DbName, Sig, DDocId, Pid), + couch_index:stop(Pid), + {true, true}; + false -> + ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}), + couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]), + {false, false} + end; + _ -> + ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}), + couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]), + {false, false} + end; + false -> + {false, true} + end. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -94,8 +163,8 @@ get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) -> get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) -> {ok, InitState} = Module:init(Db, DDoc), {ok, FunResp} = Fun(InitState), - {ok, Pid} = get_index(Module, InitState), - {ok, Pid, FunResp}; + {ok, Pid, Monitor} = get_index(Module, InitState), + {ok, Pid, Monitor, FunResp}; get_index(Module, Db, DDoc, _Fun) -> {ok, InitState} = Module:init(Db, DDoc), get_index(Module, InitState). @@ -105,24 +174,31 @@ get_index(Module, IdxState) -> DbName = Module:get(db_name, IdxState), Sig = Module:get(signature, IdxState), case ets:lookup(?BY_SIG, {DbName, Sig}) of - [{_, Pid}] when is_pid(Pid) -> - {ok, Pid}; + [#entry{pid=Pid, locked=false}] when is_pid(Pid) -> + Monitor = erlang:monitor(process, Pid), + {ok, Pid, Monitor}; _ -> Args = {Module, IdxState, DbName, Sig}, - gen_server:call(?MODULE, {get_index, Args}, infinity) + case gen_server:call(?MODULE, {get_index, Args}, infinity) of + {ok, Pid} -> + Monitor = erlang:monitor(process, Pid), + {ok, Pid, Monitor}; + {error, Reason} -> + {error, Reason} + end end. - init([]) -> process_flag(trap_exit, true), ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()), - ets:new(?BY_SIG, [protected, set, named_table]), - ets:new(?BY_PID, [private, set, named_table]), + ets:new(?BY_SIG, [protected, set, named_table, {keypos, #entry.name}]), + ets:new(?BY_PID, [protected, set, named_table]), ets:new(?BY_DB, [protected, bag, named_table]), couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), RootDir = couch_index_util:root_dir(), + MaxIndicesOpen = config:get_integer("couchdb", "max_indices_open", ?MAX_INDICES_OPEN), couch_file:init_delete_dir(RootDir), - {ok, #st{root_dir=RootDir}}. + {ok, #st{root_dir=RootDir, max_open=MaxIndicesOpen}}. terminate(_Reason, _State) -> @@ -134,47 +210,69 @@ terminate(_Reason, _State) -> handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) -> case ets:lookup(?BY_SIG, {DbName, Sig}) of [] -> + {ok, NewState} = maybe_close_lru_view(State#st{open=(State#st.open)+1}), spawn_link(fun() -> new_index(Args) end), - ets:insert(?BY_SIG, {{DbName, Sig}, [From]}), + ets:insert(?BY_SIG, #entry{name={DbName, Sig}, waiters=[From]}), + {noreply, NewState}; + [#entry{waiters=Waiters}=Entry] when is_list(Waiters) -> + ets:insert(?BY_SIG, Entry#entry{waiters=[From | Waiters]}), {noreply, State}; - [{_, Waiters}] when is_list(Waiters) -> - ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}), - {noreply, State}; - [{_, Pid}] when is_pid(Pid) -> + [#entry{pid=Pid}] when is_pid(Pid) -> {reply, {ok, Pid}, State} end; handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), + [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), + NewLru = couch_lru:insert({DbName, DDocId, Sig}, State#st.lru), [gen_server:reply(From, {ok, Pid}) || From <- Waiters], link(Pid), add_to_ets(DbName, Sig, DDocId, Pid), - {reply, ok, State}; + {reply, ok, State#st{lru=NewLru}}; handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), + [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), [gen_server:reply(From, Error) || From <- Waiters], ets:delete(?BY_SIG, {DbName, Sig}), + {reply, ok, State#st{open=(State#st.open)-1}}; +handle_call({compacting, Pid, IsCompacting}, _From, State) -> + case ets:lookup(?BY_PID, Pid) of + [{Pid, {DbName, Sig}}] -> + ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.compacting, IsCompacting}); + [] -> + ok + end, + {reply, ok, State}; +handle_call({committing, Pid, IsCommitting}, _From, State) -> + case ets:lookup(?BY_PID, Pid) of + [{Pid, {DbName, Sig}}] -> + ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.committing, IsCommitting}); + [] -> + ok + end, {reply, ok, State}; handle_call({reset_indexes, DbName}, _From, State) -> reset_indexes(DbName, State#st.root_dir), - {reply, ok, State}. + {reply, ok, State}; +handle_call(get_open_count, _From, State) -> + {reply, State#st.open, State}. handle_cast({reset_indexes, DbName}, State) -> reset_indexes(DbName, State#st.root_dir), - {noreply, State}. + {noreply, State}; +handle_cast(close_indexes, State) -> + {ok, NewState} = maybe_close_lru_view(State), + {noreply, NewState}. -handle_info({'EXIT', Pid, Reason}, Server) -> - case ets:lookup(?BY_PID, Pid) of +handle_info({'EXIT', Pid, Reason}, State) -> + NewState = case ets:lookup(?BY_PID, Pid) of [{Pid, {DbName, Sig}}] -> - [{DbName, {DDocId, Sig}}] = - ets:match_object(?BY_DB, {DbName, {'$1', Sig}}), - rem_from_ets(DbName, Sig, DDocId, Pid); + rem_from_ets(DbName, Sig, Pid), + State#st{open=(State#st.open)-1}; [] when Reason /= normal -> exit(Reason); _Else -> - ok + State end, - {noreply, Server}; + {noreply, NewState}; handle_info(restart_config_listener, State) -> ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()), {noreply, State}; @@ -187,18 +285,20 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) -> - {ok, RootDir}; -handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) -> - {ok, RootDir}; +handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir=RootDir}=State) -> + {ok, State}; +handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir=RootDir}=State) -> + {ok, State}; handle_config_change("couchdb", "index_dir", _, _, _) -> exit(whereis(couch_index_server), config_change), remove_handler; handle_config_change("couchdb", "view_index_dir", _, _, _) -> exit(whereis(couch_index_server), config_change), remove_handler; -handle_config_change(_, _, _, _, RootDir) -> - {ok, RootDir}. +handle_config_change("couchdb", "max_indices_open", Max, _, State) -> + {ok, State#st{max_open=list_to_integer(Max)}}; +handle_config_change(_, _, _, _, State) -> + {ok, State}. handle_config_terminate(_, stop, _) -> ok; @@ -222,7 +322,7 @@ new_index({Mod, IdxState, DbName, Sig}) -> reset_indexes(DbName, Root) -> % shutdown all the updaters and clear the files, the db got changed Fun = fun({_, {DDocId, Sig}}) -> - [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}), + [#entry{pid=Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}), MRef = erlang:monitor(process, Pid), gen_server:cast(Pid, delete), receive {'DOWN', MRef, _, _, _} -> ok end, @@ -234,11 +334,17 @@ reset_indexes(DbName, Root) -> add_to_ets(DbName, Sig, DDocId, Pid) -> - ets:insert(?BY_SIG, {{DbName, Sig}, Pid}), + ets:insert(?BY_SIG, #entry{name={DbName, Sig}, pid=Pid}), ets:insert(?BY_PID, {Pid, {DbName, Sig}}), ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). +rem_from_ets(DbName, Sig, Pid) -> + [{DbName, {DDocId, Sig}}] = + ets:match_object(?BY_DB, {DbName, {'$1', Sig}}), + rem_from_ets(DbName, Sig, DDocId, Pid). + + rem_from_ets(DbName, Sig, DDocId, Pid) -> ets:delete(?BY_SIG, {DbName, Sig}), ets:delete(?BY_PID, Pid), @@ -254,7 +360,7 @@ handle_db_event(DbName, deleted, St) -> handle_db_event(DbName, {ddoc_updated, DDocId}, St) -> lists:foreach(fun({_DbName, {_DDocId, Sig}}) -> case ets:lookup(?BY_SIG, {DbName, Sig}) of - [{_, IndexPid}] -> + [#entry{pid=IndexPid}] -> (catch gen_server:cast(IndexPid, ddoc_updated)); [] -> ok diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl index 0787151ae4..6cbffc8979 100644 --- a/src/couch_index/test/couch_index_compaction_tests.erl +++ b/src/couch_index/test/couch_index_compaction_tests.erl @@ -19,9 +19,9 @@ setup() -> DbName = ?tempdb(), {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), couch_db:close(Db), - {ok, IndexerPid} = fake_index(Db), + {ok, IndexerPid, Mon} = fake_index(Db), ?assertNot(is_opened(Db)), - {Db, IndexerPid}. + {Db, IndexerPid, Mon}. fake_index(#db{name = DbName} = Db) -> ok = meck:new([test_index], [non_strict]), @@ -67,7 +67,7 @@ compaction_test_() -> }. -hold_db_for_recompaction({Db, Idx}) -> +hold_db_for_recompaction({Db, Idx, Mon}) -> ?_test(begin ?assertNot(is_opened(Db)), ok = meck:reset(test_index), @@ -87,6 +87,7 @@ hold_db_for_recompaction({Db, Idx}) -> end, ?assertNot(is_opened(Db)), + couch_index_server:close(Mon), ok end). diff --git a/src/couch_index/test/couch_index_lru_tests.erl b/src/couch_index/test/couch_index_lru_tests.erl new file mode 100644 index 0000000000..52f4244844 --- /dev/null +++ b/src/couch_index/test/couch_index_lru_tests.erl @@ -0,0 +1,226 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_index_lru_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(MAX_INDICES_OPEN, 10). + +-record(test_idx, { + db_name, + idx_name, + signature +}). + + +setup() -> + test_util:start_couch([]), + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + config:set("couchdb", "max_indices_open", integer_to_list(?MAX_INDICES_OPEN)), + Db. + + +teardown(Db) -> + ok = couch_server:delete(Db#db.name, [?ADMIN_CTX]), + config:delete("couchdb", "max_indices_open"), + (catch couch_db:close(Db)), + ok. + + +lru_test_() -> + { + "Test the view index LRU", + { + setup, + fun() -> test_util:start_couch([]) end, fun test_util:stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun test_close_while_compacting/1, + fun test_soft_max/1 + ] + } + } + }. + + +test_close_while_compacting(Db) -> + ?_test(begin + ok = meck:new([couch_index_server], [passthrough]), + Self = self(), + ok = meck:expect(couch_index_server, set_compacting, fun(Idx, IsCompacting) -> + meck:passthrough([Idx, IsCompacting]), + Self ! {compact, IsCompacting, self()}, + receive finish -> + ok + end, + ok + end), + + ok = meck:expect(couch_index_server, set_committing, fun(Idx, IsCommitting) -> + meck:passthrough([Idx, IsCommitting]), + Self ! {commit, IsCommitting, self()}, + ok + end), + + % create ddocs + DDocIds = lists:map(fun(I) -> + BI = integer_to_binary(I), + <<"_design/ddoc_", BI/binary>> + end, lists:seq(1,?MAX_INDICES_OPEN+10)), + ok = create_ddocs(Db, DDocIds), + + % open and compact indexes + Openers = lists:map(fun(DDocId) -> + spawn_link(fun() -> + {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db#db.name, DDocId), + couch_index:compact(Pid), + receive close -> + ok + end, + couch_index_server:close(Mon) + end) + end, DDocIds), + + % check that all indexes are open + ToClose = wait_all_compacting(true, [], ?MAX_INDICES_OPEN+10), + ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)), + % close compactor pids, but still block flag from being set in BY_SIG table + lists:foreach(fun(Opener) -> Opener ! close end, Openers), + % check that compaction flag block pids from closing + gen_server:cast(couch_index_server, close_indexes), + ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)), + % allow compaction flag to be unset + lists:foreach(fun(CPid) -> CPid ! finish end, ToClose), + % wait until all compaction flags are unset + Finished = wait_all_compacting(false, [], ?MAX_INDICES_OPEN+10), + lists:foreach(fun(CPid) -> CPid ! finish end, Finished), + gen_server:cast(couch_index_server, close_indexes), + ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)), + % wait for all commits to start + Indexers = wait_all_committing(dict:new(), true, ?MAX_INDICES_OPEN+10), + ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)), + % force premature commit + [Indexer ! commit || Indexer <- Indexers], + % wait until commits happen + wait_all_committing(dict:new(), false, ?MAX_INDICES_OPEN+10), + gen_server:cast(couch_index_server, close_indexes), + % since all commits and all compacts are done, make sure indexes are closed + ?assertEqual(?MAX_INDICES_OPEN, gen_server:call(couch_index_server, get_open_count)), + % clean up + (catch meck:unload(couch_index_server)), + ok + end). + + +test_soft_max(Db) -> + ?_test(begin + ok = meck:new([test_index], [non_strict]), + ok = meck:expect(test_index, init, fun(Db0, DDoc) -> + Sig = couch_crypto:hash(md5, term_to_binary({Db0#db.name, DDoc})), + {ok, #test_idx{db_name=Db0#db.name, idx_name=DDoc, signature=Sig}} + end), + ok = meck:expect(test_index, close, ['_'], {true, true}), + ok = meck:expect(test_index, open, fun(_Db, State) -> + {ok, State} + end), + ok = meck:expect(test_index, compact, ['_', '_', '_'], + meck:seq([{ok, 9}, {ok, 10}])), %% to trigger recompaction + ok = meck:expect(test_index, commit, ['_'], ok), + ok = meck:expect(test_index, get, fun + (db_name, State) -> + State#test_idx.db_name; + (idx_name, State) -> + State#test_idx.idx_name; + (signature, State) -> + State#test_idx.signature; + (update_seq, Seq) -> + Seq + end), + + ok = meck:reset(test_index), + + IdxOpens = lists:map(fun(I) -> + BI = integer_to_binary(I), + % hack: use tuple as index name so couch_index_server won't try to open + % it as a design document. + IndexName = {<<"_design/i", BI/binary>>}, + ?assertEqual(I-1, gen_server:call(couch_index_server, get_open_count)), + couch_index_server:get_index(test_index, Db, IndexName) + end, lists:seq(1, 500)), + + lists:foldl(fun(IdxOpen, Acc) -> + ?assertMatch({ok, _, _}, IdxOpen), + {ok, Pid, Mon} = IdxOpen, + ?assert(is_pid(Pid)), + ?assert(is_reference(Mon)), + ?assertNotEqual(undefined, process_info(Pid)), + gen_server:cast(couch_index_server, close_indexes), + OpenCount = gen_server:call(couch_index_server, get_open_count), + ?assertEqual(max(?MAX_INDICES_OPEN, Acc), OpenCount), + couch_index_server:close(Mon), + Acc-1 + end, 500, IdxOpens), + + config:delete("couchdb", "max_indices_open"), + (catch meck:unload(test_index)), + (catch meck:unload(couch_util)), + ok + end). + + +wait_all_compacting(_IsCompacting, Acc, 0) -> + Acc; +wait_all_compacting(IsCompacting, Acc, Remaining) -> + receive {compact, IsCompacting, From} -> + wait_all_compacting(IsCompacting, [From | Acc], Remaining-1) + end. + + +wait_all_committing(Pids, ShouldBe, Count) -> + receive {commit, IsCommitting, From} -> + Pids0 = dict:store(From, IsCommitting, Pids), + CommitCount = dict:fold(fun(_K, V, Acc) -> + case V of + ShouldBe -> Acc+1; + _ -> Acc + end + end, 0, Pids0), + case Count =:= CommitCount of + true -> + [Pid || {Pid, _} <- dict:to_list(Pids0)]; + false -> + wait_all_committing(Pids0, ShouldBe, Count) + end + end. + + +create_ddocs(Db, DDocIds) -> + Docs = lists:map(fun(DDocId) -> + MapFun = <<"function(doc) {emit(\"", DDocId/binary, "\", 1);}">>, + Json = {[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"views">>, {[ + {<<"v">>, {[ + {<<"map">>, MapFun} + ]}} + ]}} + ]}, + couch_doc:from_json_obj(Json) + end, DDocIds), + {ok, _} = couch_db:update_docs(Db, Docs, [?ADMIN_CTX]), + ok. diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 0373919659..d7165ede48 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -325,16 +325,19 @@ count_view_changes_since(Db, DDoc, VName, SinceSeq, Options) -> get_info(Db, DDoc) -> - {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), - couch_index:get_info(Pid). + {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + Info = couch_index:get_info(Pid), + ok = couch_index_server:close(Mon), + Info. trigger_update(Db, DDoc) -> trigger_update(Db, DDoc, couch_db:get_update_seq(Db)). trigger_update(Db, DDoc, UpdateSeq) -> - {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), - couch_index:trigger_update(Pid, UpdateSeq). + {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + couch_index:trigger_update(Pid, UpdateSeq), + couch_index_server:close(Mon). %% get informations on a view get_view_info(Db, DDoc, VName) -> @@ -369,10 +372,14 @@ refresh(Db, DDoc) -> end), case couch_index_server:get_index(couch_mrview_index, Db, DDoc) of - {ok, Pid} -> - case catch couch_index:get_state(Pid, UpdateSeq) of - {ok, _} -> ok; - Error -> {error, Error} + {ok, Pid, Mon} -> + try + case catch couch_index:get_state(Pid, UpdateSeq) of + {ok, _} -> ok; + Error -> {error, Error} + end + after + couch_index_server:close(Mon) end; Error -> {error, Error} @@ -383,17 +390,20 @@ compact(Db, DDoc) -> compact(Db, DDoc, Opts) -> - {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), - couch_index:compact(Pid, Opts). + {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + Ret = couch_index:compact(Pid, Opts), + couch_index_server:close(Mon), + Ret. cancel_compaction(Db, DDoc) -> - {ok, IPid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + {ok, IPid, Mon} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), {ok, CPid} = couch_index:get_compactor_pid(IPid), ok = couch_index_compactor:cancel(CPid), % Cleanup the compaction file if it exists {ok, #mrst{sig=Sig, db_name=DbName}} = couch_index:get_state(IPid, 0), + couch_index_server:close(Mon), couch_mrview_util:delete_compaction_file(DbName, Sig), ok. diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 27f8737d47..89a6a3670c 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -40,7 +40,7 @@ get_view(Db, DDoc, ViewName, Args0) -> - {ok, Pid, Args2} = get_view_index_pid(Db, DDoc, ViewName, Args0), + {ok, Pid, Mon, Args2} = get_view_index_pid(Db, DDoc, ViewName, Args0), DbUpdateSeq = couch_util:with_db(Db, fun(WDb) -> couch_db:get_update_seq(WDb) end), @@ -51,9 +51,14 @@ get_view(Db, DDoc, ViewName, Args0) -> {ok, _} = Resp -> Resp; Error -> throw(Error) end, + ok = couch_index_server:close(Mon), Ref = erlang:monitor(process, State#mrst.fd), if Args2#mrargs.update == lazy -> - spawn(fun() -> catch couch_index:get_state(Pid, DbUpdateSeq) end); + spawn(fun() -> + {ok, Pid2, Mon2} = couch_index_server:get_index(?MOD, Db, DDoc), + catch couch_index:get_state(Pid2, DbUpdateSeq), + ok = couch_index_server:close(Mon2) + end); true -> ok end, #mrst{language=Lang, views=Views} = State,