查看原文
其他

分布式系统Consul一致性实现即Raft日志复制原理

彭荣新 高可用架构 2024-04-26

彭荣新,茄子科技老司机一枚,长期专注基础架构领域,对中间件的的研发和治理以及稳定性保障有丰富的实践经验。


背景

Consul 是一个非常强大的服务发现和配置管理工具,可以帮助您简化服务管理流程,提高系统的可用性和可扩展性,是目前非常流行的服务发现和配置管理系统,支持高可用,可扩展,多数据中心的分布式系统,是很多公司的基础实施组件,这些架构的优点的背后是基于分布式协议raft的实现,raft协议的理论有很多,以前需要根据paxos来实现,像zk自己实现了一套zap的协议来实现数据的复制和一致性,hdfs的namenode 日志高可用也是基于paxos实现,技术发展就是快,现在要实现一致性,高可靠,多副本基本上都是采用raft协议来实现,真正讲raft 日志复制实现的比较少,比如k8s用的etcd,nacos cp模型也有raft的实现,rocketmq也有raft实现slave选举,这篇文章主要分享consul raft 协议日志复制的实现原理,尝试讲明白写日志复制,日志顺序,过半提交,一致性检查等相关的知识点和实现原理。

Consul Raft

raft 算法主要包含两个部分,分别是leader选举和日志复制,leader选举我们不分析,我们主要分析日志复制的实现原理,下面我们以consul的key value 存储的写场景入手,一步步分析写请求的实现逻辑,是怎么实现raft 日志复制保证一致性的, 内容会比较长,会涉及到如下知识点:

  • 客户端发起请求

  • Server端接受请求

  • 拆包处理

  • 心跳机制

  • 批量发送

  • 过半提交

  • 一致性检查

  • 总结


Consul Agent 请求

客户端发起一个put key value的http请求,由kvs_endpoint.go 的KVSEndpoint func 处理,put的方法会路由给KVSPut 处理,除了一些校验外和请求标识,比如是否有获取锁acquire或者release,这里提下一个检查,就是value的大小检查,和web 容器一样检查防止请求数据太大,可以通过参数kv_max_value_size 控制,如果超过返回状态码413,标准的http 状态码。

检查都OK后,consul agent就开始请求consul server了,当然还是rpc 操作

// Copy the valuebuf := bytes.NewBuffer(nil)// 这里才开始读请求的数据。if _, err := io.Copy(buf, req.Body); err != nil { return nil, err}applyReq.DirEnt.Value = buf.Bytes()
// Make the RPCvar out bool// 开始请求serverif err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil { return nil, err}
// Only use the out value if this was a CAS// 没有出错的话,这里就成功返回了if applyReq.Op == api.KVSet { return true, nil}

agent请求server时,会默认请求server list的第一个节点,只有在失败的请求下,回滚动节点,把失败的添加到最后,原来的第二个节点做为第一个节点,请求的是consul 下面的kvs_endpoint.go 下面的Apply 方法,所以我们的重点要来了

Server Apply

consul server的 apply方法,代码还是show下,这里还有两个逻辑说明下。

// Apply is used to apply a KVS update request to the data store.func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // 检查机房dc是否匹配,不是就转发到对应到dc的server。 if done, err := k.srv.forward("KVS.Apply", args, args, reply); done { return err } // 中间不重要的去了,省得太多... // 对权限token 应用ACL policy ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt) if err != nil { return err } if !ok { *reply = false return nil }
// Apply the update. // 这里是开启raft 算法的之旅的入口。 resp, err := k.srv.raftApply(structs.KVSRequestType, args) if err != nil { k.logger.Error("Raft apply failed", "error", err) return err } if respErr, ok := resp.(error); ok { return respErr }
// Check if the return type is a bool. if respBool, ok := resp.(bool); ok { *reply = respBool } return nil}

在真正开始执行raft 算法前,主要做了如下两件事:

