Mongodb源码分析–更新记录

在之前的一篇文章中,介绍了assembleResponse函数(位于instance.cpp第224行),它会根据op操作枚举类型来调用相应的crud操作,枚举类型定义如下:



enumOperations {

opReply
=1,/reply. responseTo is set./

dbMsg
=1000,/generic msg command followed by a string/

dbUpdate
=2001,/更新对象/

dbInsert
=2002,

//dbGetByOID = 2003,

dbQuery=2004,

dbGetMore
=2005,

dbDelete
=2006,

dbKillCursors
=2007

};




可以看到dbUpdate = 2001 为更新操作枚举值,下面我们看一下assembleResponse在确定是更新操作时调用的方法,如下:



//instance.cpp文件第224行

assembleResponse( Message&m, DbResponse&dbresponse,constSockAddr&client ) {

.....

try{

if( op==dbInsert ) {//添加记录操作

receivedInsert(m, currentOp);

}

elseif( op==dbUpdate ) {//更新记录

receivedUpdate(m, currentOp);

}

elseif( op==dbDelete ) {//删除记录

receivedDelete(m, currentOp);

}

elseif( op==dbKillCursors ) {//删除Cursors(游标)对象

currentOp.ensureStarted();

logThreshold
=10;

ss
<<"killcursors";

receivedKillCursors(m);

}

else{

mongo::log()
<<"operation isn't supported:"<<op<<endl;

currentOp.done();

log
=true;

}

}

.....

}

}


从上面代码可以看出,系统在确定dbUpdate操作时,调用了receivedUpdate()方法(位于instance.cpp文件第570行),下面是该方法的定义:

voidreceivedUpdate(Message&m, CurOp&op) {

DbMessage d(m);//初始化数据库格式的消息

constcharns=d.getns();//获取名空间,用于接下来insert数据

assert(
ns);

//因为CUD操作在主库中操作,所以这里断言名空间包含的db信息中是不是主库,即"master"

uassert(10054,"not master", isMasterNs( ns ) );

op.debug().str<<ns<<'';



//获取标志位信息(标识更新一条或多条等)关于消息结构体。有关消息结构参见我的这篇文章:

//http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html

intflags=d.pullInt();

//获取"更新消息"结构体中的selector(也就是要更新的数据条件,相关于where)

BSONObj query=d.nextJsObj();



assert( d.moreJSObjs() );

assert( query.objsize()<m.header()->dataLen() );

BSONObj toupdate=d.nextJsObj();//要更新的记录

uassert(10055,"update object too large", toupdate.objsize()<=BSONObjMaxUserSize);

assert( toupdate.objsize()<m.header()->dataLen() );

assert( query.objsize()+toupdate.objsize()<m.header()->dataLen() );

//标识是否为upsert方式,即:如果存在就更新,如果不存在就插入

boolupsert=flags&UpdateOption_Upsert;

//是否更新所有满足条件(where)的记录

boolmulti=flags&UpdateOption_Multi;

//是否更新所有节点(sharding状态)

boolbroadcast=flags&UpdateOption_Broadcast;

{

strings=query.toString();

/todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down.

instead, let's just story the query BSON in the debug object, and it can toString()

lazily

/

op.debug().str<<"query:"<<s;

op.setQuery(query);

}



writelock lk;



//如果不更新所有节点(sharding)且当前物理结点是shard 状态时

if(!broadcast&&handlePossibleShardedMessage( m ,0) )

return;

//if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit

Client::Context ctx( ns );



UpdateResult res=updateObjects(ns, toupdate, query, upsert, multi,true, op.debug() );//更新对象

lastError.getSafe()->recordUpdate( res.existing , res.num , res.upserted );//for getlasterror

}



上面的方法中,主要是对消息进行折包解析,找出要更新的数据记录及相应查询条件,以及更新方式(即upsert),然后再在“写锁”环境下执行更新数据操作。



最终上面代码会调用 updateObjects()方法,该方法定义如下:

