diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 902b5b95ba..8f143fe657 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -84,7 +84,7 @@ handle_changes_req1(#httpd{}=Req, Db) -> #changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req), ChangesArgs = Args0#changes_args{ filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db), - db_open_options = [{user_ctx, Db#db.user_ctx}] + db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}] }, Max = chttpd:chunked_response_buffer_size(), case ChangesArgs#changes_args.feed of @@ -253,7 +253,7 @@ handle_view_cleanup_req(Req, Db) -> handle_design_req(#httpd{ path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest] }=Req, Db) -> - DbName = mem3:dbname(Db#db.name), + DbName = mem3:dbname(couch_db:name(Db)), case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of {ok, DDoc} -> Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3), @@ -309,7 +309,8 @@ delete_db_req(#httpd{}=Req, DbName) -> do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) -> fabric:get_security(DbName, [{user_ctx,Ctx}]), % calls check_is_reader - Fun(Req, #db{name=DbName, user_ctx=Ctx}). + {ok, Db} = couch_db:clustered_db(DbName, Ctx), + Fun(Req, Db). db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) -> % measure the time required to generate the etag, see if it's worth it @@ -767,16 +768,17 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> } = parse_doc_query(Req), couch_doc:validate_docid(DocId), + DbName = couch_db:name(Db), W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), Options = [{user_ctx,Ctx}, {w,W}], - Loc = absolute_uri(Req, [$/, couch_util:url_encode(Db#db.name), + Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName), $/, couch_util:url_encode(DocId)]), RespHeaders = [{"Location", Loc}], case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of ("multipart/related;" ++ _) = ContentType -> couch_httpd:check_max_request_length(Req), - couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(Db#db.name), DocId)), + couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)), {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, DocId, Doc0), @@ -833,8 +835,9 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) -> HttpCode = 202 end, % respond + DbName = couch_db:name(Db), {PartRes} = update_doc_result_to_json(TargetDocId, {ok, NewTargetRev}), - Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(Db#db.name) ++ "/" ++ couch_util:url_encode(TargetDocId)), + Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(TargetDocId)), send_json(Req, HttpCode, [{"Location", Loc}, {"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}], @@ -1057,8 +1060,8 @@ couch_doc_from_req(Req, DocId, Json) -> % couch_doc_open(Db, DocId) -> % couch_doc_open(Db, DocId, nil, []). -couch_doc_open(#db{} = Db, DocId, Rev, Options0) -> - Options = [{user_ctx, Db#db.user_ctx} | Options0], +couch_doc_open(Db, DocId, Rev, Options0) -> + Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0], case Rev of nil -> % open most recent rev case fabric:open_doc(Db, DocId, Options) of @@ -1262,7 +1265,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa HttpCode = 202 end, erlang:put(mochiweb_request_recv, true), - #db{name=DbName} = Db, + DbName = couch_db:name(Db), {Status, Headers} = case Method of 'DELETE' -> diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl index 4abeecb377..64664b98e4 100644 --- a/src/chttpd/src/chttpd_external.erl +++ b/src/chttpd/src/chttpd_external.erl @@ -120,16 +120,22 @@ json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) -> get_db_security(Db, UserCtx). -get_db_info(#db{main_pid = nil} = Db) -> - fabric:get_db_info(Db); -get_db_info(#db{} = Db) -> - couch_db:get_db_info(Db). +get_db_info(Db) -> + case couch_db:is_clustered(Db) of + true -> + fabric:get_db_info(Db); + false -> + couch_db:get_db_info(Db) + end. -get_db_security(#db{main_pid = nil}=Db, #user_ctx{}) -> - fabric:get_security(Db); -get_db_security(#db{}=Db, #user_ctx{}) -> - couch_db:get_security(Db). +get_db_security(Db, #user_ctx{}) -> + case couch_db:is_clustered(Db) of + true -> + fabric:get_security(Db); + false -> + couch_db:get_security(Db) + end. to_json_terms(Data) -> diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl index 49fed7b8d4..0b45495d03 100644 --- a/src/chttpd/src/chttpd_show.erl +++ b/src/chttpd/src/chttpd_show.erl @@ -196,7 +196,8 @@ handle_view_list_req(Req, _Db, _DDoc) -> handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> %% Will throw an exception if the _list handler is missing couch_util:get_nested_json_value(DDoc#doc.body, [<<"lists">>, LName]), - {ok, VDoc} = ddoc_cache:open(Db#db.name, <<"_design/", ViewDesignName/binary>>), + DbName = couch_db:name(Db), + {ok, VDoc} = ddoc_cache:open(DbName, <<"_design/", ViewDesignName/binary>>), CB = fun couch_mrview_show:list_cb/2, QueryArgs = couch_mrview_http:parse_params(Req, Keys), Options = [{user_ctx, Req#httpd.user_ctx}], diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index e7cd85d091..5abb316602 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -128,33 +128,6 @@ handler }). --record(db, { - main_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - fd_monitor, - header = couch_db_header:new(), - committed_update_seq, - id_tree, - seq_tree, - local_tree, - update_seq, - name, - filepath, - validate_doc_funs = undefined, - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - options = [], - compression, - before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc - after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc -}). - -record(view_fold_helper_funs, { reduce_count, passed_end, diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index 9d38cfae2d..481ef2c1da 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -481,6 +481,9 @@ flush(Fd, Att) -> flush_data(Fd, fetch(data, Att), Att). +flush_data(Fd, {stream, {couch_bt_engine_stream, {OtherFd, StreamPointer}}}, + Att) -> + flush_data(Fd, {OtherFd, StreamPointer}, Att); flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd -> % already written to our file, nothing to write Att; diff --git a/src/couch/src/couch_auth_cache.erl b/src/couch/src/couch_auth_cache.erl index 1c4b866510..16c59d19ab 100644 --- a/src/couch/src/couch_auth_cache.erl +++ b/src/couch/src/couch_auth_cache.erl @@ -322,13 +322,15 @@ refresh_entries(AuthDb) -> nil -> ok; AuthDb2 -> - case AuthDb2#db.update_seq > AuthDb#db.update_seq of + AuthDbSeq = couch_db:get_update_seq(AuthDb), + AuthDb2Seq = couch_db:get_update_seq(AuthDb2), + case AuthDb2Seq > AuthDbSeq of true -> {ok, _, _} = couch_db:enum_docs_since( AuthDb2, - AuthDb#db.update_seq, + AuthDbSeq, fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end, - AuthDb#db.update_seq, + AuthDbSeq, [] ), true = ets:insert(?STATE, {auth_db, AuthDb2}); @@ -386,7 +388,9 @@ cache_needs_refresh() -> nil -> false; AuthDb2 -> - AuthDb2#db.update_seq > AuthDb#db.update_seq + AuthDbSeq = couch_db:get_update_seq(AuthDb), + AuthDb2Seq = couch_db:get_update_seq(AuthDb2), + AuthDb2Seq > AuthDbSeq end end, false @@ -407,7 +411,7 @@ exec_if_auth_db(Fun) -> exec_if_auth_db(Fun, DefRes) -> case ets:lookup(?STATE, auth_db) of - [{auth_db, #db{} = AuthDb}] -> + [{auth_db, AuthDb}] -> Fun(AuthDb); _ -> DefRes diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index 52ff39dedb..ea7f65c632 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -78,9 +78,10 @@ handle_changes(Args1, Req, Db0, Type) -> _ -> {false, undefined, undefined} end, + DbName = couch_db:name(Db0), {StartListenerFun, View} = if UseViewChanges -> {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( - Db0#db.name, DDocName, ViewName, #mrargs{}), + DbName, DDocName, ViewName, #mrargs{}), case View0#mrview.seq_btree of #btree{} -> ok; @@ -89,14 +90,14 @@ handle_changes(Args1, Req, Db0, Type) -> end, SNFun = fun() -> couch_event:link_listener( - ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, Db0#db.name}] + ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}] ) end, {SNFun, View0}; true -> SNFun = fun() -> couch_event:link_listener( - ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] + ?MODULE, handle_db_event, self(), [{dbname, DbName}] ) end, {SNFun, undefined} @@ -111,7 +112,7 @@ handle_changes(Args1, Req, Db0, Type) -> end, View2 = if UseViewChanges -> {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view( - Db0#db.name, DDocName, ViewName, #mrargs{}), + DbName, DDocName, ViewName, #mrargs{}), View1; true -> undefined @@ -219,11 +220,11 @@ configure_filter("_view", Style, Req, Db) -> catch _:_ -> view end, - case Db#db.id_tree of - undefined -> + case couch_db:is_clustered(Db) of + true -> DIR = fabric_util:doc_id_and_rev(DDoc), {fetch, FilterType, Style, DIR, VName}; - _ -> + false -> {FilterType, Style, DDoc, VName} end; [] -> @@ -242,11 +243,11 @@ configure_filter(FilterName, Style, Req, Db) -> [DName, FName] -> {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), check_member_exists(DDoc, [<<"filters">>, FName]), - case Db#db.id_tree of - undefined -> + case couch_db:is_clustered(Db) of + true -> DIR = fabric_util:doc_id_and_rev(DDoc), {fetch, custom, Style, Req, DIR, FName}; - _ -> + false-> {custom, Style, Req, DDoc, FName} end; @@ -395,15 +396,19 @@ check_fields(_Fields) -> throw({bad_request, "Selector error: fields must be JSON array"}). -open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) -> - case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of - {ok, _} = Resp -> Resp; - Else -> throw(Else) - end; open_ddoc(Db, DDocId) -> - case couch_db:open_doc(Db, DDocId, [ejson_body]) of - {ok, _} = Resp -> Resp; - Else -> throw(Else) + DbName = couch_db:name(Db), + case couch_db:is_clustered(Db) of + true -> + case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of + {ok, _} = Resp -> Resp; + Else -> throw(Else) + end; + false -> + case couch_db:open_doc(Db, DDocId, [ejson_body]) of + {ok, _} = Resp -> Resp; + Else -> throw(Else) + end end. @@ -566,7 +571,7 @@ can_optimize(_, _) -> send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> - Lookups = couch_btree:lookup(Db#db.id_tree, DocIds), + Lookups = couch_db:get_full_doc_infos(Db, DocIds), FullInfos = lists:foldl(fun ({ok, FDI}, Acc) -> [FDI | Acc]; (not_found, Acc) -> Acc @@ -575,11 +580,9 @@ send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> - FoldFun = fun(FullDocInfo, _, Acc) -> - {ok, [FullDocInfo | Acc]} - end, + FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}], - {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts), + {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts), send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). @@ -640,8 +643,8 @@ keep_sending_changes(Args, Acc0, FirstRound) -> true -> case wait_updated(Timeout, TimeoutFun, UserAcc2) of {updated, UserAcc4} -> - DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], - case couch_db:open(Db#db.name, DbOptions1) of + DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions], + case couch_db:open(couch_db:name(Db), DbOptions1) of {ok, Db2} -> keep_sending_changes( Args#changes_args{limit=NewLimit}, @@ -665,7 +668,8 @@ keep_sending_changes(Args, Acc0, FirstRound) -> maybe_refresh_view(_, undefined, undefined) -> undefined; maybe_refresh_view(Db, DDocName, ViewName) -> - {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(Db#db.name, DDocName, ViewName, #mrargs{}), + DbName = couch_db:name(Db), + {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}), View. end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> diff --git a/src/couch/src/couch_compaction_daemon.erl b/src/couch/src/couch_compaction_daemon.erl index 8f95eb21e9..f3b646d293 100644 --- a/src/couch/src/couch_compaction_daemon.erl +++ b/src/couch/src/couch_compaction_daemon.erl @@ -319,7 +319,7 @@ can_db_compact(#config{db_frag = Threshold} = Config, Db) -> {Frag, SpaceRequired} = frag(DbInfo), couch_log:debug("Fragmentation for database `~s` is ~p%, estimated" " space for compaction is ~p bytes.", - [Db#db.name, Frag, SpaceRequired]), + [couch_db:name(Db), Frag, SpaceRequired]), case check_frag(Threshold, Frag) of false -> false; @@ -332,7 +332,7 @@ can_db_compact(#config{db_frag = Threshold} = Config, Db) -> couch_log:warning("Compaction daemon - skipping database `~s` " "compaction: the estimated necessary disk space is about ~p" " bytes but the currently available disk space is ~p bytes.", - [Db#db.name, SpaceRequired, Free]), + [couch_db:name(Db), SpaceRequired, Free]), false end end diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index d01a3e0c43..418b149e80 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -12,32 +12,118 @@ -module(couch_db). --export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]). --export([start_compact/1, cancel_compact/1]). --export([wait_for_compaction/1, wait_for_compaction/2]). --export([is_idle/1,monitor/1,count_changes_since/2]). --export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). --export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]). --export([open_doc/2,open_doc/3,open_doc_revs/4]). --export([set_revs_limit/2,get_revs_limit/1]). --export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]). --export([get_uuid/1, get_epochs/1, get_compacted_seq/1]). --export([enum_docs/4,enum_docs_since/5]). --export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). --export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). --export([set_security/2,get_security/1]). --export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]). --export([check_is_admin/1, is_admin/1, check_is_member/1, get_doc_count/1]). --export([reopen/1, is_system_db/1, compression/1, make_doc/5]). --export([load_validation_funs/1]). --export([check_md5/2, with_stream/3]). --export([monitored_by/1]). --export([normalize_dbname/1]). --export([validate_dbname/1]). --export([dbname_suffix/1]). +-export([ + create/2, + open/2, + open_int/2, + incref/1, + reopen/1, + close/1, + + clustered_db/2, + clustered_db/3, + + monitor/1, + monitored_by/1, + is_idle/1, + + is_admin/1, + check_is_admin/1, + check_is_member/1, + + name/1, + compression/1, + get_after_doc_read_fun/1, + get_before_doc_update_fun/1, + get_committed_update_seq/1, + get_compacted_seq/1, + get_compactor_pid/1, + get_db_info/1, + get_doc_count/1, + get_epochs/1, + get_filepath/1, + get_instance_start_time/1, + get_last_purged/1, + get_pid/1, + get_revs_limit/1, + get_security/1, + get_update_seq/1, + get_user_ctx/1, + get_uuid/1, + get_purge_seq/1, + + is_db/1, + is_system_db/1, + is_clustered/1, + + increment_update_seq/1, + set_revs_limit/2, + set_security/2, + set_user_ctx/2, + + ensure_full_commit/1, + ensure_full_commit/2, + + load_validation_funs/1, + + open_doc/2, + open_doc/3, + open_doc_revs/4, + open_doc_int/3, + read_doc/2, + get_doc_info/2, + get_full_doc_info/2, + get_full_doc_infos/2, + get_missing_revs/2, + get_design_docs/1, + + update_doc/3, + update_doc/4, + update_docs/4, + update_docs/2, + update_docs/3, + delete_doc/3, + + purge_docs/2, + + with_stream/3, + + fold_docs/4, + fold_local_docs/4, + enum_docs/4, + enum_docs_reduce_to_count/1, + + enum_docs_since/5, + enum_docs_since_reduce_to_count/1, + changes_since/4, + changes_since/5, + count_changes_since/2, + + calculate_start_seq/3, + owner_of/2, + + start_compact/1, + cancel_compact/1, + wait_for_compaction/1, + wait_for_compaction/2, + + dbname_suffix/1, + normalize_dbname/1, + validate_dbname/1, + + check_md5/2, + make_doc/5, + new_revid/1 +]). + + +-export([ + start_link/3 +]). + -include_lib("couch/include/couch_db.hrl"). +-include("couch_db_int.hrl"). -define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex @@ -112,9 +198,31 @@ reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) -> {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}} end. +incref(#db{fd = Fd} = Db) -> + Ref = erlang:monitor(process, Fd), + {ok, Db#db{fd_monitor = Ref}}. + +clustered_db(DbName, UserCtx) -> + clustered_db(DbName, UserCtx, []). + +clustered_db(DbName, UserCtx, SecProps) -> + {ok, #db{name = DbName, user_ctx = UserCtx, security = SecProps}}. + +is_db(#db{}) -> + true; +is_db(_) -> + false. + is_system_db(#db{options = Options}) -> lists:member(sys_db, Options). +is_clustered(#db{main_pid = nil}) -> + true; +is_clustered(#db{}) -> + false; +is_clustered(?NEW_PSE_DB = Db) -> + ?PSE_DB_MAIN_PID(Db) == undefined. + ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> ok = gen_server:call(Pid, full_commit, infinity), {ok, StartTime}. @@ -126,6 +234,8 @@ ensure_full_commit(Db, RequiredSeq) -> close(#db{fd_monitor=Ref}) -> erlang:demonitor(Ref, [flush]), + ok; +close(?NEW_PSE_DB) -> ok. is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) -> @@ -295,12 +405,23 @@ increment_update_seq(#db{main_pid=Pid}) -> purge_docs(#db{main_pid=Pid}, IdsRevs) -> gen_server:call(Pid, {purge_docs, IdsRevs}). +get_after_doc_read_fun(#db{after_doc_read = Fun}) -> + Fun. + +get_before_doc_update_fun(#db{before_doc_update = Fun}) -> + Fun. + get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. get_update_seq(#db{update_seq=Seq})-> Seq. +get_user_ctx(#db{user_ctx = UserCtx}) -> + UserCtx; +get_user_ctx(?NEW_PSE_DB = Db) -> + ?PSE_DB_USER_CTX(Db). + get_purge_seq(#db{}=Db) -> couch_db_header:purge_seq(Db#db.header). @@ -312,19 +433,33 @@ get_last_purged(#db{}=Db) -> couch_file:pread_term(Db#db.fd, Pointer) end. +get_pid(#db{main_pid = Pid}) -> + Pid. + get_doc_count(Db) -> - {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree), - {ok, Count}. + {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree), + {ok, element(1, Reds)}. get_uuid(#db{}=Db) -> couch_db_header:uuid(Db#db.header). get_epochs(#db{}=Db) -> - couch_db_header:epochs(Db#db.header). + Epochs = couch_db_header:epochs(Db#db.header), + validate_epochs(Epochs), + Epochs. + +get_filepath(#db{filepath = FilePath}) -> + FilePath. + +get_instance_start_time(#db{instance_start_time = IST}) -> + IST. get_compacted_seq(#db{}=Db) -> couch_db_header:compacted_seq(Db#db.header). +get_compactor_pid(#db{compactor_pid = Pid}) -> + Pid. + get_db_info(Db) -> #db{fd=Fd, header=Header, @@ -503,7 +638,9 @@ get_members(#db{security=SecProps}) -> couch_util:get_value(<<"readers">>, SecProps, {[]})). get_security(#db{security=SecProps}) -> - {SecProps}. + {SecProps}; +get_security(?NEW_PSE_DB = Db) -> + {?PSE_DB_SECURITY(Db)}. set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> check_is_admin(Db), @@ -514,6 +651,9 @@ set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> set_security(_, _) -> throw(bad_request). +set_user_ctx(#db{} = Db, UserCtx) -> + {ok, Db#db{user_ctx = UserCtx}}. + validate_security_object(SecProps) -> Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}), % we fallback to readers here for backwards compatibility @@ -549,7 +689,9 @@ set_revs_limit(_Db, _Limit) -> throw(invalid_revs_limit). name(#db{name=Name}) -> - Name. + Name; +name(?NEW_PSE_DB = Db) -> + ?PSE_DB_NAME(Db). compression(#db{compression=Compression}) -> Compression. @@ -1271,6 +1413,17 @@ enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> [{start_key, SinceSeq + 1} | Options]), {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. + +fold_docs(Db, InFun, InAcc, Opts) -> + Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, + {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts), + {ok, AccOut}. + +fold_local_docs(Db, InFun, InAcc, Opts) -> + Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, + {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts), + {ok, AccOut}. + enum_docs(Db, InFun, InAcc, Options0) -> {NS, Options} = extract_namespace(Options0), enum_docs(Db, NS, InFun, InAcc, Options). @@ -1294,6 +1447,78 @@ enum_docs(Db, NS, InFun, InAcc, Options0) -> Db#db.id_tree, FoldFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. + +calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> + Seq; +calculate_start_seq(Db, Node, {Seq, Uuid}) -> + % Treat the current node as the epoch node + calculate_start_seq(Db, Node, {Seq, Uuid, Node}); +calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> + case is_prefix(Uuid, get_uuid(Db)) of + true -> + case is_owner(EpochNode, Seq, get_epochs(Db)) of + true -> Seq; + false -> 0 + end; + false -> + %% The file was rebuilt, most likely in a different + %% order, so rewind. + 0 + end; +calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) -> + case is_prefix(Uuid, couch_db:get_uuid(Db)) of + true -> + start_seq(get_epochs(Db), OriginalNode, Seq); + false -> + {replace, OriginalNode, Uuid, Seq} + end. + + +validate_epochs(Epochs) -> + %% Assert uniqueness. + case length(Epochs) == length(lists:ukeysort(2, Epochs)) of + true -> ok; + false -> erlang:error(duplicate_epoch) + end, + %% Assert order. + case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of + true -> ok; + false -> erlang:error(epoch_order) + end. + + +is_prefix(Pattern, Subject) -> + binary:longest_common_prefix([Pattern, Subject]) == size(Pattern). + + +is_owner(Node, Seq, Epochs) -> + Node =:= owner_of(Epochs, Seq). + + +owner_of(Db, Seq) when not is_list(Db) -> + owner_of(get_epochs(Db), Seq); +owner_of([], _Seq) -> + undefined; +owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq > EpochSeq -> + EpochNode; +owner_of([_ | Rest], Seq) -> + owner_of(Rest, Seq). + + +start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq -> + %% OrigNode is the owner of the Seq so we can safely stream from there + Seq; +start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq -> + %% We transferred this file before Seq was written on OrigNode, so we need + %% to stream from the beginning of the next epoch. Note that it is _not_ + %% necessary for the current node to own the epoch beginning at NewSeq + NewSeq; +start_seq([_ | Rest], OrigNode, Seq) -> + start_seq(Rest, OrigNode, Seq); +start_seq([], OrigNode, Seq) -> + erlang:error({epoch_mismatch, OrigNode, Seq}). + + extract_namespace(Options0) -> case proplists:split(Options0, [namespace]) of {[[{namespace, NS}]], Options} -> @@ -1632,6 +1857,30 @@ should_fail_validate_dbname(DbName) -> ok end)}. +calculate_start_seq_test() -> + %% uuid mismatch is always a rewind. + Hdr1 = couch_db_header:new(), + Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), + ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), + %% uuid matches and seq is owned by node. + Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), + ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), + %% uuids match but seq is not owned by node. + Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), + ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), + %% return integer if we didn't get a vector. + ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). + +is_owner_test() -> + ?assertNot(is_owner(foo, 1, [])), + ?assertNot(is_owner(foo, 1, [{foo, 1}])), + ?assert(is_owner(foo, 2, [{foo, 1}])), + ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])), + ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])), + ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])), + ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])), + ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])). + to_binary(DbName) when is_list(DbName) -> ?l2b(DbName); to_binary(DbName) when is_binary(DbName) -> diff --git a/src/couch/src/couch_db_int.hrl b/src/couch/src/couch_db_int.hrl new file mode 100644 index 0000000000..da1e45d75e --- /dev/null +++ b/src/couch/src/couch_db_int.hrl @@ -0,0 +1,93 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(db, { + main_pid = nil, + compactor_pid = nil, + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + fd, + fd_monitor, + header = couch_db_header:new(), + committed_update_seq, + id_tree, + seq_tree, + local_tree, + update_seq, + name, + filepath, + validate_doc_funs = undefined, + security = [], + security_ptr = nil, + user_ctx = #user_ctx{}, + waiting_delayed_commit = nil, + revs_limit = 1000, + fsync_options = [], + options = [], + compression, + before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc + after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc +}). + + +-record(new_pse_db, { + vsn, + name, + filepath, + + engine = {couch_bt_engine, undefined}, + + main_pid = nil, + compactor_pid = nil, + + committed_update_seq, + + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + + user_ctx = #user_ctx{}, + security = [], + validate_doc_funs = undefined, + + before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc + after_doc_read = nil, % nil | fun(Doc, Db) -> NewDoc + + waiting_delayed_commit = nil, + + options = [], + compression +}). + + +-define(NEW_PSE_DB, { + db, + _, % Version + _, % Name + _, % FilePath + _, % Engine + _, % MainPid + _, % CompactorPid + _, % CommittedUpdateSeq + _, % InstanceStartTime + _, % UserCtx + _, % Security + _, % ValidateDocFuns + _, % BeforeDocUpdate + _, % AfterDocRead + _, % WaitingDelayedCommit + _, % Options + _ % Compression +}). + + +-define(PSE_DB_NAME(Db), element(3, Db)). +-define(PSE_DB_MAIN_PID(Db), element(6, Db)). +-define(PSE_DB_USER_CTX(Db), element(10, Db)). +-define(PSE_DB_SECURITY(Db), element(11, Db)). diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl index 774e9e0947..740b8121bc 100644 --- a/src/couch/src/couch_db_plugin.erl +++ b/src/couch/src/couch_db_plugin.erl @@ -32,13 +32,15 @@ validate_dbname(DbName, Normalized, Default) -> maybe_handle(validate_dbname, [DbName, Normalized], Default). -before_doc_update(#db{before_doc_update = Fun} = Db, Doc0) -> +before_doc_update(Db, Doc0) -> + Fun = couch_db:get_before_doc_update_fun(Db), case with_pipe(before_doc_update, [Doc0, Db]) of [Doc1, _Db] when is_function(Fun) -> Fun(Doc1, Db); [Doc1, _Db] -> Doc1 end. -after_doc_read(#db{after_doc_read = Fun} = Db, Doc0) -> +after_doc_read(Db, Doc0) -> + Fun = couch_db:get_after_doc_read_fun(Db), case with_pipe(after_doc_read, [Doc0, Db]) of [Doc1, _Db] when is_function(Fun) -> Fun(Doc1, Db); [Doc1, _Db] -> Doc1 diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 78726358e2..1970b78cda 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -20,6 +20,7 @@ -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -include_lib("couch/include/couch_db.hrl"). +-include("couch_db_int.hrl"). -record(comp_header, { db_header, diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index a6d83d619b..34a1539aab 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -70,7 +70,8 @@ handle_changes_req(#httpd{method='GET'}=Req, Db, ChangesArgs, ChangesFun) -> handle_changes_req(#httpd{}=Req, _Db, _ChangesArgs, _ChangesFun) -> couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST"). -handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) -> +handle_changes_req1(Req, Db, ChangesArgs, ChangesFun) -> + DbName = couch_db:name(Db), AuthDbName = ?l2b(config:get("couch_httpd_auth", "authentication_db")), case AuthDbName of DbName -> @@ -287,7 +288,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) - RequiredSeq > CommittedSeq -> couch_db:ensure_full_commit(Db); true -> - {ok, Db#db.instance_start_time} + {ok, couch_db:get_instance_start_time(Db)} end end, send_json(Req, 201, {[ @@ -733,7 +734,8 @@ update_doc_result_to_json(DocId, Error) -> update_doc(Req, Db, DocId, #doc{deleted=false}=Doc) -> - Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(Db#db.name) ++ "/" ++ couch_util:url_encode(DocId)), + DbName = couch_db:name(Db), + Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(DocId)), update_doc(Req, Db, DocId, Doc, [{"Location", Loc}]); update_doc(Req, Db, DocId, Doc) -> update_doc(Req, Db, DocId, Doc, []). @@ -1037,7 +1039,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN []; _ -> [{"Location", absolute_uri(Req, "/" ++ - couch_util:url_encode(Db#db.name) ++ "/" ++ + couch_util:url_encode(couch_db:name(Db)) ++ "/" ++ couch_util:url_encode(DocId) ++ "/" ++ couch_util:url_encode(FileName) )}] @@ -1149,7 +1151,7 @@ parse_changes_query(Req, Db) -> {"descending", "true"} -> Args#changes_args{dir=rev}; {"since", "now"} -> - UpdateSeq = couch_util:with_db(Db#db.name, fun(WDb) -> + UpdateSeq = couch_util:with_db(couch_db:name(Db), fun(WDb) -> couch_db:get_update_seq(WDb) end), Args#changes_args{since=UpdateSeq}; diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl index b79286ee0f..5fd1fbe811 100644 --- a/src/couch/src/couch_lru.erl +++ b/src/couch/src/couch_lru.erl @@ -13,7 +13,7 @@ -module(couch_lru). -export([new/0, insert/2, update/2, close/1]). --include_lib("couch/include/couch_db.hrl"). +-include("couch_server_int.hrl"). new() -> Updates = ets:new(couch_lru_updates, [ordered_set]), @@ -49,18 +49,19 @@ close({Count, Updates, Dbs}) -> close_int('$end_of_table', _Updates, _Dbs) -> false; close_int({_Count, DbName} = Key, Updates, Dbs) -> - case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of + case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of true -> - [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, pid = DbPid}] = ets:lookup(couch_dbs, DbName), case couch_db:is_idle(Db) of true -> true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), - exit(Pid, kill), + true = ets:delete(couch_dbs_pid_to_name, DbPid), + exit(DbPid, kill), true = ets:delete(Updates, Key), true = ets:delete(Dbs, DbName), true; false -> - true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + ElemSpec = {#entry.lock, unlocked}, + true = ets:update_element(couch_dbs, DbName, ElemSpec), couch_stats:increment_counter([couchdb, couch_server, lru_skip]), close_int(ets:next(Updates, Key), Updates, Dbs) end; diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index ad2a5f0ecb..2c177afa92 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -26,6 +26,7 @@ -export([handle_config_change/5, handle_config_terminate/3]). -include_lib("couch/include/couch_db.hrl"). +-include("couch_server_int.hrl"). -define(MAX_DBS_OPEN, 100). -define(RELISTEN_DELAY, 5000). @@ -73,16 +74,18 @@ sup_start_link() -> open(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), case ets:lookup(couch_dbs, DbName) of - [#db{fd=Fd, fd_monitor=Lock, options=Options} = Db] when Lock =/= locked -> - update_lru(DbName, Options), - {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> + update_lru(DbName, Entry#entry.db_options), + {ok, Db1} = couch_db:incref(Db0), + couch_db:set_user_ctx(Db1, Ctx); _ -> Options = maybe_add_sys_db_callbacks(DbName, Options0), Timeout = couch_util:get_value(timeout, Options, infinity), Create = couch_util:get_value(create_if_missing, Options, false), case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of - {ok, #db{fd=Fd} = Db} -> - {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + {ok, Db0} -> + {ok, Db1} = couch_db:incref(Db0), + couch_db:set_user_ctx(Db1, Ctx); {not_found, no_db_file} when Create -> couch_log:warning("creating missing database: ~s", [DbName]), couch_server:create(DbName, Options); @@ -103,9 +106,10 @@ close_lru() -> create(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of - {ok, #db{fd=Fd} = Db} -> + {ok, Db0} -> Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), - {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + {ok, Db1} = couch_db:incref(Db0), + couch_db:set_user_ctx(Db1, Ctx); Error -> Error end. @@ -187,7 +191,7 @@ init([]) -> ok = config:listen_for_changes(?MODULE, nil), ok = couch_file:init_delete_dir(RootDir), hash_admin_passwords(), - ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]), + ets:new(couch_dbs, [set, protected, named_table, {keypos, #entry.name}]), ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, @@ -199,8 +203,9 @@ terminate(Reason, Srv) -> couch_log:error("couch_server terminating with ~p, state ~2048p", [Reason, Srv#server{lru = redacted}]), - ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end, - nil, couch_dbs), + ets:foldl(fun(#entry{db = Db}, _) -> + couch_util:shutdown_sync(couch_db:get_pid(Db)) + end, nil, couch_dbs), ok. handle_config_change("couchdb", "database_dir", _, _, _) -> @@ -306,15 +311,13 @@ open_async(Server, From, DbName, Filepath, Options) -> true -> create; false -> open end, - % icky hack of field values - compactor_pid used to store clients - % and fd used for opening request info - true = ets:insert(couch_dbs, #db{ + true = ets:insert(couch_dbs, #entry{ name = DbName, - fd = ReqType, - main_pid = Opener, - compactor_pid = [From], - fd_monitor = locked, - options = Options + pid = Opener, + lock = locked, + waiters = [From], + req_type = ReqType, + db_options = Options }), true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}), db_opened(Server, Options). @@ -338,16 +341,15 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> true = ets:delete(couch_dbs_pid_to_name, FromPid), OpenTime = timer:now_diff(os:timestamp(), T0) / 1000, couch_stats:update_histogram([couchdb, db_open_time], OpenTime), - % icky hack of field values - compactor_pid used to store clients - % and fd used to possibly store a creation request + DbPid = couch_db:get_pid(Db), case ets:lookup(couch_dbs, DbName) of [] -> % db was deleted during async open - exit(Db#db.main_pid, kill), + exit(DbPid, kill), {reply, ok, Server}; - [#db{fd=ReqType, compactor_pid=Froms}] -> - link(Db#db.main_pid), - [gen_server:reply(From, {ok, Db}) || From <- Froms], + [#entry{req_type = ReqType, waiters = Waiters} = Entry] -> + link(DbPid), + [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters], % Cancel the creation request if it exists. case ReqType of {create, DbName, _Filepath, _Options, CrFrom} -> @@ -355,8 +357,15 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> _ -> ok end, - true = ets:insert(couch_dbs, Db), - true = ets:insert(couch_dbs_pid_to_name, {Db#db.main_pid, DbName}), + true = ets:insert(couch_dbs, #entry{ + name = DbName, + db = Db, + pid = DbPid, + lock = unlocked, + db_options = Entry#entry.db_options, + start_time = couch_db:get_instance_start_time(Db) + }), + true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:insert(DbName, Server#server.lru); @@ -368,13 +377,12 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) -> handle_call({open_result, T0, DbName, file_exists}, From, Server); handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) -> - % icky hack of field values - compactor_pid used to store clients case ets:lookup(couch_dbs, DbName) of [] -> % db was deleted during async open {reply, ok, Server}; - [#db{fd=ReqType, compactor_pid=Froms}=Db] -> - [gen_server:reply(From, Error) || From <- Froms], + [#entry{req_type = ReqType, waiters = Waiters} = Entry] -> + [gen_server:reply(Waiter, Error) || Waiter <- Waiters], couch_log:info("open_result error ~p for ~s", [Error, DbName]), true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, FromPid), @@ -384,7 +392,7 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) -> _ -> Server end, - {reply, ok, db_closed(NewServer, Db#db.options)} + {reply, ok, db_closed(NewServer, Entry#entry.db_options)} end; handle_call({open, DbName, Options}, From, Server) -> case ets:lookup(couch_dbs, DbName) of @@ -402,15 +410,14 @@ handle_call({open, DbName, Options}, From, Server) -> Error -> {reply, Error, Server} end; - [#db{compactor_pid = Froms} = Db] when is_list(Froms) -> - % icky hack of field values - compactor_pid used to store clients - true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}), - if length(Froms) =< 10 -> ok; true -> + [#entry{waiters = Waiters} = Entry] when is_list(Waiters) -> + true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}), + if length(Waiters) =< 10 -> ok; true -> Fmt = "~b clients waiting to open db ~s", - couch_log:info(Fmt, [length(Froms), DbName]) + couch_log:info(Fmt, [length(Waiters), DbName]) end, {noreply, Server}; - [#db{} = Db] -> + [#entry{db = Db}] -> {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> @@ -427,14 +434,13 @@ handle_call({create, DbName, Options}, From, Server) -> CloseError -> {reply, CloseError, Server} end; - [#db{fd=open}=Db] -> + [#entry{req_type = open} = Entry] -> % We're trying to create a database while someone is in % the middle of trying to open it. We allow one creator % to wait while we figure out if it'll succeed. - % icky hack of field values - fd used to store create request CrOptions = [create | Options], - NewDb = Db#db{fd={create, DbName, Filepath, CrOptions, From}}, - true = ets:insert(couch_dbs, NewDb), + Req = {create, DbName, Filepath, CrOptions, From}, + true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), {noreply, Server}; [_AlreadyRunningDb] -> {reply, file_exists, Server} @@ -450,18 +456,17 @@ handle_call({delete, DbName, Options}, _From, Server) -> Server2 = case ets:lookup(couch_dbs, DbName) of [] -> Server; - [#db{main_pid=Pid, compactor_pid=Froms} = Db] when is_list(Froms) -> - % icky hack of field values - compactor_pid used to store clients + [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), exit(Pid, kill), - [gen_server:reply(F, not_found) || F <- Froms], - db_closed(Server, Db#db.options); - [#db{main_pid=Pid} = Db] -> + [gen_server:reply(Waiter, not_found) || Waiter <- Waiters], + db_closed(Server, Entry#entry.db_options); + [#entry{pid = Pid} = Entry] -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), exit(Pid, kill), - db_closed(Server, Db#db.options) + db_closed(Server, Entry#entry.db_options) end, %% Delete any leftover compaction files. If we don't do this a @@ -487,11 +492,12 @@ handle_call({delete, DbName, Options}, _From, Server) -> Error -> {reply, Error, Server} end; -handle_call({db_updated, #db{}=Db}, _From, Server0) -> - #db{name = DbName, instance_start_time = StartTime} = Db, - Server = try ets:lookup_element(couch_dbs, DbName, #db.instance_start_time) of +handle_call({db_updated, Db}, _From, Server0) -> + DbName = couch_db:name(Db), + StartTime = couch_db:get_instance_start_time(Db), + Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of StartTime -> - true = ets:insert(couch_dbs, Db), + true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:update(DbName, Server0#server.lru); true -> Server0#server.lru @@ -519,22 +525,19 @@ handle_info({'EXIT', _Pid, config_change}, Server) -> handle_info({'EXIT', Pid, Reason}, Server) -> case ets:lookup(couch_dbs_pid_to_name, Pid) of [{Pid, DbName}] -> - [#db{compactor_pid=Froms}=Db] = ets:lookup(couch_dbs, DbName), + [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName), if Reason /= snappy_nif_not_loaded -> ok; true -> Msg = io_lib:format("To open the database `~s`, Apache CouchDB " "must be built with Erlang OTP R13B04 or higher.", [DbName]), couch_log:error(Msg, []) end, couch_log:info("db ~s died with reason ~p", [DbName, Reason]), - % icky hack of field values - compactor_pid used to store clients - if is_list(Froms) -> - [gen_server:reply(From, Reason) || From <- Froms]; - true -> - ok + if not is_list(Waiters) -> ok; true -> + [gen_server:reply(Waiter, Reason) || Waiter <- Waiters] end, true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), - {noreply, db_closed(Server, Db#db.options)}; + {noreply, db_closed(Server, Entry#entry.db_options)}; [] -> {noreply, Server} end; diff --git a/src/couch/src/couch_server_int.hrl b/src/couch/src/couch_server_int.hrl new file mode 100644 index 0000000000..537a6abb91 --- /dev/null +++ b/src/couch/src/couch_server_int.hrl @@ -0,0 +1,23 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-record(entry, { + name, + db, + pid, + lock, + waiters, + req_type, + db_options, + start_time +}). diff --git a/src/couch/src/couch_users_db.erl b/src/couch/src/couch_users_db.erl index 6f7b9af73f..c7b41f1fca 100644 --- a/src/couch/src/couch_users_db.erl +++ b/src/couch/src/couch_users_db.erl @@ -39,8 +39,8 @@ % -> 404 // Not Found % Else % -> save_doc -before_doc_update(Doc, #db{user_ctx = UserCtx} = Db) -> - #user_ctx{name=Name} = UserCtx, +before_doc_update(Doc, Db) -> + #user_ctx{name=Name} = couch_db:get_user_ctx(Db), DocName = get_doc_name(Doc), case (catch couch_db:check_is_admin(Db)) of ok -> @@ -108,8 +108,8 @@ after_doc_read(#doc{id = <>} = Doc, Db) -> throw({forbidden, <<"Only administrators can view design docs in the users database.">>}) end; -after_doc_read(Doc, #db{user_ctx = UserCtx} = Db) -> - #user_ctx{name=Name} = UserCtx, +after_doc_read(Doc, Db) -> + #user_ctx{name=Name} = couch_db:get_user_ctx(Db), DocName = get_doc_name(Doc), case (catch couch_db:check_is_admin(Db)) of ok -> diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl index 6001ae2e43..d688c126f0 100644 --- a/src/couch/src/couch_util.erl +++ b/src/couch/src/couch_util.erl @@ -198,7 +198,9 @@ json_apply_field({Key, NewValue}, [{OtherKey, OtherVal} | Headers], Acc) -> json_apply_field({Key, NewValue}, [], Acc) -> {[{Key, NewValue}|Acc]}. -json_user_ctx(#db{name=ShardName, user_ctx=Ctx}) -> +json_user_ctx(Db) -> + ShardName = couch_db:name(Db), + Ctx = couch_db:get_user_ctx(Db), {[{<<"db">>, mem3:dbname(ShardName)}, {<<"name">>,Ctx#user_ctx.name}, {<<"roles">>,Ctx#user_ctx.roles}]}. @@ -455,9 +457,7 @@ encode_doc_id(Id) -> url_encode(Id). -with_db(Db, Fun) when is_record(Db, db) -> - Fun(Db); -with_db(DbName, Fun) -> +with_db(DbName, Fun) when is_binary(DbName) -> case couch_db:open_int(DbName, [?ADMIN_CTX]) of {ok, Db} -> try @@ -467,6 +467,13 @@ with_db(DbName, Fun) -> end; Else -> throw(Else) + end; +with_db(Db, Fun) -> + case couch_db:is_db(Db) of + true -> + Fun(Db); + false -> + erlang:error({invalid_db, Db}) end. rfc1123_date() -> diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl index 3c4d17060f..f879d8f518 100644 --- a/src/couch/src/test_util.erl +++ b/src/couch/src/test_util.erl @@ -13,6 +13,8 @@ -module(test_util). -include_lib("couch/include/couch_eunit.hrl"). +-include("couch_db.hrl"). +-include("couch_db_int.hrl"). -export([init_code_path/0]). -export([source_file/1, build_file/1]). @@ -32,6 +34,8 @@ -export([start/1, start/2, start/3, stop/1]). +-export([fake_db/1]). + -record(test_context, {mocked = [], started = [], module}). -define(DEFAULT_APPS, @@ -229,6 +233,16 @@ stop(#test_context{mocked = Mocked, started = Apps}) -> meck:unload(Mocked), stop_applications(Apps). +fake_db(Fields) -> + Indexes = lists:zip( + record_info(fields, db), + lists:seq(2, record_info(size, db)) + ), + lists:foldl(fun({FieldName, Value}, Acc) -> + Idx = couch_util:get_value(FieldName, Indexes), + setelement(Idx, Acc, Value) + end, #db{}, Fields). + now_us() -> {MegaSecs, Secs, MicroSecs} = now(), (MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs. diff --git a/src/couch/test/couch_auth_cache_tests.erl b/src/couch/test/couch_auth_cache_tests.erl index 76179dea06..08aecd1563 100644 --- a/src/couch/test/couch_auth_cache_tests.erl +++ b/src/couch/test/couch_auth_cache_tests.erl @@ -265,7 +265,7 @@ hash_password(Password) -> shutdown_db(DbName) -> {ok, AuthDb} = couch_db:open_int(DbName, [?ADMIN_CTX]), ok = couch_db:close(AuthDb), - couch_util:shutdown_sync(AuthDb#db.main_pid), + couch_util:shutdown_sync(couch_db:get_pid(AuthDb)), ok = timer:sleep(1000). get_doc_rev(DbName, UserName) -> diff --git a/src/couch/test/couch_changes_tests.erl b/src/couch/test/couch_changes_tests.erl index 3c0e5f69ab..494d90fe3c 100644 --- a/src/couch/test/couch_changes_tests.erl +++ b/src/couch/test/couch_changes_tests.erl @@ -645,7 +645,7 @@ should_filter_by_user_ctx({DbName, _}) -> ]}), ChArgs = #changes_args{filter = "app/valid"}, UserCtx = #user_ctx{name = <<"doc3">>, roles = []}, - DbRec = #db{name = DbName, user_ctx = UserCtx}, + {ok, DbRec} = couch_db:clustered_db(DbName, UserCtx), Req = {json_req, {[{ <<"userCtx">>, couch_util:json_user_ctx(DbRec) }]}}, diff --git a/src/couch/test/couch_db_plugin_tests.erl b/src/couch/test/couch_db_plugin_tests.erl index ea9b230b16..94dd3dfa5e 100644 --- a/src/couch/test/couch_db_plugin_tests.erl +++ b/src/couch/test/couch_db_plugin_tests.erl @@ -43,6 +43,7 @@ data_providers() -> []. data_subscriptions() -> []. processes() -> []. notify(_, _, _) -> ok. +fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)). setup() -> couch_tests:setup([ @@ -133,33 +134,33 @@ validate_dbname_pass() -> before_doc_update_match() -> ?assertMatch( {true, [before_doc_update, doc]}, - couch_db_plugin:before_doc_update(#db{}, {true, [doc]})). + couch_db_plugin:before_doc_update(fake_db(), {true, [doc]})). before_doc_update_no_match() -> ?assertMatch( {false, [doc]}, - couch_db_plugin:before_doc_update(#db{}, {false, [doc]})). + couch_db_plugin:before_doc_update(fake_db(), {false, [doc]})). before_doc_update_throw() -> ?assertThrow( before_doc_update, - couch_db_plugin:before_doc_update(#db{}, {fail, [doc]})). + couch_db_plugin:before_doc_update(fake_db(), {fail, [doc]})). after_doc_read_match() -> ?assertMatch( {true, [after_doc_read, doc]}, - couch_db_plugin:after_doc_read(#db{}, {true, [doc]})). + couch_db_plugin:after_doc_read(fake_db(), {true, [doc]})). after_doc_read_no_match() -> ?assertMatch( {false, [doc]}, - couch_db_plugin:after_doc_read(#db{}, {false, [doc]})). + couch_db_plugin:after_doc_read(fake_db(), {false, [doc]})). after_doc_read_throw() -> ?assertThrow( after_doc_read, - couch_db_plugin:after_doc_read(#db{}, {fail, [doc]})). + couch_db_plugin:after_doc_read(fake_db(), {fail, [doc]})). validate_docid_match() -> diff --git a/src/couch/test/couch_lru_tests.erl b/src/couch/test/couch_lru_tests.erl index 15598358f0..e0087e3d6c 100644 --- a/src/couch/test/couch_lru_tests.erl +++ b/src/couch/test/couch_lru_tests.erl @@ -14,11 +14,12 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/src/couch_server_int.hrl"). setup() -> ok = meck:new(couch_db, [passthrough]), - ets:new(couch_dbs, [set, public, named_table, {keypos, #db.name}]), + ets:new(couch_dbs, [set, public, named_table, {keypos, #entry.name}]), ets:new(couch_dbs_pid_to_name, [set, public, named_table]), couch_lru:new(). @@ -101,7 +102,7 @@ close_test_() -> }. add_record(Lru, Key, Pid) -> - true = ets:insert(couch_dbs, #db{name = Key, main_pid = Pid}), + true = ets:insert(couch_dbs, #entry{name = Key, pid = Pid}), true = ets:insert(couch_dbs_pid_to_name, {Pid, Key}), {ok, couch_lru:insert(Key, Lru)}. diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl index c8f8381d7a..c52b3f6b0c 100644 --- a/src/couch/test/couch_server_tests.erl +++ b/src/couch/test/couch_server_tests.erl @@ -32,8 +32,9 @@ setup(_) -> setup(). teardown(Db) -> + FilePath = couch_db:get_filepath(Db), (catch couch_db:close(Db)), - (catch file:delete(Db#db.filepath)). + (catch file:delete(FilePath)). teardown(rename, Db) -> config:set("couchdb", "enable_database_recovery", "false", false), @@ -61,7 +62,9 @@ make_test_case(Mod, Funs) -> {foreachx, fun setup/1, fun teardown/2, [{Mod, Fun} || Fun <- Funs]} }. -should_rename_on_delete(_, #db{filepath = Origin, name = DbName}) -> +should_rename_on_delete(_, Db) -> + DbName = couch_db:name(Db), + Origin = couch_db:get_filepath(Db), ?_test(begin ?assert(filelib:is_regular(Origin)), ?assertMatch(ok, couch_server:delete(DbName, [])), @@ -74,7 +77,9 @@ should_rename_on_delete(_, #db{filepath = Origin, name = DbName}) -> ?assert(filelib:is_regular(Renamed)) end). -should_delete(_, #db{filepath = Origin, name = DbName}) -> +should_delete(_, Db) -> + DbName = couch_db:name(Db), + Origin = couch_db:get_filepath(Db), ?_test(begin ?assert(filelib:is_regular(Origin)), ?assertMatch(ok, couch_server:delete(DbName, [])), diff --git a/src/couch/test/couchdb_compaction_daemon_tests.erl b/src/couch/test/couchdb_compaction_daemon_tests.erl index 25d9b131ec..c2920ed73a 100644 --- a/src/couch/test/couchdb_compaction_daemon_tests.erl +++ b/src/couch/test/couchdb_compaction_daemon_tests.erl @@ -182,7 +182,7 @@ update(DbName) -> lists:foreach(fun(_) -> Doc = couch_doc:from_json_obj({[{<<"_id">>, couch_uuids:new()}]}), {ok, _} = couch_db:update_docs(Db, [Doc]), - query_view(Db#db.name) + query_view(couch_db:name(Db)) end, lists:seq(1, 200)), couch_db:close(Db). @@ -220,7 +220,7 @@ spawn_compaction_monitor(DbName) -> {Pid, Ref} = spawn_monitor(fun() -> DaemonPid = whereis(couch_compaction_daemon), DbPid = couch_util:with_db(DbName, fun(Db) -> - Db#db.main_pid + couch_db:get_pid(Db) end), {ok, ViewPid} = couch_index_server:get_index(couch_mrview_index, DbName, <<"_design/foo">>), diff --git a/src/couch/test/couchdb_views_tests.erl b/src/couch/test/couchdb_views_tests.erl index 7b04e85270..69277e62d6 100644 --- a/src/couch/test/couchdb_views_tests.erl +++ b/src/couch/test/couchdb_views_tests.erl @@ -340,7 +340,7 @@ couchdb_1283() -> ]}), {ok, _} = couch_db:update_doc(MDb1, DDoc, []), ok = populate_db(MDb1, 100, 100), - query_view(MDb1#db.name, "foo", "foo"), + query_view(couch_db:name(MDb1), "foo", "foo"), ok = couch_db:close(MDb1), {ok, Db1} = couch_db:create(?tempdb(), [?ADMIN_CTX]), @@ -350,8 +350,8 @@ couchdb_1283() -> {ok, Db3} = couch_db:create(?tempdb(), [?ADMIN_CTX]), ok = couch_db:close(Db3), - Writer1 = spawn_writer(Db1#db.name), - Writer2 = spawn_writer(Db2#db.name), + Writer1 = spawn_writer(couch_db:name(Db1)), + Writer2 = spawn_writer(couch_db:name(Db2)), ?assert(is_process_alive(Writer1)), ?assert(is_process_alive(Writer2)), @@ -361,16 +361,16 @@ couchdb_1283() -> %% Below we do exactly the same as couch_mrview:compact holds inside %% because we need have access to compaction Pid, not a Ref. - %% {ok, MonRef} = couch_mrview:compact(MDb1#db.name, <<"_design/foo">>, + %% {ok, MonRef} = couch_mrview:compact(MDb1, <<"_design/foo">>, %% [monitor]), {ok, Pid} = couch_index_server:get_index( - couch_mrview_index, MDb1#db.name, <<"_design/foo">>), + couch_mrview_index, couch_db:name(MDb1), <<"_design/foo">>), {ok, CPid} = gen_server:call(Pid, compact), %% By suspending compaction process we ensure that compaction won't get %% finished too early to make get_writer_status assertion fail. erlang:suspend_process(CPid), MonRef = erlang:monitor(process, CPid), - Writer3 = spawn_writer(Db3#db.name), + Writer3 = spawn_writer(couch_db:name(Db3)), ?assert(is_process_alive(Writer3)), ?assertEqual({error, all_dbs_active}, get_writer_status(Writer3)), @@ -528,7 +528,8 @@ view_cleanup(DbName) -> count_users(DbName) -> {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), - {monitored_by, Monitors} = erlang:process_info(Db#db.main_pid, monitored_by), + DbPid = couch_db:get_pid(Db), + {monitored_by, Monitors} = erlang:process_info(DbPid, monitored_by), CouchFiles = [P || P <- Monitors, couch_file:process_info(P) =/= undefined], ok = couch_db:close(Db), length(lists:usort(Monitors) -- [self() | CouchFiles]). @@ -554,7 +555,8 @@ restore_backup_db_file(DbName) -> {ok, Db} = couch_db:open_int(DbName, []), ok = couch_db:close(Db), - exit(Db#db.main_pid, shutdown), + DbPid = couch_db:get_pid(Db), + exit(DbPid, shutdown), DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]), ok = file:delete(DbFile), @@ -575,7 +577,8 @@ wait_db_compact_done(_DbName, 0) -> wait_db_compact_done(DbName, N) -> {ok, Db} = couch_db:open_int(DbName, []), ok = couch_db:close(Db), - case is_pid(Db#db.compactor_pid) of + CompactorPid = couch_db:get_compactor_pid(Db), + case is_pid(CompactorPid) of false -> ok; true -> diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 4e86f5e80d..9722dec1bd 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -60,11 +60,9 @@ validate(DbName, DDoc) -> lists:foreach(ValidateFun, EnabledIndexers). -get_index(Module, #db{name = <<"shards/", _/binary>> = DbName}, DDoc) -> - case is_record(DDoc, doc) of - true -> get_index(Module, DbName, DDoc, nil); - false -> get_index(Module, DbName, DDoc) - end; +get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) + when is_record(DDoc, doc) -> + get_index(Module, DbName, DDoc, nil); get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) -> {Pid, Ref} = spawn_monitor(fun() -> exit(fabric:open_doc(mem3:dbname(DbName), DDoc, [ejson_body, ?ADMIN_CTX])) @@ -77,9 +75,10 @@ get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) -> erlang:demonitor(Ref, [flush]), {error, timeout} end; - -get_index(Module, DbName, DDoc) -> - get_index(Module, DbName, DDoc, nil). +get_index(Module, DbName, DDoc) when is_binary(DbName) -> + get_index(Module, DbName, DDoc, nil); +get_index(Module, Db, DDoc) -> + get_index(Module, couch_db:name(Db), DDoc). get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) -> diff --git a/src/couch_index/src/couch_index_util.erl b/src/couch_index/src/couch_index_util.erl index 5694641ca0..dcb33b5b0d 100644 --- a/src/couch_index/src/couch_index_util.erl +++ b/src/couch_index/src/couch_index_util.erl @@ -25,7 +25,7 @@ root_dir() -> index_dir(Module, DbName) when is_binary(DbName) -> DbDir = "." ++ binary_to_list(DbName) ++ "_design", filename:join([root_dir(), DbDir, Module]); -index_dir(Module, #db{}=Db) -> +index_dir(Module, Db) -> index_dir(Module, couch_db:name(Db)). diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl index 0787151ae4..18dd9ffe50 100644 --- a/src/couch_index/test/couch_index_compaction_tests.erl +++ b/src/couch_index/test/couch_index_compaction_tests.erl @@ -23,7 +23,8 @@ setup() -> ?assertNot(is_opened(Db)), {Db, IndexerPid}. -fake_index(#db{name = DbName} = Db) -> +fake_index(Db) -> + DbName = couch_db:name(Db), ok = meck:new([test_index], [non_strict]), ok = meck:expect(test_index, init, ['_', '_'], {ok, 10}), ok = meck:expect(test_index, open, fun(_Db, State) -> diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 0373919659..1c52cd5f02 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -360,15 +360,12 @@ get_view_info(Db, DDoc, VName) -> %% @doc refresh a view index -refresh(#db{name=DbName}, DDoc) -> - refresh(DbName, DDoc); - -refresh(Db, DDoc) -> - UpdateSeq = couch_util:with_db(Db, fun(WDb) -> +refresh(DbName, DDoc) when is_binary(DbName)-> + UpdateSeq = couch_util:with_db(DbName, fun(WDb) -> couch_db:get_update_seq(WDb) end), - case couch_index_server:get_index(couch_mrview_index, Db, DDoc) of + case couch_index_server:get_index(couch_mrview_index, DbName, DDoc) of {ok, Pid} -> case catch couch_index:get_state(Pid, UpdateSeq) of {ok, _} -> ok; @@ -376,7 +373,10 @@ refresh(Db, DDoc) -> end; Error -> {error, Error} - end. + end; + +refresh(Db, DDoc) -> + refresh(couch_db:name(Db), DDoc). compact(Db, DDoc) -> compact(Db, DDoc, []). @@ -653,11 +653,11 @@ make_meta(Args, UpdateSeq, Base) -> end. -get_total_rows(#db{local_tree = LocalTree} = Db, #mrargs{extra = Extra}) -> +get_total_rows(Db, #mrargs{extra = Extra}) -> case couch_util:get_value(namespace, Extra) of <<"_local">> -> - FoldFun = fun(_, _, Acc) -> {ok, Acc + 1} end, - {ok, _, Total} = couch_btree:foldl(LocalTree, FoldFun, 0), + FoldFun = fun(_, Acc) -> {ok, Acc + 1} end, + {ok, Total} = couch_db:fold_local_docs(Db, FoldFun, 0, []), Total; _ -> {ok, Info} = couch_db:get_db_info(Db), diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl index fabe2894c5..3d1a330e7a 100644 --- a/src/couch_mrview/src/couch_mrview_compactor.erl +++ b/src/couch_mrview/src/couch_mrview_compactor.erl @@ -53,8 +53,7 @@ compact(State) -> {ok, Fd} = couch_mrview_util:open_file(CompactFName), ESt = couch_mrview_util:reset_index(Db, Fd, State), - {ok, DbReduce} = couch_btree:full_reduce(Db#db.id_tree), - Count = element(1, DbReduce), + {ok, Count} = couch_db:get_doc_count(Db), {ESt, Count} end), diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl index 7e3fd78e36..e5638fe83a 100644 --- a/src/couch_mrview/src/couch_mrview_http.erl +++ b/src/couch_mrview/src/couch_mrview_http.erl @@ -103,11 +103,11 @@ handle_view_changes_req(#httpd{path_parts=[_,<<"_design">>,DDocName,<<"_view_cha handle_view_req(#httpd{method='GET', path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req, Db, _DDoc) -> - + DbName = couch_db:name(Db), DDocId = <<"_design/", DDocName/binary >>, - {ok, Info} = couch_mrview:get_view_info(Db#db.name, DDocId, VName), + {ok, Info} = couch_mrview:get_view_info(DbName, DDocId, VName), - FinalInfo = [{db_name, Db#db.name}, + FinalInfo = [{db_name, DbName}, {ddoc, DDocId}, {view, VName}] ++ Info, chttpd:send_json(Req, 200, {FinalInfo}); @@ -212,7 +212,7 @@ is_restricted(Db, _) -> couch_db:is_system_db(Db). is_public_fields_configured(Db) -> - DbName = ?b2l(Db#db.name), + DbName = ?b2l(couch_db:name(Db)), case config:get("couch_httpd_auth", "authentication_db", "_users") of DbName -> UsersDbPublic = config:get("couch_httpd_auth", "users_db_public", "false"), @@ -237,7 +237,7 @@ do_all_docs_req(Req, Db, Keys, NS) -> {ok, Resp} = couch_httpd:etag_maybe(Req, fun() -> Max = chttpd:chunked_response_buffer_size(), VAcc0 = #vacc{db=Db, req=Req, threshold=Max}, - DbName = ?b2l(Db#db.name), + DbName = ?b2l(couch_db:name(Db)), UsersDbName = config:get("couch_httpd_auth", "authentication_db", "_users"), diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl index 1ebc85b3ef..60e8a2c463 100644 --- a/src/couch_mrview/src/couch_mrview_show.erl +++ b/src/couch_mrview/src/couch_mrview_show.erl @@ -364,13 +364,17 @@ json_apply_field({Key, NewValue}, [], Acc) -> % This loads the db info if we have a fully loaded db record, but we might not % have the db locally on this node, so then load the info through fabric. -json_req_obj(Req, #db{main_pid=Pid}=Db) when is_pid(Pid) -> - chttpd_external:json_req_obj(Req, Db); json_req_obj(Req, Db) -> - % use a separate process because we're already in a receive loop, and - % json_req_obj calls fabric:get_db_info() - spawn_monitor(fun() -> exit(chttpd_external:json_req_obj(Req, Db)) end), - receive {'DOWN', _, _, _, JsonReq} -> JsonReq end. + case couch_db:is_clustered(Db) of + true -> + % use a separate process because we're already in a receive loop, + % and json_req_obj calls fabric:get_db_info() + JRO = fun() -> exit(chttpd_external:json_req_obj(Req, Db)) end, + spawn_monitor(JRO), + receive {'DOWN', _, _, _, JsonReq} -> JsonReq end; + false -> + chttpd_external:json_req_obj(Req, Db) + end. last_chunk(Req, undefined) -> chttpd:send_response(Req, 200, [], <<"">>); diff --git a/src/couch_mrview/test/couch_mrview_all_docs_tests.erl b/src/couch_mrview/test/couch_mrview_all_docs_tests.erl index 5e352797f9..bf8eb7e5b5 100644 --- a/src/couch_mrview/test/couch_mrview_all_docs_tests.erl +++ b/src/couch_mrview/test/couch_mrview_all_docs_tests.erl @@ -25,7 +25,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_changes_since_tests.erl b/src/couch_mrview/test/couch_mrview_changes_since_tests.erl index 8b11e3dd03..7e2f321feb 100644 --- a/src/couch_mrview/test/couch_mrview_changes_since_tests.erl +++ b/src/couch_mrview/test/couch_mrview_changes_since_tests.erl @@ -25,7 +25,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_collation_tests.erl b/src/couch_mrview/test/couch_mrview_collation_tests.erl index c4a714d1ed..5c8cb54b19 100644 --- a/src/couch_mrview/test/couch_mrview_collation_tests.erl +++ b/src/couch_mrview/test/couch_mrview_collation_tests.erl @@ -64,7 +64,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_compact_tests.erl b/src/couch_mrview/test/couch_mrview_compact_tests.erl index 40877c80ee..7664becdcc 100644 --- a/src/couch_mrview/test/couch_mrview_compact_tests.erl +++ b/src/couch_mrview/test/couch_mrview_compact_tests.erl @@ -26,7 +26,7 @@ setup() -> teardown(Db) -> meck:unload(), couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl index 028e0be115..5ac3e7ecf5 100644 --- a/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl +++ b/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl @@ -21,7 +21,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. ddoc_validation_test_() -> diff --git a/src/couch_mrview/test/couch_mrview_index_changes_tests.erl b/src/couch_mrview/test/couch_mrview_index_changes_tests.erl index 8f0c296aa2..2701e0c223 100644 --- a/src/couch_mrview/test/couch_mrview_index_changes_tests.erl +++ b/src/couch_mrview/test/couch_mrview_index_changes_tests.erl @@ -22,7 +22,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. changes_index_test() -> diff --git a/src/couch_mrview/test/couch_mrview_index_info_tests.erl b/src/couch_mrview/test/couch_mrview_index_info_tests.erl index 3f88972eae..c994df9d38 100644 --- a/src/couch_mrview/test/couch_mrview_index_info_tests.erl +++ b/src/couch_mrview/test/couch_mrview_index_info_tests.erl @@ -28,7 +28,7 @@ setup() -> teardown({Db, _}) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl index c16f53c624..3b38ac5359 100644 --- a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl +++ b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl @@ -25,7 +25,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_map_views_tests.erl b/src/couch_mrview/test/couch_mrview_map_views_tests.erl index 3a199288d9..229af183d6 100644 --- a/src/couch_mrview/test/couch_mrview_map_views_tests.erl +++ b/src/couch_mrview/test/couch_mrview_map_views_tests.erl @@ -24,7 +24,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_red_views_tests.erl b/src/couch_mrview/test/couch_mrview_red_views_tests.erl index 3100785979..b83686113b 100644 --- a/src/couch_mrview/test/couch_mrview_red_views_tests.erl +++ b/src/couch_mrview/test/couch_mrview_red_views_tests.erl @@ -24,7 +24,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index a0d08d7775..8a83a1c1e5 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -60,11 +60,11 @@ db_uri(#httpdb{url = Url}) -> couch_util:url_strip_password(Url); -db_uri(#db{name = Name}) -> - db_uri(Name); +db_uri(DbName) when is_binary(DbName) -> + ?b2l(DbName); -db_uri(DbName) -> - ?b2l(DbName). +db_uri(Db) -> + db_uri(couch_db:name(Db)). db_open(Db, Options) -> @@ -149,10 +149,12 @@ get_db_info(#httpdb{} = Db) -> fun(200, _, {Props}) -> {ok, Props} end); -get_db_info(#db{name = DbName, user_ctx = UserCtx}) -> - {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]), - {ok, Info} = couch_db:get_db_info(Db), - couch_db:close(Db), +get_db_info(Db) -> + DbName = couch_db:name(Db), + UserCtx = couch_db:get_user_ctx(Db), + {ok, InfoDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + {ok, Info} = couch_db:get_db_info(InfoDb), + couch_db:close(InfoDb), {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}. @@ -172,8 +174,10 @@ get_pending_count(#httpdb{} = Db, Seq) -> send_req(Db, Options, fun(200, _, {Props}) -> {ok, couch_util:get_value(<<"pending">>, Props, null)} end); -get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) -> - {ok, CountDb} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]), +get_pending_count(Db, Seq) when is_number(Seq) -> + DbName = couch_db:name(Db), + UserCtx = couch_db:get_user_ctx(Db), + {ok, CountDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), Pending = couch_db:count_changes_since(CountDb, Seq), couch_db:close(CountDb), {ok, Pending}. @@ -185,7 +189,8 @@ get_view_info(#httpdb{} = Db, DDocId, ViewName) -> {VInfo} = couch_util:get_value(<<"view_index">>, Props, {[]}), {ok, VInfo} end); -get_view_info(#db{name = DbName}, DDocId, ViewName) -> +get_view_info(Db, DDocId, ViewName) -> + DbName = couch_db:name(Db), {ok, VInfo} = couch_mrview:get_view_info(DbName, DDocId, ViewName), {ok, [{couch_util:to_binary(K), V} || {K, V} <- VInfo]}. diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index a49d692d9c..2fd5356d52 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -121,7 +121,7 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> ok. --spec ensure_rep_db_exists() -> {ok, #db{}}. +-spec ensure_rep_db_exists() -> {ok, Db::any()}. ensure_rep_db_exists() -> Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, nologifmissing]) of @@ -621,11 +621,14 @@ ssl_verify_options(false) -> [{verify, verify_none}]. --spec before_doc_update(#doc{}, #db{}) -> #doc{}. +-spec before_doc_update(#doc{}, Db::any()) -> #doc{}. before_doc_update(#doc{id = <>} = Doc, _Db) -> Doc; -before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> - #user_ctx{roles = Roles, name = Name} = UserCtx, +before_doc_update(#doc{body = {Body}} = Doc, Db) -> + #user_ctx{ + roles = Roles, + name = Name + } = couch_db:get_user_ctx(Db), case lists:member(<<"_replicator">>, Roles) of true -> Doc; @@ -649,11 +652,11 @@ before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> end. --spec after_doc_read(#doc{}, #db{}) -> #doc{}. +-spec after_doc_read(#doc{}, Db::any()) -> #doc{}. after_doc_read(#doc{id = <>} = Doc, _Db) -> Doc; -after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> - #user_ctx{name = Name} = UserCtx, +after_doc_read(#doc{body = {Body}} = Doc, Db) -> + #user_ctx{name = Name} = couch_db:get_user_ctx(Db), case (catch couch_db:check_is_admin(Db)) of ok -> Doc; diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 3253ce526c..49fd11402d 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -257,16 +257,21 @@ handle_call({report_seq_done, Seq, StatsInc}, From, update_task(NewState), {noreply, NewState}. - -handle_cast({db_compacted, DbName}, - #rep_state{source = #db{name = DbName} = Source} = State) -> - {ok, NewSource} = couch_db:reopen(Source), - {noreply, State#rep_state{source = NewSource}}; - -handle_cast({db_compacted, DbName}, - #rep_state{target = #db{name = DbName} = Target} = State) -> - {ok, NewTarget} = couch_db:reopen(Target), - {noreply, State#rep_state{target = NewTarget}}; +handle_cast({db_compacted, DbName}, State) -> + #rep_state{ + source = Source, + target = Target + } = State, + SourceName = couch_replicator_utils:local_db_name(Source), + TargetName = couch_replicator_utils:local_db_name(Target), + case DbName of + SourceName -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#rep_state{source = NewSource}}; + TargetName -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#rep_state{target = NewTarget}} + end; handle_cast(checkpoint, State) -> case do_checkpoint(State) of @@ -891,10 +896,10 @@ has_session_id(SessionId, [{Props} | Rest]) -> end. -db_monitor(#db{} = Db) -> - couch_db:monitor(Db); -db_monitor(_HttpDb) -> - nil. +db_monitor(#httpdb{}) -> + nil; +db_monitor(Db) -> + couch_db:monitor(Db). get_pending_count(St) -> diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index 05836d4836..01881e4232 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -16,6 +16,7 @@ parse_rep_doc/2, open_db/1, close_db/1, + local_db_name/1, start_db_compaction_notifier/2, stop_db_compaction_notifier/1, replication_id/2, @@ -35,6 +36,7 @@ -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). +-include("couch_replicator_api_wrap.hrl"). -import(couch_util, [ get_value/2, @@ -42,26 +44,35 @@ ]). -open_db(#db{name = Name, user_ctx = UserCtx}) -> - {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]), - Db; -open_db(HttpDb) -> - HttpDb. +open_db(#httpdb{} = HttpDb) -> + HttpDb; +open_db(Db) -> + DbName = couch_db:name(Db), + UserCtx = couch_db:get_user_ctx(Db), + {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + NewDb. -close_db(#db{} = Db) -> - couch_db:close(Db); -close_db(_HttpDb) -> - ok. +close_db(#httpdb{}) -> + ok; +close_db(Db) -> + couch_db:close(Db). + + +local_db_name(#httpdb{}) -> + undefined; +local_db_name(Db) -> + couch_db:name(Db). -start_db_compaction_notifier(#db{name = DbName}, Server) -> +start_db_compaction_notifier(#httpdb{}, _) -> + nil; +start_db_compaction_notifier(Db, Server) -> + DbName = couch_db:name(Db), {ok, Pid} = couch_event:link_listener( ?MODULE, handle_db_event, Server, [{dbname, DbName}] ), - Pid; -start_db_compaction_notifier(_, _) -> - nil. + Pid. stop_db_compaction_notifier(nil) -> diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 1907879c6f..fd657b710c 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -67,16 +67,16 @@ -start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) -> +start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> + gen_server:start_link( + ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []); + +start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> Pid = spawn_link(fun() -> erlang:put(last_stats_report, now()), queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) end), - {ok, Pid}; - -start_link(Cp, Source, Target, ChangesManager, MaxConns) -> - gen_server:start_link( - ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []). + {ok, Pid}. init({Cp, Source, Target, ChangesManager, MaxConns}) -> @@ -139,15 +139,23 @@ handle_call(flush, {Pid, _} = From, {noreply, State2#state{flush_waiter = From}}. -handle_cast({db_compacted, DbName}, - #state{source = #db{name = DbName} = Source} = State) -> - {ok, NewSource} = couch_db:reopen(Source), - {noreply, State#state{source = NewSource}}; - -handle_cast({db_compacted, DbName}, - #state{target = #db{name = DbName} = Target} = State) -> - {ok, NewTarget} = couch_db:reopen(Target), - {noreply, State#state{target = NewTarget}}; +handle_cast({db_compacted, DbName} = Msg, #state{} = State) -> + #state{ + source = Source, + target = Target + } = State, + SourceName = couch_replicator_utils:local_db_name(Source), + TargetName = couch_replicator_utils:local_db_name(Target), + case DbName of + SourceName -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + TargetName -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; + _Else -> + {stop, {unexpected_async_call, Msg}, State} + end; handle_cast(Msg, State) -> {stop, {unexpected_async_call, Msg}, State}. @@ -223,15 +231,15 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> Target2 = open_db(Target), {IdRevs, Stats0} = find_missing(Changes, Target2), case Source of - #db{} -> - Source2 = open_db(Source), - Stats = local_process_batch( - IdRevs, Cp, Source2, Target2, #batch{}, Stats0), - close_db(Source2); #httpdb{} -> ok = gen_server:call(Parent, {add_stats, Stats0}, infinity), remote_process_batch(IdRevs, Parent), - {ok, Stats} = gen_server:call(Parent, flush, infinity) + {ok, Stats} = gen_server:call(Parent, flush, infinity); + _Db -> + Source2 = open_db(Source), + Stats = local_process_batch( + IdRevs, Cp, Source2, Target2, #batch{}, Stats0), + close_db(Source2) end, close_db(Target2), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), @@ -248,7 +256,7 @@ local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, St case Target of #httpdb{} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); - #db{} -> + _Db -> couch_log:debug("Worker flushing doc batch of ~p docs", [Size]) end, Stats2 = flush_docs(Target, Docs), @@ -363,7 +371,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> case {Target, Size > 0} of {#httpdb{}, true} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); - {#db{}, true} -> + {_Db, true} -> couch_log:debug("Worker flushing doc batch of ~p docs", [Size]); _ -> ok @@ -425,7 +433,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> end end; -maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> +maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> case SizeAcc + 1 of SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN -> couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]), diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl index 3e6bb9e8a4..6378e8cbd0 100644 --- a/src/couch_replicator/test/couch_replicator_compact_tests.erl +++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl @@ -87,8 +87,8 @@ should_all_processes_be_alive(RepPid, Source, Target) -> {ok, SourceDb} = reopen_db(Source), {ok, TargetDb} = reopen_db(Target), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb#db.main_pid)), - ?assert(is_process_alive(TargetDb#db.main_pid)) + ?assert(is_process_alive(couch_db:get_pid(SourceDb))), + ?assert(is_process_alive(couch_db:get_pid(TargetDb))) end). should_run_replication(RepPid, RepId, Source, Target) -> @@ -158,12 +158,12 @@ should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> compact_db("source", SourceDb), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(SourceDb))), wait_for_compaction("source", SourceDb), compact_db("target", TargetDb), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(TargetDb#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(TargetDb))), wait_for_compaction("target", TargetDb), {ok, SourceDb2} = reopen_db(SourceDb), @@ -174,14 +174,14 @@ should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> compact_db("source", SourceDb2), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb2#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(SourceDb2))), pause_writer(Writer), wait_for_compaction("source", SourceDb2), resume_writer(Writer), compact_db("target", TargetDb2), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(TargetDb2#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(TargetDb2))), pause_writer(Writer), wait_for_compaction("target", TargetDb2), resume_writer(Writer) @@ -257,14 +257,16 @@ should_compare_databases(Source, Target) -> reopen_db({remote, Db}) -> reopen_db(Db); -reopen_db(#db{name=DbName}) -> - reopen_db(DbName); -reopen_db(DbName) -> +reopen_db(DbName) when is_binary(DbName) -> {ok, Db} = couch_db:open_int(DbName, []), ok = couch_db:close(Db), - {ok, Db}. + {ok, Db}; +reopen_db(Db) -> + reopen_db(couch_db:name(Db)). -compact_db(Type, #db{name = Name}) -> + +compact_db(Type, Db0) -> + Name = couch_db:name(Db0), {ok, Db} = couch_db:open_int(Name, []), {ok, CompactPid} = couch_db:start_compact(Db), MonRef = erlang:monitor(process, CompactPid), @@ -399,7 +401,8 @@ stop_writer(Pid) -> {reason, "Timeout stopping source database writer"}]}) end. -writer_loop(#db{name = DbName}, Parent, Counter) -> +writer_loop(Db0, Parent, Counter) -> + DbName = couch_db:name(Db0), {ok, Data} = file:read_file(?ATTFILE), maybe_pause(Parent, Counter), Doc = couch_doc:from_json_obj({[ diff --git a/src/fabric/include/couch_db_tmp.hrl b/src/fabric/include/couch_db_tmp.hrl deleted file mode 100644 index cd3a047d4a..0000000000 --- a/src/fabric/include/couch_db_tmp.hrl +++ /dev/null @@ -1,296 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --define(LOCAL_DOC_PREFIX, "_local/"). --define(DESIGN_DOC_PREFIX0, "_design"). --define(DESIGN_DOC_PREFIX, "_design/"). - --define(MIN_STR, <<"">>). --define(MAX_STR, <<255>>). % illegal utf string - --define(JSON_ENCODE(V), couch_util:json_encode(V)). --define(JSON_DECODE(V), couch_util:json_decode(V)). - --define(b2l(V), binary_to_list(V)). --define(l2b(V), list_to_binary(V)). - --define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>). - --define(LOG_DEBUG(Format, Args), couch_log:debug(Format, Args)). --define(LOG_INFO(Format, Args), couch_log:notice(Format, Args)). --define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)). - --record(rev_info, - { - rev, - seq = 0, - deleted = false, - body_sp = nil % stream pointer - }). - --record(doc_info, - { - id = <<"">>, - high_seq = 0, - revs = [] % rev_info - }). - --record(full_doc_info, - {id = <<"">>, - update_seq = 0, - deleted = false, - data_size = 0, - rev_tree = [] - }). - --record(httpd, - {mochi_req, - peer, - method, - path_parts, - db_url_handlers, - user_ctx, - req_body = undefined, - design_url_handlers, - auth, - default_fun, - url_handlers - }). - - --record(doc, - { - id = <<"">>, - revs = {0, []}, - - % the json body object. - body = {[]}, - - atts = [], % attachments - - deleted = false, - - % key/value tuple of meta information, provided when using special options: - % couch_db:open_doc(Db, Id, Options). - meta = [] - }). - - --record(att, - { - name, - type, - att_len, - disk_len, % length of the attachment in its identity form - % (that is, without a content encoding applied to it) - % differs from att_len when encoding /= identity - md5= <<>>, - revpos=0, - data, - encoding=identity % currently supported values are: - % identity, gzip - % additional values to support in the future: - % deflate, compress - }). - - --record(user_ctx, - { - name=null, - roles=[], - handler - }). - -% This should be updated anytime a header change happens that requires more -% than filling in new defaults. -% -% As long the changes are limited to new header fields (with inline -% defaults) added to the end of the record, then there is no need to increment -% the disk revision number. -% -% if the disk revision is incremented, then new upgrade logic will need to be -% added to couch_db_updater:init_db. - --define(LATEST_DISK_VERSION, 5). - --record(db_header, - {disk_version = ?LATEST_DISK_VERSION, - update_seq = 0, - unused = 0, - id_tree_state = nil, - seq_tree_state = nil, - local_tree_state = nil, - purge_seq = 0, - purged_docs = nil, - security_ptr = nil, - revs_limit = 1000 - }). - --record(db, - {main_pid = nil, - update_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - fd_monitor, - header = #db_header{}, - committed_update_seq, - id_tree, - seq_tree, - local_tree, - update_seq, - name, - filepath, - validate_doc_funs = undefined, - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - is_sys_db = false - }). - - --record(view_query_args, { - start_key, - end_key, - start_docid = ?MIN_STR, - end_docid = ?MAX_STR, - - direction = fwd, - inclusive_end=true, % aka a closed-interval - - limit = 10000000000, % Huge number to simplify logic - skip = 0, - - group_level = 0, - - view_type = nil, - include_docs = false, - stale = false, - multi_get = false, - callback = nil, - list = nil, - keys = nil, - sorted = true, - extra = [] -}). - --record(view_fold_helper_funs, { - reduce_count, - passed_end, - start_response, - send_row -}). - --record(reduce_fold_helper_funs, { - start_response, - send_row -}). - --record(extern_resp_args, { - code = 200, - stop = false, - data = <<>>, - ctype = "application/json", - headers = [], - json = nil -}). - --record(group, { - sig=nil, - dbname, - fd=nil, - name, - def_lang, - design_options=[], - views, - id_btree=nil, - current_seq=0, - purge_seq=0, - query_server=nil, - waiting_delayed_commit=nil, - atts=[] - }). - --record(view, - {id_num, - map_names=[], - def, - btree=nil, - reduce_funs=[], - dbcopies=[], - options=[] - }). - --record(index_header, - {seq=0, - purge_seq=0, - id_btree_state=nil, - view_states=nil - }). - --record(http_db, { - url, - auth = [], - resource = "", - headers = [ - {"User-Agent", "CouchDB/"++couch:version()}, - {"Accept", "application/json"}, - {"Accept-Encoding", "gzip"} - ], - qs = [], - method = get, - body = nil, - options = [ - {response_format,binary}, - {inactivity_timeout, 30000} - ], - retries = 10, - pause = 500, - conn = nil -}). - -% small value used in revision trees to indicate the revision isn't stored --define(REV_MISSING, []). - --record(changes_args, { - feed = "normal", - dir = fwd, - since = "0", - limit = 1000000000000000, - style = main_only, - heartbeat, - timeout, - filter, - include_docs = false -}). - --record(proc, { - pid, - lang, - client = nil, - ddoc_keys = [], - prompt_fun, - set_timeout_fun, - stop_fun, - data_fun -}). - --record(leaf, { - deleted, - ptr, - seq, - size = 0, - atts = [] -}). diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index f98a5c04ac..2b87e461ef 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -38,7 +38,7 @@ -include_lib("fabric/include/fabric.hrl"). --type dbname() :: (iodata() | #db{}). +-type dbname() :: (iodata() | tuple()). -type docid() :: iodata(). -type revision() :: {integer(), binary()}. -type callback() :: fun((any(), any()) -> {ok | stop, any()}). @@ -476,10 +476,12 @@ dbname(DbName) when is_list(DbName) -> list_to_binary(DbName); dbname(DbName) when is_binary(DbName) -> DbName; -dbname(#db{name=Name}) -> - Name; -dbname(DbName) -> - erlang:error({illegal_database_name, DbName}). +dbname(Db) -> + try + couch_db:name(Db) + catch error:badarg -> + erlang:error({illegal_database_name, Db}) + end. name(Thing) -> couch_util:to_binary(Thing). diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 80b110a24e..ad96255d6a 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -16,8 +16,9 @@ -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3, get_missing_revs/2, get_missing_revs/3, update_docs/3]). -export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3, - set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]). +-export([create_db/1, create_db/2, delete_db/1, reset_validation_funs/1, + set_security/3, set_revs_limit/3, create_shard_db_doc/2, + delete_shard_db_doc/2]). -export([get_all_security/2, open_shard/2]). -export([compact/1, compact/2]). @@ -38,7 +39,8 @@ }). %% rpc endpoints -%% call to with_db will supply your M:F with a #db{} and then remaining args +%% call to with_db will supply your M:F with a Db instance +%% and then remaining args %% @equiv changes(DbName, Args, StartSeq, []) changes(DbName, Args, StartSeq) -> @@ -76,13 +78,13 @@ changes(DbName, Options, StartVector, DbOptions) -> args = Args, options = Options, pending = couch_db:count_changes_since(Db, StartSeq), - epochs = get_epochs(Db) + epochs = couch_db:get_epochs(Db) }, try {ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} = couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), rexi:stream_last({complete, [ - {seq, {LastSeq, uuid(Db), owner_of(LastSeq, Epochs)}}, + {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}}, {pending, Pending} ]}) after @@ -144,7 +146,10 @@ fix_skip_and_limit(Args) -> Args#mrargs{skip=0, limit=Skip+Limit}. create_db(DbName) -> - rexi:reply(case couch_server:create(DbName, []) of + create_db(DbName, []). + +create_db(DbName, Options) -> + rexi:reply(case couch_server:create(DbName, Options) of {ok, _} -> ok; Error -> @@ -225,7 +230,7 @@ get_missing_revs(DbName, IdRevsList, Options) -> not_found -> {Id, Revs, []} end - end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + end, IdRevsList, couch_db:get_full_doc_infos(Db, Ids))}; Error -> Error end). @@ -249,8 +254,9 @@ group_info(DbName, DDocId, DbOptions) -> reset_validation_funs(DbName) -> case get_or_create_db(DbName, []) of - {ok, #db{main_pid = Pid}} -> - gen_server:cast(Pid, {load_validation_funs, undefined}); + {ok, Db} -> + DbPid = couch_db:get_pid(Db), + gen_server:cast(DbPid, {load_validation_funs, undefined}); _ -> ok end. @@ -358,7 +364,7 @@ changes_enumerator(DocInfo, Acc) -> Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end, ChangesRow = {change, [ {pending, Pending-1}, - {seq, {Seq, uuid(Db), owner_of(Seq, Epochs)}}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, {id, Id}, {changes, Results}, {deleted, Del} | @@ -456,79 +462,20 @@ set_io_priority(DbName, Options) -> ok end. -calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> - Seq; -calculate_start_seq(Db, Node, {Seq, Uuid}) -> - % Treat the current node as the epoch node - calculate_start_seq(Db, Node, {Seq, Uuid, Node}); -calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> - case is_prefix(Uuid, couch_db:get_uuid(Db)) of - true -> - case is_owner(EpochNode, Seq, couch_db:get_epochs(Db)) of - true -> Seq; - false -> 0 - end; - false -> - %% The file was rebuilt, most likely in a different - %% order, so rewind. - 0 - end; -calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) -> - case is_prefix(Uuid, couch_db:get_uuid(Db)) of - true -> - start_seq(get_epochs(Db), OriginalNode, Seq); - false -> + +calculate_start_seq(Db, Node, Seq) -> + case couch_db:calculate_start_seq(Db, Node, Seq) of + N when is_integer(N) -> + N; + {replace, OriginalNode, Uuid, OriginalSeq} -> %% Scan history looking for an entry with %% * target_node == TargetNode %% * target_uuid == TargetUUID %% * target_seq =< TargetSeq %% If such an entry is found, stream from associated source_seq - mem3_rep:find_source_seq(Db, OriginalNode, Uuid, Seq) + mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq) end. -is_prefix(Pattern, Subject) -> - binary:longest_common_prefix([Pattern, Subject]) == size(Pattern). - -is_owner(Node, Seq, Epochs) -> - validate_epochs(Epochs), - Node =:= owner_of(Seq, Epochs). - -owner_of(_Seq, []) -> - undefined; -owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq -> - EpochNode; -owner_of(Seq, [_ | Rest]) -> - owner_of(Seq, Rest). - -get_epochs(Db) -> - Epochs = couch_db:get_epochs(Db), - validate_epochs(Epochs), - Epochs. - -start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq -> - %% OrigNode is the owner of the Seq so we can safely stream from there - Seq; -start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq -> - %% We transferred this file before Seq was written on OrigNode, so we need - %% to stream from the beginning of the next epoch. Note that it is _not_ - %% necessary for the current node to own the epoch beginning at NewSeq - NewSeq; -start_seq([_ | Rest], OrigNode, Seq) -> - start_seq(Rest, OrigNode, Seq); -start_seq([], OrigNode, Seq) -> - erlang:error({epoch_mismatch, OrigNode, Seq}). - -validate_epochs(Epochs) -> - %% Assert uniqueness. - case length(Epochs) == length(lists:ukeysort(2, Epochs)) of - true -> ok; - false -> erlang:error(duplicate_epoch) - end, - %% Assert order. - case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of - true -> ok; - false -> erlang:error(epoch_order) - end. uuid(Db) -> Uuid = couch_db:get_uuid(Db), @@ -540,30 +487,6 @@ uuid_prefix_len() -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -calculate_start_seq_test() -> - %% uuid mismatch is always a rewind. - Hdr1 = couch_db_header:new(), - Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), - %% uuid matches and seq is owned by node. - Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), - ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), - %% uuids match but seq is not owned by node. - Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), - %% return integer if we didn't get a vector. - ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). - -is_owner_test() -> - ?assertNot(is_owner(foo, 1, [])), - ?assertNot(is_owner(foo, 1, [{foo, 1}])), - ?assert(is_owner(foo, 2, [{foo, 1}])), - ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])), - ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])), - ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])), - ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])), - ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])). - maybe_filtered_json_doc_no_filter_test() -> Body = {[{<<"a">>, 1}]}, Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 7e3f23e68c..c5aef03778 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -302,7 +302,8 @@ path_ends_with(Path, Suffix) -> fake_db(DbName, Opts) -> {SecProps} = fabric:get_security(DbName), % as admin UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}), - #db{name = DbName, security = SecProps, user_ctx = UserCtx}. + {ok, Db} = couch_db:clustered_db(DbName, UserCtx, SecProps), + Db. %% test function kv(Item, Count) -> diff --git a/src/mango/src/mango_crud.erl b/src/mango/src/mango_crud.erl index 68c9d6cc45..41a4d143de 100644 --- a/src/mango/src/mango_crud.erl +++ b/src/mango/src/mango_crud.erl @@ -111,7 +111,7 @@ maybe_add_user_ctx(Db, Opts) -> {user_ctx, _} -> Opts; false -> - [{user_ctx, Db#db.user_ctx} | Opts] + [{user_ctx, couch_db:get_user_ctx(Db)} | Opts] end. diff --git a/src/mango/src/mango_cursor_text.erl b/src/mango/src/mango_cursor_text.erl index 96e365a49f..dfe942c386 100644 --- a/src/mango/src/mango_cursor_text.erl +++ b/src/mango/src/mango_cursor_text.erl @@ -50,7 +50,7 @@ create(Db, Indexes, Selector, Opts0) -> ?MANGO_ERROR(multiple_text_indexes) end, - Opts = unpack_bookmark(Db#db.name, Opts0), + Opts = unpack_bookmark(couch_db:name(Db), Opts0), DreyfusLimit = get_dreyfus_limit(), Limit = erlang:min(DreyfusLimit, couch_util:get_value(limit, Opts, mango_opts:default_limit())), @@ -96,7 +96,7 @@ execute(Cursor, UserFun, UserAcc) -> }, CAcc = #cacc{ selector = Selector, - dbname = Db#db.name, + dbname = couch_db:name(Db), ddocid = ddocid(Idx), idx_name = mango_idx:name(Idx), bookmark = get_bookmark(Opts), diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl index a088276493..cc6cbd5d86 100644 --- a/src/mango/src/mango_httpd.erl +++ b/src/mango/src/mango_httpd.erl @@ -190,7 +190,8 @@ handle_find_req(Req, _Db) -> set_user_ctx(#httpd{user_ctx=Ctx}, Db) -> - Db#db{user_ctx=Ctx}. + {ok, NewDb} = couch_db:set_user_ctx(Db, Ctx), + NewDb. get_idx_w_opts(Opts) -> diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl index bc88b970cd..1c3924aaaf 100644 --- a/src/mango/src/mango_idx.erl +++ b/src/mango/src/mango_idx.erl @@ -290,12 +290,12 @@ idx_mod(#idx{type = <<"text">>}) -> end. -db_to_name(#db{name=Name}) -> - Name; db_to_name(Name) when is_binary(Name) -> Name; db_to_name(Name) when is_list(Name) -> - iolist_to_binary(Name). + iolist_to_binary(Name); +db_to_name(Db) -> + couch_db:name(Db). get_idx_def(Opts) -> diff --git a/src/mango/src/mango_idx_text.erl b/src/mango/src/mango_idx_text.erl index ad9d2e8d7f..f6120a8292 100644 --- a/src/mango/src/mango_idx_text.erl +++ b/src/mango/src/mango_idx_text.erl @@ -344,8 +344,9 @@ indexable_fields(Fields, {op_default, _}) -> [<<"$default">> | Fields]. -maybe_reject_index_all_req({Def}, #db{name=DbName, user_ctx=Ctx}) -> - User = Ctx#user_ctx.name, +maybe_reject_index_all_req({Def}, Db) -> + DbName = couch_db:name(Db), + #user_ctx{name = User} = couch_db:get_user_ctx(Db), Fields = couch_util:get_value(fields, Def), case {Fields, forbid_index_all()} of {all_fields, "true"} -> @@ -374,7 +375,9 @@ setup() -> end), %default index all def that generates {fields, all_fields} Index = #idx{def={[]}}, - Db = #db{name = <<"testdb">>, user_ctx=#user_ctx{name = <<"u1">>}}, + DbName = <<"testdb">>, + UserCtx = #user_ctx{name = <<"u1">>}, + {ok, Db} = couch_db:clustered_db(DbName, UserCtx), {Index, Db}. diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index 405d7e5fa1..e9c1473bc1 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -145,13 +145,13 @@ get_shard(DbName, Node, Range) -> local_shards(DbName) -> mem3_shards:local(DbName). -shard_suffix(#db{name=DbName}) -> - shard_suffix(DbName); -shard_suffix(DbName0) -> +shard_suffix(DbName0) when is_binary(DbName0) -> Shard = hd(shards(DbName0)), <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> = Shard#shard.name, - filename:extension(binary_to_list(DbName)). + filename:extension(binary_to_list(DbName)); +shard_suffix(Db) -> + shard_suffix(couch_db:name(Db)). fold_shards(Fun, Acc) -> mem3_shards:fold(Fun, Acc). @@ -292,10 +292,11 @@ group_by_range(Shards) -> % quorum functions -quorum(#db{name=DbName}) -> - quorum(DbName); -quorum(DbName) -> - n(DbName) div 2 + 1. +quorum(DbName) when is_binary(DbName) -> + n(DbName) div 2 + 1; +quorum(Db) -> + quorum(couch_db:name(Db)). + node(#shard{node=Node}) -> Node; diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl index 535815862d..571f063702 100644 --- a/src/mem3/src/mem3_httpd.erl +++ b/src/mem3/src/mem3_httpd.erl @@ -32,7 +32,7 @@ handle_membership_req(#httpd{path_parts=[<<"_membership">>]}=Req) -> handle_shards_req(#httpd{method='GET', path_parts=[_DbName, <<"_shards">>]} = Req, Db) -> - DbName = mem3:dbname(Db#db.name), + DbName = mem3:dbname(couch_db:name(Db)), Shards = mem3:shards(DbName), JsonShards = json_shards(Shards, dict:new()), couch_httpd:send_json(Req, {[ @@ -40,7 +40,7 @@ handle_shards_req(#httpd{method='GET', ]}); handle_shards_req(#httpd{method='GET', path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) -> - DbName = mem3:dbname(Db#db.name), + DbName = mem3:dbname(couch_db:name(Db)), Shards = mem3:shards(DbName, DocId), {[{Shard, Dbs}]} = json_shards(Shards, dict:new()), couch_httpd:send_json(Req, {[ diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index f31891a7b9..555389b901 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -92,7 +92,7 @@ code_change(_OldVsn, #state{}=State, _Extra) -> initialize_nodelist() -> DbName = config:get("mem3", "nodes_db", "_nodes"), {ok, Db} = mem3_util:ensure_exists(DbName), - {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []), + {ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []), % add self if not already present case ets:lookup(?MODULE, node()) of [_] -> @@ -103,13 +103,13 @@ initialize_nodelist() -> {ok, _} = couch_db:update_doc(Db, Doc, []) end, couch_db:close(Db), - Db#db.update_seq. + couch_db:get_update_seq(Db). -first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) -> +first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> {ok, Acc}; -first_fold(#full_doc_info{deleted=true}, _, Acc) -> +first_fold(#full_doc_info{deleted=true}, Acc) -> {ok, Acc}; -first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) -> +first_fold(#full_doc_info{id=Id}=DocInfo, Db) -> {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]), ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}), {ok, Db}. diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index ad7ac55f54..2b3b8a916c 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -170,11 +170,11 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) -> end. -repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) -> - erlang:put(io_priority, {internal_repl, DbName}), +repl(Db, Acc0) -> + erlang:put(io_priority, {internal_repl, couch_db:name(Db)}), #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}), Fun = fun ?MODULE:changes_enumerator/3, - {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]), + {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []), {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2), {ok, couch_db:count_changes_since(Db, LastSeq)}. @@ -337,7 +337,7 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) -> SrcUUID = couch_db:get_uuid(SrcDb), S = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(SrcUUID))), DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>, - FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _, _) -> + FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _) -> TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>), case is_prefix(DocIdPrefix, DocId) of true -> @@ -354,10 +354,10 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) -> end end, Options = [{start_key, DocIdPrefix}], - case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of - {ok, _, {TgtUUID, Doc}} -> + case couch_db:fold_local_docs(SrcDb, FoldFun, not_found, Options) of + {ok, {TgtUUID, Doc}} -> {ok, TgtUUID, Doc}; - {ok, _, not_found} -> + {ok, not_found} -> {not_found, missing}; Else -> couch_log:error("Error finding replication doc: ~w", [Else]), diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 93cb99ac9c..c2bd58fdf6 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -84,11 +84,11 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) -> save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, #db{update_seq = TargetSeq} = Db} -> + {ok, Db} -> NewEntry = {[ {<<"target_node">>, atom_to_binary(node(), utf8)}, {<<"target_uuid">>, couch_db:get_uuid(Db)}, - {<<"target_seq">>, TargetSeq} + {<<"target_seq">>, couch_db:get_update_seq(Db)} ] ++ NewEntry0}, Body = {[ {<<"seq">>, SourceSeq}, diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 8d9cfb9c74..be7e5aaafc 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -323,7 +323,7 @@ get_update_seq() -> DbName = config:get("mem3", "shards_db", "_dbs"), {ok, Db} = mem3_util:ensure_exists(DbName), couch_db:close(Db), - Db#db.update_seq. + couch_db:get_update_seq(Db). listen_for_changes(Since) -> DbName = config:get("mem3", "shards_db", "_dbs"), @@ -380,7 +380,7 @@ load_shards_from_disk(DbName) when is_binary(DbName) -> couch_db:close(Db) end. -load_shards_from_db(#db{} = ShardDb, DbName) -> +load_shards_from_db(ShardDb, DbName) -> case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> Seq = couch_db:get_update_seq(ShardDb), @@ -659,7 +659,7 @@ t_spawn_writer_in_load_shards_from_db() -> meck:expect(couch_db, get_update_seq, 1, 1), meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), erlang:register(?MODULE, self()), % register to get cache_insert cast - load_shards_from_db(#db{name = <<"testdb">>}, ?DB), + load_shards_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB), meck:validate(couch_db), meck:validate(mem3_util), Cast = receive @@ -746,8 +746,8 @@ mem3_shards_changes_test_() -> { setup_changes() -> - ok = meck:expect(mem3_util, ensure_exists, ['_'], - {ok, #db{name = <<"dbs">>, update_seq = 0}}), + RespDb = test_util:fake_db([{name, <<"dbs">>}, {update_seq, 0}]), + ok = meck:expect(mem3_util, ensure_exists, ['_'], {ok, RespDb}), ok = meck:expect(couch_db, close, ['_'], ok), ok = application:start(config), {ok, Pid} = ?MODULE:start_link(),