先检查了dc是否是当前dc,如果不是会路由到正确的dc,这也是consul 支持多机房部署的一个很好的特性,路由很方便,这也是多机房部署consul是很好的选择。

检查是否启用了acl策略,如果有,需要检查,没有对应的token是不能操作的。

leader的写请求是从kvs的apply方法开始处理请求的,下面我们看下apply 方法的实现逻辑,在真正执行raft前,consul还做了一些加工,不能蛮搞,是非常严谨的,上面通过raftApply,经过几跳后,会执行到raftApplyWithEncoder方法,这里做的工作是很重要的,所以还是拿出来说下,是涨知识的地方,代码如下:

// raftApplyWithEncoder is used to encode a message, run it through raft,// and return the FSM response along with any errors. Unlike raftApply this// takes the encoder to use as an argument.func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) { if encoder == nil { return nil, fmt.Errorf("Failed to encode request: nil encoder") } // 对请求编码。 buf, err := encoder(t, msg) if err != nil { return nil, fmt.Errorf("Failed to encode request: %v", err) }
// Warn if the command is very large if n := len(buf); n > raftWarnSize { s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n) }
var chunked bool var future raft.ApplyFuture switch { case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType: //请求的数据大小如果小于512 * 1024 即512k,则做一次log执行。 future = s.raft.Apply(buf, enqueueLimit) default: //超过了512k,则需要分chunk,每个chunk做为一个log来应用。 chunked = true //这里就是每个log一次future。 future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog) }
//阻塞,等待raft协议完成。 if err := future.Error(); err != nil { return nil, err }
resp := future.Response()
//... return resp, nil}

这里通过注释,你也可以看出,主要关心4件事情:

1. 把请求编码,这个不是我们的重点,后面有时间可以单独分析。

2. 检查是否要拆包,是否要拆成多个raft command 来执行,这里有个参数控制,SuggestedMaxDataSize consul 默认设置是512k,如果超过这个则拆,否则可以一次raft 协议搞定。

3. 有一个超时时间,默认是30秒,后面会用到。

4. 最后事阻塞等待完成,是logfuture。

为什么要拆包

这些事raft 算法不会提的,这个事工程实践才会有的一些优化,此时你也和我一样,为啥要做这个优化呢,有什么好处,解决什么问题,这是我们做一个架构师必须要有的思考。

consul的官方就给出了解释,所以阅读优秀的代码就是一种享受,看注释就能知道为啥这样做,下面是他们对SuggestedMaxDataSize的注释:

// Increasing beyond this risks RPC IO taking too long and preventing// timely heartbeat signals which are sent in serial in current transports,// potentially causing leadership instability.SuggestedMaxDataSize = 512 * 1024

理解就是单次log 数据不能太大,太大理解有下面几个问题:

  • 链接池:leader和follower默认最大3个链接,但新版本的代码实现是没有链接用时,因为日志复制发送会独占一个链接,直到follower应用完成返回成功,才会释放链接,如果发送的数据量大,回影响链接释放的时间,但是目前大版本通过fast path处理以及会新建一个链接来发送,就没有了这个问题。

  • 网络带宽:而且并发高的时候,都是批量发送的,还会同时发给多个follower,有可能导致leader的网络带宽过大,会影响心跳包出现延迟,另外如果heartbeat包也支持follower commit日志,follower端也会影响heartbeat包的处理。


上面说影响到心跳,那我们总要知道heartbeat是怎么实现的,看看到底有什么影响,所以我们把consul的心跳机制实现原理说明下

定时心跳

- raft 心跳理论

把日志提交的两阶段优化为了一个阶段,省去了commit阶段,减少了一个rt,提升了吞吐量,为什么能这样优化,是借助下次请求和心跳请求来告诉followe 当前leader的commit index,所以raft 算法认为心跳包也是会带上当前commit index 给follower,让follower可以尽快提交,保持和leader一致,理论是这样的,但是consul并没有这样实现,请继续看下面

- Consul 心跳实现

但是consul 在实践的时候并没有这么做,也可能是优化了实现,实现逻辑是每个follower有一个独立的goroutine来负责发送heartbeat,consul leader 给follower发心跳时,只带了一个当前leader的任期和leader自身的id和地址两个信息,不带log 相关的信息,所以follower 处理心跳请求就很简单,只要更新下心跳时间即可,当然也会检查任期,这样就没有了io请求,就能快速响应,也叫fast path,就时直接在io线程处理了,因为follower 处理正常的rpc请求和心跳请求经过decode后,都会统一有follower的main goroutine来处理,如果有一个log append的rpc请求很大,即io操作会大,需要持久化log,很定影响后面的请求,即会影响到心跳请求,follower认为在心跳超时时间内没有收到心跳,则认为leader出了问题,会触发选举,就影响了leader稳定。

- CommitTimeout

consul 没有通过心跳机制来让follower尽快和leader保持一致来commit log 到fsm状态机,如果写不连续,那最近一次写follower就会一直不提交,本来是发心跳给follower时会让follower提交,但现在心跳不干这个活了,所以consul 需要一个新的机制来保证即使没有新的写请求的情况下,让follower也尽快和leader保持一致的commit log,这个机制就是commit timeout随机时间,每到了一个时间,consul leader就发一个

日志复制的请求给follower,该rpc请求除了带上leader任期term和标识信息外,还会告诉follower leader的commitindex,follower 就能比较自己的commitindex,如果小于,则进行提交的流程,把没有应用到状态机的log commit掉。

批量发送

说完了拆包优化逻辑后,我们看下ApplyLog的逻辑,代码如下:

// ApplyLog performs Apply but takes in a Log directly. The only values// currently taken from the submitted Log are Data and Extensions.func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture { metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time if timeout > 0 { timer = time.After(timeout) }
// Create a log future, no index or term yet logFuture := &logFuture{ log: Log{ Type: LogCommand, Data: log.Data, Extensions: log.Extensions, }, } logFuture.init()
select { case <-timer: return errorFuture{ErrEnqueueTimeout} case <-r.shutdownCh: return errorFuture{ErrRaftShutdown} case r.applyCh <- logFuture: return logFuture }}