//update.cpp 文件第1279行

UpdateResult updateObjects(constcharns,constBSONObj&updateobj, BSONObj patternOrig,boolupsert,boolmulti,boollogop , OpDebug&debug ) {

//断言记录的ns是否在"保留的$集合"中

uassert(10155,"cannot update reserved $ collection", strchr(ns,'$')==0);

if( strstr(ns,".system.") ) {

/
dm: it's very important that system.indexes is never updated as IndexDetails has pointers into it/

uassert(
10156, str::stream()<<"cannot update system collection:"<<ns<<"q:"<<patternOrig<<"u:"<<updateobj , legalClientSystemNS( ns ,true) );

}

return_updateObjects(false, ns, updateobj, patternOrig, upsert, multi, logop, debug);

}





上面方法对要更新的ns进行判断,以避免因更新保留的集合而对系统结构造成损坏,如果一切正常,则调用 _updateObjects方法,如下:




//update.cpp 文件第1027行

UpdateResult _updateObjects(boolgod,constchar
ns,constBSONObj&updateobj, BSONObj patternOrig,boolupsert,boolmulti,boollogop , OpDebug&debug, RemoveSaverrs ) {

DEBUGUPDATE(
"update:"<<ns<<"update:"<<updateobj<<"query:"<<patternOrig<<"upsert:"<<upsert<<"multi:"<<multi );

Client
&client=cc();

intprofile=client.database()->profile;

StringBuilder
&ss=debug.str;



if( logLevel>2)

ss
<<"update:"<<updateobj.toString();



/
idea with these here it to make them loop invariant for multi updates, and thus be a bit faster for that case/

/
NOTE: when yield() is added herein, these must be refreshed after each call to yield!/

NamespaceDetails
d=nsdetails(ns);//can be null if an upsert...

NamespaceDetailsTransientnsdt=&NamespaceDetailsTransient::get_w(ns);

/
end note/



auto_ptr
<ModSet>mods;//定义存储修改信息操作(如$inc, $set, $push,)的集合实例

boolisOperatorUpdate=updateobj.firstElement().fieldName()[0]=='$';

intmodsIsIndexed=false;//really the # of indexes

if( isOperatorUpdate ) {

if( d&&d->indexBuildInProgress ) {//如果正在构建索引

set<string>bgKeys;

d
->inProgIdx().keyPattern().getFieldNames(bgKeys);//获取当前对象的所有字段(field)信息

mods.reset(newModSet(updateobj, nsdt->indexKeys(),&bgKeys));//为mods绑定操作信息

}

else{

mods.reset(
newModSet(updateobj, nsdt->indexKeys()) );//为mods绑定操作信息;

}

modsIsIndexed
=mods->isIndexed();

}

//upsert:如果存在就更新,如果不存在就插入

if(!upsert&&!multi&&isSimpleIdQuery(patternOrig)&&d&&!modsIsIndexed ) {

intidxNo=d->findIdIndex();

if( idxNo>=0) {

ss
<<"byid";

//根据id更新记录信息

return_updateById(isOperatorUpdate, idxNo, mods.get(), profile, d, nsdt, god, ns, updateobj, patternOrig, logop, debug);

}

}



set<DiskLoc>seenObjects;



intnumModded=0;

longlongnscanned=0;

MatchDetails details;

//构造“更新操作”实例对象并用其构造游标操作(符)实例

shared_ptr<MultiCursor::CursorOp>opPtr(newUpdateOp( mods.get()&&mods->hasDynamicArray() ) );

//构造MultiCursor查询游标(参见其构造方法中的 nextClause()语句)

shared_ptr<MultiCursor>c(newMultiCursor( ns, patternOrig, BSONObj(), opPtr,true) );



auto_ptr
<ClientCursor>cc;



while( c->ok() ) {//遍历(下面的c->advance()调用)游标指向的记录信息

nscanned++;



boolatomic=c->matcher()->docMatcher().atomic();



//并将其与更新操作中的条件进行匹配

if(!c->matcher()->matches( c->currKey(), c->currLoc(),&details ) ) {

c
->advance();//将游标跳转到下一条记录



if( nscanned%256==0&&!atomic ) {

if( cc.get()==0) {

shared_ptr
<Cursor>cPtr=c;

cc.reset(
newClientCursor( QueryOption_NoCursorTimeout , cPtr , ns ) );

}

if(!cc->yield() ) {

cc.release();

//TODO should we assert or something?

break;

}

if(!c->ok() ) {

break;

}

}

continue;

}



Record
r=c->_current();//游标当前所指向的记录

DiskLoc loc=c->currLoc();//游标当前所指向的记录所在地址



//TODO Maybe this is unnecessary since we have seenObjects

if( c->getsetdup( loc ) ) {//判断当前记录是否是重复

c->advance();

continue;

}



BSONObj js(r);



BSONObj pattern
=patternOrig;



if( logop ) {//记录日志

BSONObjBuilder idPattern;

BSONElement id;

//NOTE: If the matching object lacks an id, we'll log

//with the original pattern. This isn't replay-safe.

//It might make sense to suppress the log instead

//if there's no id.

if( js.getObjectID( id ) ) {

idPattern.append( id );

pattern
=idPattern.obj();

}

else{

uassert(
10157,"multi-update requires all modified objects to have an _id",!multi );

}

}



if( profile )

ss
<<"nscanned:"<<nscanned;

......



uassert(
10158,"multi update only works with $ operators",!multi );

//查看更新记录操作的时间戳,本人猜测这么做可能因为mongodb会采用最后更新时间戳解决分布式系统

//一致性的问题,也就是通常使用的Last write wins准则,有关信息可参见这篇文章:

//http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer

BSONElementManipulator::lookForTimestamps( updateobj );

checkNoMods( updateobj );

//更新记录

theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);

if( logop ) {//记录日志操作

DEVif( god ) log()<<"REALLY??"<<endl;//god doesn't get logged, this would be bad.

logOp("u", ns, updateobj,&pattern );

}

returnUpdateResult(1,0,1);//返回操作结果

}



