Project1 StandaloneKV
在这一节实验中,我们将会基于badger实现一个支持Column Family(也叫CF)的独立的KV数据库。独立意味着只有一个节点,而不是一个分布式系统。
这个数据库支持四项基本操作:Put/Delete/Get/Snap。
- Put:替换数据库中指定CF的某个key的value
- Delete:删除指定CF的key的value
- Get:获取指定CF的某个key的当前value
- Snap:获取指定CF的一系列key的当前value
这一节实验分为两个步骤:
- 实现一个独立的存储引擎
- 实现原始的键值服务处理程序(RawPut/RawDelete/RawGet/RawSanp)
部分概念理解
Column Family
Column Family,也叫 CF,这个概念从 HBase 中来,就是将多个列合并为一个CF进行管理。这样读取一行数据时,你可以按照 CF 加载列,不需要加载所有列(通常同一个CF的列会保存在同一个文件中,所以这样有很高的效率)。此外因为同一列的数据格式相同,你可以针对某种格式采用高效的压缩算法。
在这一节实验中,可以简单的将CF理解为Key的前缀或者说是C++里面的namespace。上层应用告诉我们CF和key,我们只需要简单的连接起来就可以用来当成访问底层badger的key了。
实现一个独立的存储引擎
standalone_storage.go
该节代码主要在/kv/storage/standalone_storage.go
中 ,我们需要实现StandAloneStorage结构体及其配套的方法以满足Storage
接口。
首先我们需要做的是将底层badger再进行一次封装,所以在StandAloneStorage结构中可以只简单的定义一个badger结构。方法NewStandAloneStorage用Config中的信息并且调用/kv/util/engine_util/engines.go
中的工具函数返回一个新建立的badger即可。
然后我们需要实现Write()和Reader()方法。
Write()方法是实际上对数据库进行写,所以我们需要直接对底层badger进行操作,这需要用到/kv/util/engine_util/util.go
中的部分工具函数。整体流程如下即可。
for _, x := range batch {
switch x.Data.(type) {
case storage.Put:
case storage.Delete:
}
}
Reader()方法需要返回一个StorageReader接口,这需要我们自己定义。参考RaftStorage,我们最需要的结构是badger数据库的Txn结构。在具体实现之前,我们需要先了解badger相关的api调用。我们的项目中用的是老版本的badger,需要参考https://github.com/Connor1996/badger中的使用。其实在实验中需要用到的api只有NewTransaction(),和与它相关的下层api。
我们自己定义的StorageReader接口只需要包含Txn一个结构,GetCF和IterCF方法都通过项目提供的工具函数实现即可。Reader()方法简单的返回一个我们自己定义的Reader即可(需要使用NewTransaction()新建一个Txn结构)。
实现原始的键值服务引擎
raw_api.go
该节代码主要在/kv/server/raw_api.go
中,我们需要实现四个方法,分别是RawGet/RawPut/RawDelete/RawScan。
首先是两个写方法,RawPut和RawDelete非常相似,我们要构建一个符合Modify接口的数据并传入第一节实现的Write方法中。这个接口的具体定义在/kv/storage/modify.go
中。比如Delete操作可以这样构建。
storage.Delete{
Key: req.GetKey(),
Cf: req.GetCf(),
}
然后是RawGet方法,我们需要先通过Reader()方法获取Txn结构,然后通过Txn的GetCF方法获取值,再构造Response返回即可。需要注意的是,如果找不到值,就需要将Response中的NotFound置为true。
最后是RawScan方法,我们首先获取Txn结构,再通过IterCF获取Iter,然后再用Iter的Seek,Valid,Next方法得到遍历。从StartKey开始,同时需要注意Limit。
Project2 RaftKV
在本节实验中,我们将实现一个基于raft的高可用的kv服务器,这不仅需要实现 Raft 算法,还需要实际使用它,这会带来更多的挑战,比如用 badger 管理 Raft 的持久化状态,为快照信息添加流控制等。
本节有3个部分需要实现,分别是:
- 实现基本的raft算法
- 在Raft之上建立一个容错的KV服务
- 增加对日志压缩和快照的支持
Part A 实现基本的raft算法
这一部分中,我们需要实现基本的raft算法,需要实现的代码都在/raft
下,具体在/raft/raft.go
,/raft/log.go
和/raft/rawnode.go
这三个文件中。为了完成项目,我们需要优先阅读Raft论文,需要了解raft算法的领导人机制,日志复制机制,选举机制等,只有这样我们才能与项目中具体的实现联系起来。
在这一节我们需要实现三个模块,分别是Raft,RaftLog和RawNode,对应了上面三个文件。
log.go
首先我们需要先理解RaftLog中各个变量的意义。
- commited:当前这个Raft节点已经提交的最后一个日志的index
- applied:当前这个Raft节点已经应用的最后一个日志的index
- stabled:当前这个Raft节点已经送入RaftDB中存储的日志(即稳定的日志)
- entries:当前这个Raft节点除了快照部分所有的日志
- pendingSnapshot:当前节点正在应用的快照
日志结构大概如下所示。
// snapshot/first.....applied....committed....stabled.....last
// --------|------------------------------------------------|
// log entries
在这个文件中,我们需要实现newLog,maybeCompact,unstableEntries等方法。
newLog方法,我们需要注意的点是要从storage中获取hardState,FirstIndex等来初始化RaftLog。
maybeCompact方法,我们需要做的是获取storage中当前的新的FirstIndex,然后参照新的FirstIndex对当前Raft的entries进行截断。截断的意思是将entries中日志的index小于FirstIndex的日志删除掉,只保留之后的日志。
unstableEntries和nextEnts方法,我们只需要按照注释返回满足要求的日志即可,需要注意一些边界条件,避免出现runtime error。
LastIndex方法,我们需要判断当前entries中是否存有日志,如果没有存日志(len(l.entries)==0),就需要从storage中获取LastIndex,否则直接返回entries中最后一条日志的index。同时我自己添加了FirstIndex方法来返回当前RaftLog的第一条日志的index,实现逻辑与LastIndex方法类似。
Term方法,我们需要先遍历所有entries,找到index与目标index相等的日志,然后返回它的term。如果entries中不存在,则返回storage中的对应index的term。
Raft.go
三种节点身份
- 领导者(Leader):领导者负责整个Raft Group的日志推送,日志复制和日志提交。一个稳定的Raft Group(非隔离和正在选举状态)应该同时只有一个领导人,当当前领导人被隔离或者关闭后,其他节点应该能通过选举超时机制新选出一位领导人,每当领导者刚上任,他会立刻向其他结点发送心跳消息,让他们不再发起选举(重置选举超时时间)。
- 候选者(Candidate):跟随者由于一段时间内没有收到领导人的消息,那么它就会出现选举超时而自己发起选举,这时候跟随者就会转变成候选者并像所有其他节点发送选举投票请求消息。只有接受到超过当前Group中节点数量半数的同意票,候选者才能成为领导者。
- 跟随者(Follower):跟随者接收领导者发送来的日志,进行复制并回应,让领导者知道日志是否复制成功。每次接到领导者的日志复制或心跳消息,跟随者会重置自己的选举超时时间。
Raft驱动规则
- Step(m pb.Message),上层RawNode用于传递消息给Raft。这里可以根据Raft节点自己的身份,分类进行处理。
- tick(),同样是上层调用,这是一个逻辑时钟,每次被调用一次,Raft节点会增加自己的r.heartbeatElapsed和r.electionElapse,然后根据节点身份进行相应的处理。
上层的RawNode会定时调用tick(),驱动Raft,同时如果有消息,则会通过step()函数传递给Raft。然后Raft根据消息类型和身份进行处理。如果有需要发送的消息则只需要送入r.msg中,RawNode在产生Ready的时候会取走并发送给别的节点。
消息类型
首先在每条消息最前面都需要判断消息的term和自己的term大小。如果消息的term大,那么节点就需要变成跟随者;如果节点的term大,那么就认为这条消息是过期消息,无视它或返回一条拒绝消息。
MessageType_MsgHup
Local Message,跟随者或者候选者收到,会立刻发起一次新的选举,广播发送RequestVote。
handleHup(m pb.Message):
先判断自己是否有pendingSnapshot,如果有,则不要发起选举。同时如果自己不在集群中,也不要发起选举。其他情况都可以立刻发起选举并广播RequestVote。
MessageType_MsgRequestVote
Candidate用于发送投票请求。
handleRequestVote(m pb.Message):
判断当前节点是否已经投票,如果投票就拒绝。然后判断消息的日志是否比节点自己的日志旧,如果旧也拒绝。最后回复一条同意消息并且将自身的r.Vote置为消息来源者。
MessageType_MsgRequestVoteResponse
投票请求的回复消息。
handleRequestVoteResponse(m pb.Message):
获取消息中的同意拒绝信息,然后判断同意票数或者拒绝票数是否超过集群总数的一半。如果同意票超过一半则成为Leader,并广播Heartbeat;如果拒绝票超过一半则成为Follower。
MessageType_MsgBeat
Local Message,收到后,如果自己是Leader,则广播Heartbeat。
handleBeat(m pb.Message):
遍历r.Prs,向除了自己之外的所有节点发送Heartbeat。
MessageType_MsgHeartbeat
Leader发送的Heartbeat,为了推动其他节点的commit。
handleHeartbeat(m pb.Message):
重置自己的r.electionElapsed,然后在消息中附上自己的最后一条提交的日志的term和index,给领导者发送一条HeartbeatResponse。
MessageType_MsgHeartbeatResponse
Follower回复给Leader的消息。
handleHeartbeatResponse(m pb.Message):
领导者将自己的最后一条提交的index和term与消息中的index和term进行比较。如果领导者的提交日志较新,则发送Append。
MessageType_MsgAppend
Leader发送给Follower的消息,目的是推动日志复制。
handleAppendEntries(m pb.Message):
节点首先对比自己的日志index和消息中的index,如果自己的最后一条日志的index较小,则回复一条AppendResponse,并附上自己最后一条日志的index。然后还要判断term是否一致,如果不一致也会返回。最后就是将r.Entries中的index与消息中index相等的日志之后的日志全部删除,然后改成消息中附带的entries。
MessageType_MsgAppendResponse
Follower回复给Leader的消息,分为两种情况,回复Append或Snapshot。
handleAppendEntriesResponse(m pb.Message):
如果回复拒绝,那么领导人会将next递减,然后重新尝试发送Append。如果是同意,那么就会更新对应节点的next和match,如果r.leadTransferee不为0,还需要判断是否可以进行领导人转移。最后是判断是否可以推进commited,如果可以推进commited,还需要立刻发送Append来推进其他节点的commited。
MessageType_MsgSnapshot
当Leader发送Append时,发现对应节点的next在自己storage的firstindex之前,那么他就会申请Snapshot并发送。
handleSnapshot(m pb.Message):
如果Snapshot中的index小于节点自己的commited,说明这条Snapshot消息过期,直接无视。然后节点会根据Snapshot中的信息更新自己的committed,applied,Prs等,并且截断自己entries。
MessageType_MsgTransferLeader
Local message,上层应用用来发起Leader Transfer。
handleTransferLeader(m pb.Message):
首先设置自己的r.leadTransferee,并检查目标节点是否已经包含了自己所有的日志,如果不包含,则启动Append流程进行同步,并且领导人转移期间不会再接受所有的propose请求。同步完成后,会发送TimeoutNow给目标节点。
MessageType_MsgTimeoutNow
handleTimeoutNow:
节点收到会直接通过Step给自己发一条Hup消息,直接发起一次新的选举。
MessageType_MsgPropose
Local Message,上层应用用来推送日志。
handlePropose(m pb.Message):
首先将推送的日志加入自己的entries,然后广播发送Append。需要注意的是,如果正在转移领导人,那么需要返回一个错误,让上层应用知道。
RawNode.go
在这个文件中我们需要实现一个raft算法和上层应用程序的中间的一个接口,叫做RawNode。RawNode封装一些函数供上层调用,如RawNode.Tick()实现逻辑时钟增长,RawNode.Propose()让上层应用来提供新的日志供raft算法来进行复制。
- Ready:保存一些重要的信息,包括hardstate、softstate等,这个数据结构是上层应用用来更新底层badger的。
上层应用首先通过调用RawNode.Ready()
来获得信息,然后根据该函数返回的Ready结构信息进行处理,处理完之后再调用RawNode.Advance()
更新raft节点中的数据。所以我们主要需要完成的就是RawNode.Ready()
和RawNode.Advance()
函数,除此之外还有一个判断是否有新的Ready结构的函数HasReady()
。
RawNode.Ready()
我们需要关注Ready的结构,发现前两个变量需要通过判断是否发生变化来进行返回,所以需要在RawNode结点中保存这两个状态,然后通过注释理解其他变量的意思就可以实现Ready()函数了。
RawNode.Advance()
其实就是默认上层应用已经把Ready返回的信息都处理完毕了(该保存进db的都保存了,啥都干了),然后RawNode结点需要做的就是更新自己管理的Raft结点的一些信息和状态,比如stabled,applied等。
RawNode.HasReady()
判断Ready中的变量是否有变化或为空,只要有一个变量有变化或者非空,就返回True;否则返回False。
Part B 在Raft之上建立一个容错的KV服务
在这一节实验中,我们将使用Part A中实现的Raft模块建立一个容错的KV存储服务。服务将是一个复制状态机,由几个使用Raft进行复制的KV服务器组成。只要大多数的服务器是活的并且可以通信,那么KV服务就应该继续处理客户的请求,尽管有其他的故障或网络分区发生。
我们首先要了解三个术语:Store、Peer和Region。
一个cluster包含多个store。region是peer的集合,在一个store上,同一个region的peer最多只有一个,但是可能拥有不同region的peer,且这些region的key space不重叠。一个region应该是横跨多个store的,由多个peer组成。
RaftStorage发送Command信息流程
RaftStorage就像1中实现的standAloneStorage接受客户的write,read要求,把要求封装成RaftCmdRequest,然后通过SendRaftCommand函数发送到router的peerSender隧道,并且在途中会把RaftCmdRequest进一步封装成peerMsg(该步骤在/kv/storage/raft_storage/raft_server.go
的Writer()和Reader()中)。这个隧道连接的是RaftStore中的router的peerSender(具体实现在上述路径的Start()
函数中)。
然后/kv/raftstore/raftstore.go
的startWorkers函数中将peerSender传递到了/kv/raftstore/raft_worker.go
的run函数中的rw.raftCh,并且启动raftWorker。raftWorker会先将raftCh中的信息都读出来,然后获取对应region的peer的句柄将msg传给peer_msg_handler.go中的HandleMsg函数进行处理。
在HandleMsg的MsgTypeRaftCmd分支中,会先将msg去一层包装重新变成RaftCmdRequest和Callback,然后传给proposeRaftCommand函数进行处理。
proposeRaftComman函数会进行一系列的检查,请求中的一些信息与底层的信息是否保持一致,如StoreID,RegionID,LeaderID,PeerID和RegionEpoch,若不一致则返回各种err。然后判断请求类型,由于未做优化,那不管什么都是需要propose一条Raft log,这个操作可以直接用RawNode的propose接口完成(还需要使用Marshal函数将Msg变成Data类型)。在这之后,还需要用proposals数组把propose的请求都保存起来。
所有的Msg都处理完毕之后,将会调用HandleRaftReady函数处理一下Raft的Ready结构。这块代码与任务书2中的伪代码的后半部分基本一致,流程是:
1.获取Ready数组
2.调用SaveReadyState函数保存服务器的Entry和RaftLocalState(相当于Raft论文中的服务器保存的持久性状态)
3.发送消息给其他peer
3.处理Raft所有提交的Entries(这对应着之前propose的一些操作),这个操作可能会改变kvDB里面存储的键值对,还要更新kvDB中的RaftApplyState
4.所有Entries处理完毕后,将会调用Advance函数对Raft节点进行更新。
步骤1:直接调用RawNode的Ready函数(在2ac中完成)即可
步骤2:调用SaveReadyState函数,这个函数的作用是追加日志和保存HardState(可以直接保存RaftLocalState,因为其中包含了HardState)。完成这个函数需要学会WriteBatch的使用,主要是SetMeta,DeleteMeta和WriteToDB函数(都有对应的注释),还有生成对应key的api
步骤3:直接调用send函数即可
步骤4:这里需要先用Unmarshal函数将Entry中的data重新变成RaftCmdRequest,然后在判断请求类型进行各种应用的操作,然后生成callBack,这里需要判断是否有错误,比如ErrStaleCommand(index不对应或者term不对应都是这个错误),最后更新一下RaftApplyState即可。
步骤5:直接调用RawNode的Advance函数(在2ac中完成)即可
最后RaftStorage通过传回来的callBack进行checkResponse。
Part C 增加对日志压缩和快照的支持
日志压缩和快照应用的大体流程理解
日志压缩
- 当日志的应用数量达到一定限制,
onRaftGcLogTick
函数发送一个Raft admin请求(CompactLogRequest)到Raft中;
- 经过Raft结点之间的日志复制,该admin请求被提交上来;
- 然后
HandleRaftReady
进行处理,进行raftDB上面的实质的日志信息删除,此时即代表日志压缩完成了。
快照应用
- 若Raft中有一部分结点之前被隔离或者新启动的一个结点,导致其当前存储的日志太老了,老到已经被压缩删除了,那么Leader就会产生一个快照信息(同时会产生一个临时文件,这个临时文件包括当前kvDB的所有键值数据信息)发送到这部分结点。
- 然后在下一次
SaveReadyState
就可以应用这个快照信息(更新各个状态并且直接按照之前的临时文件把键值信息应用到kvDB中),这样就可以做到让落后太多的结点快速更新到较新的状态,这样就完成了快照应用。
在Raft中修改
raft.go
当一个Leader需要向一个Follower发送Append消息同步日志时,发现Follower需要的日志自己已经truncated,不在RaftLog
中了,那么此时就需要向Follower发送Snapshot同步日志。所以我们需要修改sendAppend
函数,在取对应Follower的prevLogTerm时,如果返回的Err不为nil,则需要发送快照消息,并产生新的快照或者使用已有的快照发给对应Follower。
然后就需要一个handleSnapshot
函数处理Leader发送过来的快照消息,该函数需要做的是根据快照的Metadata
更新自己的committed,applied,stabled和Prs
,然后还要截断自己的Entries
,为了过测试还需要在截断后如果日志长度为0需要添加一个空的但是Index和Term为快照信息的Entry。
最重要的一点:在handleAppendEntries
函数中,调用了获取Term的函数后,需要判断Err是否为空,若非空则直接返回拒绝添加日志的消息。如果不做这个判错处理,那么在2C的测试中将会出现x unexpected raft log index: lastIndex y < appliedIndex z
的错误。
log.go
log.go中需要补充maybeCompact
函数,这个函数的功能是根据PeerStorage
的FirstIndex信息来进行截断当前的日志,这里只要处理好边界条件即可,否则可能会出现Runtime Error: out of range [0]
等类似的错误。
rawnode.go
Ready
函数中,需要添加对快照信息的判断,如果当前的pendingSnapshot
非空,则把其赋给ready结构,然后把原来的快照信息赋为空。
HasReady
函数多加一个快照信息非空返回true的判断即可。
Advance
函数则需要每次都调用log.go中的maybeCompace函数以及时响应可能apply的admin请求(日志压缩的请求)。
在raftStore中修改
peer_msg_handler.go
proposeRaftCommand
函数中需要判断msg的请求是否是admin请求,判断后直接简单的Propose就可以了,保存proposals数组都不需要。
HandleRaftReady
函数需要增加对admin请求的处理(暂时只有AdminCmdType_CompactLog
这一种类型),对这种类型,我们需要首先更新applyState
,然后调用ScheduleCompactLog
函数给raftlog-gc worker
安排任务,让它以异步的方式进行实际的日志删除工作。
peer_storage.go
ApplySnapshot
函数需要先调用clearMeta
和ClearExtraData
清空旧的信息,然后更新raftState,applyState,snapState和RegionState,然后通过regionSched来让region工人干活。
SaveReadyState
函数在快照非空的时候调用ApplySnapshot就可以了。
Project3 MultiRaftKV
在这一节实验中,我们将实现一个带有平衡调度器的基于Multi Raft的KV服务器,它由多个Raft Group组成,每个Raft Group负责一个单独的key范围,称为Region。对于单个Region的请求的处理和以前一样,但多个Region可以同时处理请求,这就提高了性能。
这一节实验分为三个部分,包括:
- 对Raft算法实现成员变更和领导人变更
- 在raftstore上实现TransferLeader、Conf Change和Region Split
- 引入Scheduler
Part A 对Raft算法实现成员变更和领导人变更
实现领导者转移
raft.go
首先需要增加对两种新消息的处理函数,MsgTransferLeader
和MsgTimeoutNow
。
在函数handleTransferLeader
中,需要判断被转移者是否在该raft集群中,然后判断被转移者是否具有资格(日志最新),如果具有则直接发送一条MsgTimeoutNow
消息给被转移者,否则发送日志。
在函数handleTimeoutNow
中,可以直接把一条MsgHup消息送入Step。
除此之外,
在handleAppendEntriesResponse
函数中,需要添加判断被转移者是否具有被转移资格,如有有则直接发送一条MsgTimeoutNow
消息,没有继续发送日志。
在handlePropose
函数中,需要加入对r.leadTransferee
的判断,如果不为0,则不能进行propose。并且每次新成为Leader,都要这个变量置为0。
在测试中,有一个TransferLeader不是发给leader的,而是发给follower的,所以需要在Step那里添加Follower的TransferLeader消息处理,直接转发给该结点此时的leader就可以了。
实现成员变更
raft.go
需要完成addNode
和removeNode
函数。
在函数addNode
中,只需要判断该id是否存在,如果不存在就加进r.Prs就可以了。
在函数removeNode
中,需要判断id在不在r.Prs中,如果在就删除掉。然后就是删除结点之后需要判断是否有新的可以提交的entry。
在函数handlePropose
中,需要判断propose的entry是不是配置修改消息,如果是,就需要判断当前的applied是不是大于等于之前的PendingConfIndex,如果大于等于才可以propose这个配置修改消息,否则直接拒绝。
在测试中,有一个测试函数测试的是r.Prs中不包含自己,这个时候应该无事发生,所以我在Step函数的最前面判断了一下自己在不在r.Prs里面,如果不在则直接返回。(很不幸,这样做会在3b产生bug,因为3b里面ConfChange新生成的Peer初始的Prs是空的,需要让领导人先发一个快照应用一下才能更上其他结点,如果像上面那样做那这个新的结点就不会回复消息,也不会处理消息)
Part B 在raftstore上实现TransferLeader、Conf Change和Region Split
不管是AdminRequest还是普通Request,在RaftStore上面的处理流程都是类似的:先推送进Raft层进行日志复制,通过Ready结构获取已经提交上来的日志,获取提交日志的内容进行应用,更新Raft层的信息。所以在了解了RaftStorage发送Command信息流程之后,就会较为轻松的理解现在我们需要做的事情。
TransferLeader
TransferLeader步骤较为简单,因为他只有Propose阶段。在RaftStore接收到TransferLeader命令后,只需要调用PeerStorage中的TransferLeader()
就可以完成领导人转移,然后可以立刻回复一条Response。
Conf Change
Propose阶段
- 进行一次判断,如果当前Raft层只有两个节点并且当前命令是删除领导人节点,那么会先进行领导人转移命令,然后返回一个错误。
- 如果通过了判断,就调用PeerStorage中的
ProposeConfChange
正常向Raft层推送日志
Apply阶段
- 检查RegionEpoch
- 对于删除节点,如果是自己那么直接调用
destroyPeer()
,否则就更改ConfVer、Peers,还要调用WriteRegionState
进行实质性的写入;对于增加节点,同样是修改ConfVer、Peers和调用WrieRegionState
。
- 调用
removePeerCache
或insertPeerCache
- 调用PeerStorage中的
ApplyConfChange
进行Raft层的修改
- 回复Response
Region Split
Propose阶段
- 检查CheckKeyInRegion
- 正常Propose
Apply阶段
- 检查RegionEpoch
- 检查CheckKeyInRegion
- 检查命令中的Peers长度和现在的Peers长度,不一致就返回
- 根据命令中的PeerIDs构造一组新的Peers
- 构造一个NewRegion,其中Startkey为SplitKey,EndKey为原来的EndKey
- 修改原来的Region中的EndKey、Version
- 在storeMeta中添加或修改Region信息并调用
WriteRegionState
createPeer
、register
最后send
激活Peer
- 回复Response
注意事项
在本节中,因为添加了RegionSplit,所以在原来的普通请求前,也需要检查RegionEpoch和KeyInRegion。
Part C 引入Scheduler
Scheduler拥有关于整个集群的信息。它知道每个Region在哪里,知道它们有多少个Key。
为了获得相关信息,Scheduler要求每个Region定期向Scheduler发送一个心跳请求。在收到心跳后,Scheduler将会更新本地Region信息。
同时Scheduler会定期检查Region信息,以发现我们的TinyKV集群中是否存在不平衡现象。例如,如果任何Store包含了太多的Region,Region应该从它那里转移到其他Store。
cluster.go
具体需要补充的函数为processRegionHeartbeat(region *core.RegionInfo) error
。
- 获取原来的region信息
- 如果有原来的region信息(localregion),那么只需要对比ConfV和Version,如果新的region信息比较旧那么就返回。如果没有,那么就需要扫描所有与新的region相关的localRegion,只有newregion的ConfV和Version大于等于所有的localRegion才能到下一步
- 先调用
putRegion
更新Region然后调用updateStoreStateusLocked
更新信息即可。
balance_region.go
具体需要补充的函数为Schedule(cluster opt.Cluster) *operator.Operator
。
- 调度器将选择所有合适的Store(一个合适的store应该是运行的,Down时间不能比cluster集群的MaxStoreDownTime时间长);然后根据它们的区域大小对它们进行排序
- 调度器尝试从具有最大区域大小的store中找到要移动的region
- 调度器会尝试去找到最适合移动的region,GetPendingRegionsWithLock, GetFollowersWithLock 和 GetLeadersWithLock,从左到右适合程度下降。
- 根据之前的store排序倒序选择含有region大小最小的store当成目的store
- 当选好适合的目的store后,调度器将会判断两个store的大小差距是否足够大,如果足够大那么就会进行region转移,以实现store平衡。
Project 4: Transactions
transaction.go
我们在这里只需要实现底层的MvccTxn
,只是一些基本的操作,比如写入Write、Lock等。
PutWrite、PutLock、DeleteLock、PutValue、DeleteValue
这几个函数都很像相似,只需要定义对应的结构,然后append进Txn.Writes
中即可。
GetLock
这个需要调用Txn.Reader.GetCF
,获取对应Lock的data,然后用ParseLock函数解码成为Lock结构即可。
GetValue
先通过iter.Seek()
查找遍历,然后判断找到的Kind是Write的key是不是自己要找的Key,如果是就从Default中获取值。
CurrentWrite、MostRecentWrite
这两个函数类似,前者是查询当前事务下,传入key的最新的Write,后者不需要管是不是当前事务。就是通过遍历查询key的最新的Write,然后返回即可。
server.go
这一节主要用到在transaction.go中实现的基本操作,来实现上层的附带锁的操作。只要弄清楚每种操作在哪种情况下需要干什么就行了。
- KvGet 在提供的时间戳处从数据库中读取一个值。如果在 KvGet 请求的时候,要读取的 key 被另一个事务锁定,那么 TinyKV 应该返回一个错误。否则,TinyKV 必须搜索该 key 的版本以找到最新的、有效的值。
- KvPrewrite 是一个值被实际写入数据库的地方。一个 key 被锁定,一个 key 被存储。我们必须检查另一个事务没有锁定或写入同一个 key 。
- KvCommit 并不改变数据库中的值,但它确实记录了该值被提交。如果 key 没有被锁定或被其他事务锁定,KvCommit 将失败。
- KvScan 相当于 RawScan 的事务性工作,它从数据库中读取许多值。
- KvCheckTxnStatus 检查超时,删除过期的锁并返回锁的状态。
- KvBatchRollback 检查一个 key 是否被当前事务锁定,如果是,则删除该锁,删除任何值,并留下一个回滚指示器作为写入。
- KvResolveLock 检查一批锁定的 key ,并将它们全部回滚或全部提交。