这里主要关心这个applyCh channel,consul 在初始化leader的时候给创建的一个无缓冲区的通道,所以如果leader的协程在干其他的事情,那这个提交log就阻塞了,时间最长30s,写入成功,就返回了logFuture,也就事前面我们看到future的阻塞。

到这里整个consul leader server的插入请求从接受到阻塞等待的逻辑就完成了,consul leader server 有个核心的go routine 在watch 这个applyCh,从定义可以看出,是应用raft log的channel,阻塞在applych 的go routine 代码如下:

case newLog := <-r.applyCh://这个是前面我们提交log future的 if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) continue } // Group commit, gather all the ready commits ready := []*logFuture{newLog}GROUP_COMMIT_LOOP: for i := 0; i < r.conf.MaxAppendEntries; i++ { select { case newLog := <-r.applyCh: ready = append(ready, newLog) default: break GROUP_COMMIT_LOOP } }
// Dispatch the logs if stepDown { // we're in the process of stepping down as leader, don't process anything new //如果发现我们不是leader了,直接响应失败 for i := range ready { ready[i].respond(ErrNotLeader) } } else { r.dispatchLogs(ready) }

这里的一个重要的点就是组发送请求,就是读applyCh的log,这个里做了组提交的优化,最多一次发送MaxAppendEntries个,默认位64个,如果并发高的情况下,这里是能读到一个batch的,在网络传输和io操作,分组提交是一个通用的优化技巧,比如dubbo在rpc网络发送,rocketmq,mysql innodb log提交都用了分组提交技术来充分利用网络io带宽,减少网络来回或者io次数的开销,因为一次大概率是用不完网络或者io带宽的,就像高速4车道的,我们可以一次发四个,而不是一个一个的发。