if( numModded )

returnUpdateResult(1,1, numModded );



......

returnUpdateResult(0,0,0);

}





上面的代码主要执行构造更新消息中的查询条件(selector)游标,并将“游标指向”的记录遍历出来与查询条件进行匹配,如果匹配命中,则进行更新。(有关游标的构造和继承实现体系,mongodb做的有些复杂,很难一句说清,我会在本系列后面另用篇幅进行说明)



注意上面代码段中的这行代码:




theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);
该方法会执行最终更新操作,其定义如下:




//pdfile.cpp 文件934行

constDiskLoc DataFileMgr::updateRecord(

constcharns,

NamespaceDetails
d,

NamespaceDetailsTransient
nsdt,

Record
toupdate,constDiskLoc&dl,

constchar_buf,int_len, OpDebug&debug,boolgod) {

StringBuilder
&ss=debug.str;

dassert( toupdate
==dl.rec() );



BSONObj objOld(toupdate);

BSONObj objNew(_buf);

DEV assert( objNew.objsize()
==_len );

DEV assert( objNew.objdata()
==_buf );



//如果_buf中不包含_id,但要更新的记录(toupdate)有_id

if(!objNew.hasElement("_id")&&objOld.hasElement("_id") ) {

/
add back the old _id value if the update removes it. Note this implementation is slow

(copies entire object multiple times), but this shouldn't happen often, so going for simple

code, not speed.

/

BSONObjBuilder b;

BSONElement e;

assert( objOld.getObjectID(e) );
//获取对象objOld的ID并绑定到e

b.append(e);//为了最好的性能,先放入_id

b.appendElements(objNew);

objNew
=b.obj();

}



/
重复key检查/

vector
<IndexChanges>changes;

boolchangedId=false;

//获取要修改的索引信息(包括要移除和添加的index key,并将结果返回给changes)

getIndexChanges(changes,
d, objNew, objOld, changedId);



//断言是否要修改_id索引

uassert(13596, str::stream()<<"cannot change _id of a document old:"<<objOld<<"new:"<<objNew ,!changedId );

dupCheck(changes,
d, dl);//重复key检查,如果重复则通过断言终止当前程序



//如果要更新的记录比最终要插入的记录尺寸小

if( toupdate->netLength()<objNew.objsize() ) {

//如不合适,则重新分配

uassert(10003,"failing update: objects in a capped ns cannot grow",!(d&&d->capped));

d
->paddingTooSmall();

if( cc().database()->profile )

ss
<<"moved";



//删除指定的记录(record),删除操作详见我的这篇文章:

//http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html

deleteRecord(ns, toupdate, dl);



//插入新的BSONObj信息,插入操作详见我的这篇文章:

//http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html

returninsert(ns, objNew.objdata(), objNew.objsize(), god);

}



nsdt
->notifyOfWriteOp();

d
->paddingFits();



/
如果有要修改的索引/

{

unsigned keyUpdates
=0;

intz=d->nIndexesBeingBuilt();//获取索引(包括正在构建)数

for(intx=0; x<z; x++) {

IndexDetails
&idx=d->idx(x);

//遍历当前更新记录要修改(移除)的索引键信息

for( unsigned i=0; i<changes[x].removed.size(); i++) {

try{

//移除当前记录在索引b树中相应信息(索引键)

idx.head.btree()->unindex(idx.head, idx,
changes[x].removed[i], dl);

}

catch(AssertionException&) {

ss
<<"exception update unindex";

problem()
<<"caught assertion update unindex"<<idx.indexNamespace()<<endl;

}

}

assert(
!dl.isNull() );

//获取指定名称(key)下的子对象

BSONObj idxKey=idx.info.obj().getObjectField("key");

Ordering ordering
=Ordering::make(idxKey);//生成排序方式

keyUpdates+=changes[x].added.size();



//遍历当前更新记录要修改(插入)的索引键信息

for( unsigned i=0; i<changes[x].added.size(); i++) {

try{

//之前做了dupCheck()操作,所以这里不用担心重复key的问题

//在b树中添加索引键信息,有关该方法的定义参见我的这篇文章

//http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html

idx.head.btree()->bt_insert(

idx.head,

dl,
changes[x].added[i], ordering,/dupsAllowed*/true, idx);

}

catch(AssertionException&e) {

ss
<<"exception update index";

problem()
<<"caught assertion update index"<<idx.indexNamespace()<<""<<e<<endl;

}

}

}

if( keyUpdates&&cc().database()->profile )

ss
<<'n'<<keyUpdates<<"key updates";

}



//update in place

intsz=objNew.objsize();



//将新修改的记录信息复制到旧记录(toupdate)所在位置

memcpy(getDur().writingPtr(toupdate->data, sz), objNew.objdata(), sz);

returndl;

}


上面代码段主要先对B树索引进行修改(这里采用先移除再重建方式),之后直接更新旧记录在内存中的数据,最终完成了记录的更新操作。



最后,用一张时序图回顾一下更新记录时mongodb服务端代码的执行流程:



Mongodb源码分析--更新记录







好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍Mongodb的游标(cursor)设计体系和实现方式。

参考链接:

http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html

http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html

http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html

http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer

原文链接:http://www.cnblogs.com/daizhj/archive/2011/04/08/mongodb_update_recode_source_code.html
作者: daizhj, 代震军

微博: http://t.sina.com.cn/daizhj

Tags: mongodb,c++,source code

原文链接: https://www.cnblogs.com/daizhj/archive/2011/04/11/mongodb_update_recode_source_code.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/23849

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月8日 上午1:43
下一篇 2023年2月8日 上午1:44

相关推荐