杂感
大三暑假完成项目,前期学习了Go语言,阅读了论文,看了点6.824的网课,真正的编辑器时间大概有一个半月。
首先想谈一下项目的Debug,这个项目涉及到很多并行和同步相关的内容,而且体量较大,很难控制台调试,更别说单步调试了(少数单元测试还是能单步调试定位错误的),因此要求打日志和看日志。项目提供了一个log包,实现了Debug、Warn、Error、Panic、Fatal的多级日志的多色显示,可以善用。此外,在Project 2B之后,由于有大量随机测试,产出的日志文件可能很大,当然可以用Ctrl+F
查找,但是效率太低,耗内存高,因此善用grep
等工具。再者,项目编写了完善的Makefile
,但是都是以一个阶段为单位运行,单次运行时间很长也不符合我们一个个功能实现的过程,因此要尝试自己编写运行脚本进行函数单测。最后,由于随机性的存在,一次成功未必次次成功,要进行重复测试。
个人体感难度1<4<3c<2<3a/3b
,其中1、4不依赖其他部分,当遇到困难时要学会挑软柿子捏,保持心态,总能会做的,2b, 2c, 3b, 3c
涉及随机测试,要进行重复测试(3c可以少测几次),其余部分一次成功。
Project 2中实现基本的Raft算法,后续都建立在这个基础之上,如果有问题会影响后面的实现,所以建议完成后多测试几遍,保证正确性后再向后推进,减少返工。
以下应项目要求,省略了具体代码实现简略地提供了部分背景知识补充和部分实现思路,如有问题请帮我指出。
实现经验
Project1 Standalone KV
概述
任务实现
实现一个单节点引擎
本节中,需要实现的内容位于kv/storage/standalone_storage/standalone_storage.go
,实现结构体StandAloneStorage
。
StandAloneStorage
是Storage
接口的实现,实现过程中主要参考badger
的API,使用其事务接口(badger.Txn
),完成StandAloneStorage
的初始化和Storage
的四个函数
实现KV服务处理函数
本节中,需要实现的内容位于kv/server/raw_api.go
,使用上一节中实现的存储引擎实现四种服务处理函数RawGet
,RawPut
,RawDelete
和RawScan
。这里实现的都是远程调用函数,收到封装好的RawXXXRequest
,构造并返回对应的RawXXXResponse
Project2 Raft KV
概述
任务实现
基本Raft算法
以下的顺序基本等同于具体实现顺序,可以参考,具体代码实现已经省略
RaftLog
的实现:
为了实现多节点之间的同步,raft将客户机请求的操作先转化为一条条日志并在节点间同步,当大多数结点已经复制了此条日志后才能应用到状态机。
注释中有一段解释了内部的结构
// snapshot/first.....applied....committed....stabled.....last
// --------|------------------------------------------------|
// log entries
其理解如下:
在领导人将创建的日志条目复制到大多数的服务器上的时候,日志条目就会被提交(committed)。
一旦跟随者知道一条日志条目已经被提交,那么他也会将这个日志条目应用(applied)到本地的状态机中。
Raft
结构体的实现
实现Raft结构体的初始化和各种方法,必要时结构体内需要添加内容
tick()
和Step(m pb.Message)
:
个人认为这两个函数是实现整个Raft
的基础
tick()
用于驱动内部逻辑时钟并且处理超时逻辑。
Step(m pb.Message)
用于处理不同类型的信息,Raft
接受到消息后,根据当前不同状态和不同的消息类型进行不同的处理。
领导人选举和状态变更
日志复制
同样给出一次典型的日志同步流程

实现RawNode
接口
本节中主要理解了程序结构,代码实现难度不高,主要实现raft/rawnode.go
中的Ready
结构体和三个函数:Ready()
, HasReady()
,Advance()
RawNode
是Raft
模块与存储层之间的桥梁,当需要使用Raft
模块进行节点间同步时,会调用RawNode
封装好的方法;当Raft
模块有消息需要被发送或者需要应用日志操作时,也会通过RawNode
层,几个实体之间的关系如下图