分组好了后,下面就开始dispatch log了,代码如下:

// dispatchLog is called on the leader to push a log to disk, mark it// as inflight and begin replication of it.func (r *Raft) dispatchLogs(applyLogs []*logFuture) { now := time.Now() defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
//获取当前leader的任期编号,这个不会重复是递增的,如果有心的leaer了,会比这个大。 term := r.getCurrentTerm() //log 编号,写一个加1 lastIndex := r.getLastIndex()
n := len(applyLogs) logs := make([]*Log, n) metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
//设置每个log的编号和任期 for idx, applyLog := range applyLogs { applyLog.dispatch = now lastIndex++ applyLog.log.Index = lastIndex applyLog.log.Term = term logs[idx] = &applyLog.log r.leaderState.inflight.PushBack(applyLog) }
// Write the log entry locally // log先写入本地持久化,consul大部分的版本底层用的是boltdb,boltdb // 是一个支持事物的数据库,非常方便,这里会涉及io操作。 if err := r.logs.StoreLogs(logs); err != nil { r.logger.Error("failed to commit logs", "error", err) //如果写失败,则直接响应,前面的future阻塞就会唤醒。 for _, applyLog := range applyLogs { applyLog.respond(err) } //更新自己为follower r.setState(Follower) return } //这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制 //这里是记录下leader自己的结果,因为过半leader也算一份。 r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now // 更新最新log entry的编号,写到这里了。 r.setLastLog(lastIndex, term)
// Notify the replicators of the new log // 开始异步发送给所有的follower,这个leader主go routine的活就干完了。 for _, f := range r.leaderState.replState { asyncNotifyCh(f.triggerCh) }}

这个dispatchlog的逻辑注释里基本写清楚了,核心的go routine 经过一顿操作后,最主要就是两点:

- 本地持久化log

consul log持久化是通过boltdb来存储的,boltdb可以看做一个简单版的innodb实现,是一个支撑事务和mvcc的存储引擎, consul 新版本自己实现了log持久化通过wal的方式,可以配置。

- 为过半成功增加一次记录,记录自己写成功,因为计算过半时,leader自己这一份也算在里面,这个很重要。

又异步交给了replicate go routine来处理,他就去继续去分组提交了,大概率如此循环往复,不知疲倦的给replication routine 派活。

复制GoRoutine

replication routine 会监听triggerCh channel,接受领导的任务,这个比较简单,就开始真正发给各自的follower了,代码如下:

case <-s.triggerCh: lastLogIdx, _ := r.getLastLog() //这个后面没有异步了,就是这个rpc调用,判断 shouldStop = r.replicateTo(s, lastLogIdx)

replicateTo 就是rpc调用follower,真正远程rpc给follower,等待响应,这里consul 为保障follower完全和leader的日志一致,需要做有序检查,所以consul leader 在replicate log给follower时,有一个细节要注意下,就是leader 除发当前操作的log entry,还需要带上上一条log entry,每条log entry的有两个关键变量:

  • log.Term  是log所属的leader的任期。

  • log.Index 是log的编号。


这里要发送的日志是就是获取当前最大的log index即lastIndex 做为最大值,然后每个follower维护一个nextIndex,即从那里开始读log,replication goroutine 会从存储里获取nextIndex-->lastIndex 的log,这里可能涉及到io操作,就是把前面的持久化的log,再批量读出来,nextIndex是在复制给follower成功后,会吧lastIndex+1 来更新nextIndex,下次就从新的地方开始读了。

除了log外,还会带上leader当前的CommitIndex,即leader已经应用到状态机FSM的log 索引,follower通过这个来比较,判断自己是否要提交log。

follower 节点通过这两个变量来匹配log是否一致,下面log一致性检查会说明具体怎么用,也会说明为啥要发前面一天log。

过半提交