在Raft
基础上搭建容错KV服务器
到此为止,已经实现了基本的Raft
算法,但这并不满足我们的系统级要求,这里我们基于前述的Raft
模块完成一个可容错KV服务器,本部分进行了大量的阅读和学习,进行了一些归纳整理,希望能有所帮助。
理论基础
存储基础
首先借用官方文档里一张图片:

给出以下概念
Store(图中)/Raftstore(代码中)
:一个TiKV
结点只有一个Store
,是底层的KV存储基础
Region
:一个Region
即一个Raft Group
,内含多个Peer
,同一个Region
的不同Peer
分配在不同的Store
上,对同一个范围(StartKey-EndKey
)的数据进行处理(内部多Peer
通过Raft
协议同步),实现备份容灾
Peer
:Peer
是一个Raft Group
的一个结点,一个Store
上面有多个Peer
,每个Peer
属于不同的Region
,同一个Store
上的所有Peer
同时在一个存储介质上读写不同范围的数据
在本部分中,在每个 Store
上只有一个 Peer
,集群中只有一个 Region
。
项目结构
首先在raft_server.go
中给出了Storage
接口(也就是Project1中见过的,具有Start
,Stop
,Write
和Reader
的存储引擎)的另一实现——RaftStorage
,用于上层服务器调用,此外,在RafterStorage
中创建了Raftstore
(上一小节所述)用于驱动Raft
模块,当Raft
提交操作后才应用到状态机。
type RaftStorage struct {
engines *engine_util.Engines
config *config.Config
node *raftstore.Node
snapManager *snap.SnapManager
raftRouter *raftstore.RaftstoreRouter
raftSystem *raftstore.Raftstore //前述RaftStore
resolveWorker *worker.Worker
snapWorker *worker.Worker
wg sync.WaitGroup
}
type Raftstore struct {
ctx *GlobalContext
storeState *storeState
router *router
workers *workers //提供了四个用于异步处理的worker,具体见下方
tickDriver *tickDriver
closeCh chan struct{}
wg *sync.WaitGroup
}
type workers struct {
raftLogGCWorker *worker.Worker //处理日志压缩
schedulerWorker *worker.Worker //用于实际split命令的发送,也用于接受和处理心跳负荷调度
splitCheckWorker *worker.Worker //处理region split-check
regionWorker *worker.Worker //处理快照
wg *sync.WaitGroup
}
//此外,在Start RaftStore的时候,还会启动一个raftWorker和一个storeWorker
//其中raftWorker用于处理Raft命令的提交和应用
type raftWorker struct {
pr *router
raftCh chan message.Msg
ctx *GlobalContext
closeCh <-chan struct{}
}
Raftstore
维护了节点上的region
和peer
,首先使用storeMeta
维护元数据,内含一个锁防止多个region
同时修改元数据,一个Btree
索引定位key
位于哪一个region
,一个哈希表定位region
结构体,
而Peer
中又记录了所属的RaftGroup
(类型为*RawNode
),对Raft
模块生成的ready
结构体的处理和对消息的处理都在每一个peer
上处理
函数调用流程
首先在main.go
中启动了一个raftstorage
,调用了raftstorage
的start
,在其中又创建了RaftStore
Raftstore
在创建时,
先加载peers
:扫描db engine
,从中加载region对应的peer
然后将peers
注册进router
,router
用于接受其他节点的message
并路由到指定region
在本raftstore
的peer
并且向其他raftstore
的peer
发送信息
随后启动start workers
,启动一个RaftWorker
和一个StoreWorker
并发送对应start
信息开始运行,另外Raftstore
结构体中还启动了在“项目结构”一节中提及的四个异步worker
用于处理其他任务,此外还会另开一个线程用于推进逻辑时钟并发送相关信息。
这里重点关注RaftWorker
:
关键在于其run()
函数
func (rw *raftWorker) run(closeCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
var msgs []message.Msg
for {
msgs = msgs[:0]
select {
case <-closeCh:
return
case msg := <-rw.raftCh:
msgs = append(msgs, msg)
}
pending := len(rw.raftCh)
for i := 0; i < pending; i++ {
msgs = append(msgs, <-rw.raftCh)
}
peerStateMap := make(map[uint64]*peerState)
for _, msg := range msgs {
peerState := rw.getPeerState(peerStateMap, msg.RegionID)
if peerState == nil {
continue
}
newPeerMsgHandler(peerState.peer, rw.ctx).HandleMsg(msg)
}
for _, peerState := range peerStateMap {
newPeerMsgHandler(peerState.peer, rw.ctx).HandleRaftReady()
}
}
}
RaftWorker
从通道 raftCh
拉取信息,只要外部不发消息终止,则持续监听。
当获取到消息后,先通过 HandleMsg()
,Raftstore
将收到的信息告诉该peer
,然后该peer
进行一系列的处理,Raftstore
再通过 HandleRaftReady()
处理 RawNode
产生的Ready
。
一条客户机指令的始末:
在前面知识的基础上,可以梳理出一条指令从发出到完成的整个过程
首先,一条来自客户机的应用层指令被封装成MsgRaftCmd
由RaftStore
路由到对应的peer
,
peer
将指令封装成一条entry
,通过raft
层实现其在raft group
的同步,进行日志的"propose",除了在raft
模块中的同步,peer
还会在proposals
数组中添加一个新的propose
,表示此条指令已经被提交,等待应用,这种等待通过callback
实现。
随后当大多数节点都持久化后进行提交和应用,这会产生一个ready
,会在raftstore
的handlRaftReady
中进行应用,向DB中进行读写,完成后取出对应指令的propose
并进行callback
来回复已经处理完此条指令
任务实现
这一部分代码量不大,主要是阅读代码进行理解,这里主要给要完成的地方指个路。
peer_storage.go
:主要需要实现Append
和saveReadyState
。
peer_msg_handler
:主要需要实现proposeRaftCommand
和HandleRaftReady
添加对日志压缩和快照的支持
本节处理有关快照的部分,服务器定期检查日志数量,当日志数量超过阈值,会将部分日志丢弃转为使用快照记录当时状态,分为两个部分,一个是实现日志压缩的过程,另一个是触发和发送快照的过程,首先简要说明整个过程帮助理解项目
- 日志压缩:
- 在
raft_worker.go:run()
的HandleMsg()
中收到message.MsgTypeTick
消息,然后onTick()
函数触发d.onRaftGCLogTick()
- 检查
applied
的日志条数,如果超过了阈值(genericTest里的maxraftlog
),创建一个CompactLogRequest
请求(这里的消息不是增删改查类操作的NormalRequest
消息,而是在AdminRequest
给出相关信息),通过proposeRaftCommand()
将消息编码并交给Raft Group
- 在
rawnode.go
中的Propose
处理信息,提交到Raft
中,在handlePropose
内提交这条日志
- 在
raft_worker.go
中的HandleRaftReady
中将raft层中的更改通过 Ready 来通知上层。因此,这里需要apply包含AdminCmdType_CompactLog
的日志项并修改RaftApplyState
中的RaftTruncatedState
的index
和term
(与消息中压缩位置匹配)
- 调用
d.ScheduleCompactLog
发送RaftLogGCTask
任务(raftstore
创建时建的4个worker
之一,会异步执行日志的压缩删除工作)
raftlog_gc.go
里面收到RaftLogGCTask
并在 gcRaftLog()
函数中处理,清除raftDB
内的已经应用的日志。
- 最后在
rawnode.go
中的Advance
调用maybeCompact
更新apply状态
- 快照的触发和发送
raft.go
中的 sendAppend
函数中,如果发现结点落后太多以至于所需的日志已经被compact,则会调用sendSnapshot
使用快照同步
- 在
sendSnapshot
中会检查有无发送中的快照,如果没有,则调用 raftLog.storage.Snapshot()
函数以生成快照(这里由于快照很大,这里可以失败5次,如果还没生成好,会返回raft.ErrSnapshotTemporarilyUnavailable
的err,请求Snapshot
的Peer
应该暂时放弃请求,等待下一次请求),生成的过程由新建的 RegionTaskGen
创建任务,交由regionWorker
处理。
region_task.go
中的 handleGen
函数负责产生快照,调用 doSnapshot
产生快照。
- 当下次调用
Snapshot()
的时候,如果 当请求成功时,发送一个`pb.MessageType_MsgSnapshot``
- ``raft.go
中此消息被存储在了
msgs 中。等待通过
Ready `处理
HandleRaftReady
中通过调用 send
通过 transport
将消息封装成 RaftMessage
发送到对应的store
。
- 目标
RaftStore
在 server.go
中的 Snapshot()
接收发送过来的 Snapshot
。之后生成一个recvSnapTask
请求到 snapWorker
中。
- Raft 中则根据
Snapshot
中的metadata
更新当前自己的各种状态并设置 pendingSnapshot
,然后返回一个 MessageType_MsgAppendResponse
给 Leader。
pendingSnapshot
非空,此时产生ready
会由handleRaftReady
处理并调用ApplySnapshot
中根据pendingSnapshot
的MetaData
更新RaftTruncatedState
和RaftLocalState
,并由region worker
应用到kvDB
- 此时已经应用结束,在
advance
中清除pendingSnapshot
需要修改的部分主要是在raft.go
中添加对快照的支持,并完善peer_storage.go
和peer_msg_handler.go
Project3 Multi-Raft KV
概述
任务目标概述:
之前在Project 2B中,已经初步理解了TinyKV服务器的结构,不过当时在 Store
上只有一个 Peer
,集群中只有一个 Region
,这样的KV服务器是由单一的 raft 组支持的,不能无限扩展,并且每一个写请求都要等到提交后再逐一写入 badger,这是保证一致性的一个关键要求,但也扼杀了任何并发性。
Project 3中实现了multiraft
,一个store
上按照key的范围分为若干个region
,每个region
即一个raft group
,在不同的store
上有多个peer
。
这要求我们
- 使用
conf更改
(或membership change
)实现每个region
的多个peer
的动态增加和删除
- 使用
split
实现将同一个storage
根据键空间分为不同region
细化控制粒度并方便后续合理分配region
调整负载
- 实现
lead transfer
用于转移领导权。
- 最后引入
scheduler
调度器来合理分配负载
任务实现
TransferLeader
、confchange
和Split
主要实现TransferLeader
、confchange
和Split
,主要补充raft.go
相关内容和完成peer_mdg_handler.go
。处理peer_mdg_handler.go
基本逻辑和Project 2B中的处理逻辑类似,只不过这里是adminrequest
,粗略分为propose
阶段,处理阶段和apply
阶段。首先在handlemsg
中接受到上层提出的proposal
,然后用proposeRaftCommand
将消息给到raft层同步(并且传输对应的callback完成后回调);当raft
层处理完成会以ready
的方式传回,再由handleRaftReady
应用到db
实现调度器scheduler
实现上层调度器Scheduler
,决定region
中各peer
的位置,其定期向region
发送心跳RegionHeartbeatRequest
,收到回复后更新本地region
的记录,同时检查region
信息,如果store
中含有太多peer
则将一部分转移到其他store
主要实现scheduler/server/cluster.go
中的processRegionHeartbeat
和scheduler/server/schedulers/balance_region.go
中的Scheduler
。这一部分任务书的说明非常详细,而且对3a,3b没有依赖,可以提前做。
Project4 Transaction
这一部分难度也不算高,但是刚开始会感到困惑,这里记录一些用得上的背景知识
任务目标概述:
本节中实现TinySQL和TinyKV之间的事务系统。
任务可细分为以下部分:
mvcc
模块
- 根据实现的
MvccTxn
实现事务的服务处理函数
背景知识:
TinyKV中采用了列族Lock
,Write
,Default
,分别是锁,写入记录和实际的数据,其组织形式如下表,其中key有序(key升序,时间戳降序)
| CF | DB Key | DB Value |
| ------- | ----------------------- | ------------ |
| Lock | user_key
| lock_info
|
| Default | {user_key}{start_ts}
| user_value
|
| Write | {user_key}{commit_ts}
| write_info
|
其中DB key
通过EncodeKey
将原始key与timestamp编码
TinyKV的设计遵循Percolator,Percolator是在Bigtable上实现的,Bigtable可以理解为一个稀疏的,多维的持久化的键值对Map,其映射关系形如(row,colomn,timestamp)->value
,Percolator通过一个全局的授时服务器TSO给予事务一个全局时间戳来解决时序问题,通过两阶段提交(Prewrite,Commit)来解决分布式事务原子提交问题。
Prewrite
在事务开启时会从 TSO 获取一个 timestamp
作为 start_ts
。
在所有行的写操作中随机选择一个 作为primary
,其它作为secondary
;primary
作为事务提交的 sync point
来保证故障恢复的安全性(回复故障时应该Commit还是Rollback),可能发生以下情况:
2.1 Primary key
的 Lock
还在,代表之前的事务没有commit
,就选择回滚
2.2Primary key
上面的 Lock
已经不存在,且有了 Write
,那么代表 primary key
已经被commit
了,这里我们选择继续推进 commit
2.3 Primary key
既没有Lock
也没有Write
,那么说明之前连 Prewrite
阶段都还没开始,客户端重试即可。
先Prewrite primary
,成功后再 Prewrite secondaries
; 对于每一个 Write
:
3.1. Write-write conflict
检查: 以 Write.row
作为 key,检查 Write.col
对应的write
列在[start_ts, max)
之间是否存在相同key
的数据 。如果存在,则说明存在 write-write conflict
,直接Abort
整个事务。
3.2. 检查 lock
列中该Write
是否被上锁,如果锁存在,Percolator
不会等待锁被删除,而是选择直接Abort
整个事务。这种简单粗暴的冲突处理方案避免了死锁发生的可能。
3.3. 步骤 3.1,3.2 成功后,以 start_ts
作为Bigtable
的 timestamp
,将数据写入data
列。由于此时write
列尚未写入,因此数据对其它事务不可见。
3.4. 对 Write
上锁:以 start_ts
作为timestamp
,以 {primary.row, primary.col}
作为value
,写入lock
列 。{Priamry.row, Primary.col}
用于failover
时定位到primary Write
。
Commit阶段
获取一个 timestamp
作为commit_ts
。
先 Commit primary
, 如果失败则 Abort 事务:
2.1. 检测lock
列 primary
对应的锁是否存在,如果锁已经被其它事务清理,则失败。
2.2. 步骤 2.1 成功后以 commit_ts
作为timestamp
,以start_ts
作为 value
写 write
列。读操作会先读write
列获取 start_ts
,然后再以start_ts
去读取data
列中的value
。
2.3. 删除 lock列中对应的锁。
步骤 2 成功意味着事务已提交成功,此时 Client 可以返回用户提交成功,再异步的进行Commit secondary
。Commit secondary
无需检测lock
列锁是否还存在,一定不会失败,只需要执行 2.2 和 2.3。
附加参考资料
项目简介ppt内已经提供了很多有用的资料,如论文及其翻译、视频课等,这里补充一些可能用的上的内容。
Go语言圣经
任务书中文翻译(有错误,但是比机翻强,有需要可以参考)
[TiKV源码解析系列](https://pingcap.com/zh/blog/?tag=TiKV 源码解析)
Smith-Cruise为TinyKV项目编写的白皮书
etcd的raft实现以及etcd Raft库解析