raft协议要求写操作,只有超过一半才能算成功,才能应用到状态机FSM, 客户端才能读到这个数据,这个过半是leader自己也算在里面的,也就是前面一篇文章我们提到的,leader在持久化log后,就标记自己写成功了,我们没有分析,现在我们来分析下这个逻辑,因为follower 处理完日志复制后,也是有这个逻辑处理的。

//这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制//这里是记录下leader自己的结果,因为过半leader也算一份。r.leaderState.commitment.match(r.localID, lastIndex)

我们上篇文章只是在这里做了一个注释,并没有分析里面怎么实现的,我们就是要搞懂到底怎么实现的,下面是match的代码:

// Match is called once a server completes writing entries to disk: either the// leader has written the new entry or a follower has replied to an// AppendEntries RPC. The given server's disk agrees with this server's log up// through the given index.func (c *commitment) match(server ServerID, matchIndex uint64) { c.Lock() defer c.Unlock() if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev { c.matchIndexes[server] = matchIndex c.recalculate() }}

注释也基本说明了这个方法的作用,就是我们上面说的,我们就不再重复了,要理解这个逻辑,先了解下这个数据结构matchIndexes,matchIndexes 是一个map,key就是server id,就是consul 集群每个节点有一个id,value就是上次应用log到状态机的编号commitIndex,recalculate的代码如下:

// Internal helper to calculate new commitIndex from matchIndexes.// Must be called with lock held.func (c *commitment) recalculate() { if len(c.matchIndexes) == 0 { return } matched := make([]uint64, 0, len(c.matchIndexes)) for _, idx := range c.matchIndexes { matched = append(matched, idx) } //这个排序是降序,才能保证下面取中间索引位置的值来判断是否过半已经复制成功。 sort.Sort(uint64Slice(matched)) quorumMatchIndex := matched[(len(matched)-1)/2] //如果超过一半的follower成功了,则开始commit,即应用到状态机 if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex { c.commitIndex = quorumMatchIndex //符合条件,触发commit,通知leader执行apply log asyncNotifyCh(c.commitCh) }}

这个recalculate的逻辑单独看有点晦涩,先不急于理解,先举一个例子来说明下recalculate的逻辑:

假如集群三个节点,server id分别为1,2,3,上次写log的编号是3,就是leader和follower都成功了,这个matchIndexes的数据如下:

1(leader) --> 3

2(follower) --> 3

3(follower) --> 3

假如这个时候新来一个put请求,leader本地持久化成功,就要更新这个数据结构了matchIndexes了, 因为leader是先更新,再并发请求follower的,所以这个时候matchIndexes数据如下,因为一个log,所以logIndex是加1。

1(leader) --> 4

2(follower) --> 3

3(follower) --> 3

因为leader本地完成和follower远程完成一样,都要通过这个逻辑来判断是否commit 该log 请求,即是否应用到FSM,所以就是要判断是否过半完成了,逻辑是这样的:

1. 先创建一个数组matched,长度为集群节点数,我们的例子是3,

2. 然后把matchIndexes的commitIndex 起出来,放到matched中,matched的数据就是[4,3,3]

3. 排序,为啥要排序,因为map 是无序的,下面要通过中间索引的值来判断是否变化。

4. 然后计算 quorumMatchIndex := matched[(len(matched)-1)/2],这个就是取中间索引下标的值,也是因为这点,需要第三步排序.

5. 比较quorumMatchIndex 是否大于当前的commitIndex,如果大于,说明满足过半的条件,则更新,然后应用到状态机。

通过上面5步,来实现了一个过半的逻辑,我们再以两个场景来理下,

假如一个follower失败了,一个成功,成功的follower会更新matched的数据是[4,3,4],或者是[4,4,3],排序后为都是[4,4,3], 第4步计算的结果是4大于3,就可以提交了,经过上面的详细,再回看上面的代码就好理解了。

一致性检查:

raft协议日志复制是需要严格保证顺序的,所以在日志复制的时候follower需要对日志做检查,主要有两种情况:

  • log有gap,比如follower停了一段时间,重新加入集群,这个时候follower的log 编号很多事和leader有差距的,对这种情况,就是日志一致性的保证。

  • follower之前有其他的leader写了日志,需要覆盖,以新的leader为准。

  • leader网络出现问题,集群已经有新的leader,老的leader又活过来后,重新发起日志到follower,这个时候任期比会新的leader小,follower直接返回不处理该请求,老的leader 在检查响应时,先判断follower的term是不是比自己大,就停止复制的工作。


每次log append给follower时,follower会把自己当前的logindex 编号和当前leader的任期term返回给leader,leader获取到对应的编号时,会更新发送logNext,也就是从这里开始发生日志给follower,就进入重试的的流程,重新发日志。

这里consul 根据raft协议做了一个优化,raft协议描述的是每次递减一个logindex 编号,来回确认,直到找到follower匹配的编号,再开始发日志,这样性能就很差,所有基本上没有那个分布式系统是那样实现落地的。

Commit Log

只要超过一半的日志 复制成功,consul 就进入日志commit阶段,也就是将修改应用到状态机,通过recalculate 方法给leader监听的commitCh 发一个消息,通知leader开始执行apply log 到FSM, leader 的代码如下:

case <-r.leaderState.commitCh: // Process the newly committed entries //上次执行commit log index oldCommitIndex := r.getCommitIndex() //新的log需要commit的log index,在判断是过半时,会更新commitindex commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex)
.... start := time.Now() var groupReady []*list.Element var groupFutures = make(map[uint64]*logFuture) var lastIdxInGroup uint64
// Pull all inflight logs that are committed off the queue. for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() { commitLog := e.Value.(*logFuture) idx := commitLog.log.Index //idx 大于commitIndex,说明是后面新写入的,还没有同步到follower的日志。 if idx > commitIndex { // Don't go past the committed index break }
// Measure the commit time metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) groupReady = append(groupReady, e) groupFutures[idx] = commitLog lastIdxInGroup = idx }
// Process the group if len(groupReady) != 0 { //应用的逻辑在这里。groupFutures 就是写入go routine wait的future r.processLogs(lastIdxInGroup, groupFutures) //清理inflight集合中已经commit过的log,防止重复commit for _, e := range groupReady { r.leaderState.inflight.Remove(e) } }

这里比较简单,就是从leaderState.inflight 中取出log,就是我们之前写入的,循环判断,如果log的编号大于commitIndex,说明是后面新写入的log,还没有同步到follower的log,不能提交。这里应该是有序的,lastIdxInGroup 应该就是需要commit的log的最大的一个编号。

processLogs的逻辑就是支持分批提交支持,发给consul 的runFSM的go routine,consul raft专门有一个go routine来负责commit log到状态机,支持批量和一个一个commit,我们看下单个commit的情况,代码如下:

commitSingle := func(req *commitTuple) { // Apply the log if a command or config change var resp interface{} // Make sure we send a response defer func() { // Invoke the future if given if req.future != nil { req.future.response = resp req.future.respond(nil) } }()
switch req.log.Type { case LogCommand: start := time.Now() //将日志应用到FSM的关键在这里。 resp = r.fsm.Apply(req.log) metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) ....
}
// Update the indexes lastIndex = req.log.Index lastTerm = req.log.Term}

主要就是三点,应用log 到fsm,然后跟新下fsm的logindex和任期,最后就是要通知还在wait的go routine。

整个日志复制的流程很长,最后再上一张图总结下整个过程:

总结

这篇文章主要基于consul 目前版本的实现和基于个人的理解,以consul 一个写请求的整个过程为线索,介绍了consul 基于raft协议的实现日志复制的基本过程,重点介绍了日志顺序保证措施,日志一致性检查,过半提交log,心跳机制的实现原理,以及相关的几个优化措施,比如大请求分拆,分组批量发送等一些实践优化措施,如有不正确的地方,欢迎交流和指正,个人微博@绝尘驹

参考阅读:




技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿


继续滑动看下一个
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存