distributed consensus protocol

Overview

将自己对分布式一致性协议的理解, 简单记录一下,

先从问题8入手,

拜占庭将军问题(Byzantine Generals Problem), 拜占庭位于如今的土耳其的伊斯坦布尔, 是东罗马帝国的首都. 由于当时拜占庭罗马帝国国土辽阔, 为了达到防御目的, 每个军队都分隔很远, 将军与将军之间只能靠信使来传消息.
在战争的时候, 拜占庭军队内所有将军和副官必须达成一致的共识, 决定是否有赢的机会才去攻打敌人的阵营. 但是, 在军队内有可能存有叛徒和敌军的间谍, 左右将军们的决定进而扰乱整体军队的秩序. 在进行共识时, 结果很可能并不能代表大多数良人的意见(分化, 田忌赛马).
这时候, 在已知有成员谋反的情况下, 其余忠诚的将军在不受叛徒的影响下如何达成一致的协议, 拜占庭问题就此形成.

在计算机领域, 非黑客的情况下, 一般都会假设计算机之间不会互相发送恶意信息, 而更多的是宕机/网络延迟所造成的传信停止/滞后

所以将拜占庭将军问题可以简化为: 假设将军中没有叛军, 信使的信息可靠但有可能被暗杀的情况下, 将军们如何达成一致性决定?

paxos

type

  • basic/single-decree paxos, 一次决策一个value, 仅在一个值上达成一致
  • multi paxos, 连续决策多个value, 做到在一系列值上达成一致
    • 因为每个命令都通过一个Basic Paxos算法实例来达到一致, 会产生大量开销
    • 类似kafka的batch send
# Basic Paxos without failures
Client   Proposer      Acceptor     Learner
   |         |          |  |  |       |  |
   X-------->|          |  |  |       |  |  Request
   |         X--------->|->|->|       |  |  Prepare(N)
   |         |<---------X--X--X       |  |  Promise(N,{Va,Vb,Vc})
   |         X--------->|->|->|       |  |  Accept!(N,V)
   |         |<---------X--X--X------>|->|  Accepted(N,V)
   |<---------------------------------X--X  Response
   |         |          |  |  |       |  |

# Multi-Paxos without failures, 多出了提案编号I
Client   Proposer      Acceptor     Learner
   |         |          |  |  |       |  |
   X-------->|          |  |  |       |  |  Request
   |         X--------->|->|->|       |  |  Prepare(N)
   |         |<---------X--X--X       |  |  Promise(N,I,{Va,Vb,Vc})
   |         X--------->|->|->|       |  |  Accept!(N,I,V)
   |         |<---------X--X--X------>|->|  Accepted(N,I,V)
   |<---------------------------------X--X  Response
   |         |          |  |  |       |  |

role

  • client, 发出改值需求, 即需求方, 类似群众
  • proposer, 随机选择一个节点, 将改值需求作为提案, 即提案者, 类似基层人大代表, 帮群众发声
  • acceptor, 为提案投票, 即投票者, 类似全国人大代表, 负责审议表决
  • learners, 记录最终提案并执行, 即记录员

keypoint

  • 提案号proposer number(n)越大, 案件越新, 越容易被接受, 一般是采用同步锁自增++的方式产生的, 最好能够保证n是全局唯一递增
  • client request发送到系统, 随机抓取一个node作为proposer, 所以同一时刻可能有两个set请求
    • node1 -> set X=5
    • node2 -> set X=10
    • 但是quorum会拒绝n不大于自身offset的request, 因为这2个同时request可能会有一个会fail

flow

image

proposal election and proposal workflow4

paper1

To ensure that only a single value is chosen, we can let a large
enough set consist of any majority of the agents. Because any two majorities
have at least one acceptor in common, this works if an acceptor can accept
at most one value.

Learning about proposals already accepted is easy enough; predicting 
future acceptances is hard.

Putting the actions of the proposer and acceptor together, we see that
the algorithm operates in the following two phases,
Phase 1,
    (a) A proposer selects a proposal number n and sends a prepare
        request with number n to a majority of acceptors.
    (b) If an acceptor receives a prepare request with number n greater
        than that of any prepare request to which it has already responded,
        then it responds to the request with a promise not to accept any more
        proposals numbered less than n and with the highest-numbered 
        proposal (if any) that it has accepted.
Phase 2, 
    (a) If the proposer receives a response to its prepare requests
        (numbered n) from a majority of acceptors, then it sends an accept
        request to each of those acceptors for a proposal numbered n with a
        value v, where v is the value of the highest-numbered proposal among
        the responses, or is any value if the responses reported no proposals.
    (b) If an acceptor receives an accept request for a proposal numbered
        n, it accepts the proposal unless it has already responded to a prepare
        request having a number greater than n.

raft

是multi paxos的简化版本, 是对一系列连续问题达成一致的协议,

  • 发送的请求是连续的, 即continuously append only
  • 限制性选主. 必须有最新, 最全的日志节点才可以当选

role

  • follower, 响应candidate/leader的需求, 接受并持久化Leader同步过来的的日志
    • election timeout

      The election timeout is the amount of time a follower waits until becoming a candidate.
      The election timeout is randomized to be between 150ms and 300ms.
      After the election timeout the follower becomes a candidate and starts a new election term.

  • candidate, 选举过程中的临时角色
  • leader, 接收client的改值请求,并向follower同步请求日志,当日志同步到quorum节点后, 告诉follower可以提交日志(2PC)
    • heartbeat timeout

      Once a candidate wins an election, it becomes leader.
      It then sends heartbeat messages to all of the other servers to establish its authority and prevent new elections in a heartbeat timeout(50ms) interval.
      Once the leader crash and followers not receiving any heartbeat message during a random candidate timeout interval, then this election term will be terminated and one of the followers would become a candidate.

image

角色状态转移关系9, credit mindwind

keypoint

  • 随机的election timeout, 导致同一时间点只能有一个leader(最早结束election timeout的就成为candidate, 并发起选自己为主的投票)
  • 系统正常情况下每个任期有且仅有一个leader, 正常工作期间只有leader和followers
  • client request发送到系统, 要么直接到leader或者到follower之后被redirect到leader
  • 利用了once term one leader来tradeoff了multi-paxos的concurrency conflict和ZAB全局单leader的stability

flow

image

workflow

paper9

In Raft there are two timeout settings which control elections.

In normal operation there is exactly one leader and all of the other 
servers are followers.

Terms/任期 are numbered with consecutive integers increases monotonically 
over time. Each term begins with an election.

RequestVote RPCs are initiated by candidates during elections, and 
AppendEntries RPCs are initiated by leaders to replicate log entries 
and to provide a form of heartbeat.

In Raft, the leader handles inconsistencies by forcing the followers 
logs to duplicate its own. This means that conflicting entries in 
follower logs will be overwritten with entries from the leaders log.

Logs are composed of entries, which are numbered sequentially.
Raft requires servers to apply entries in log index order.

Once a follower learns that a log entry is committed, it applies 
the entry to its local state machine.

VR

Viewstamped Replication(VR),

类似raft,

VR raft
replicas peers
primary leader
backup follower
f+1 quorum
view term
view number logIndex
view change re-election

flow

image

here f=1, so quorum=1+1=2, so primary at least wait for 1 replica to response prepareOk14

VR ensures reliability and availability when no more
than a threshold of f replicas are faulty.

This implies that each step of the protocol must be processed by f + 1 replicas. 
These f + 1 together with the f that may not respond give us
the smallest group size of 2f + 1.

VR uses a primary replica to order client requests; the
other replicas are backups that simply accept the order
selected by the primary.

ZAB

similarity

  • 类似raft
      term -> epoch
      logIndex -> count
    
  • 类似锁
      分布式锁与领导选举/lock -> leadership
      排他锁eXclusive lock -> only one leader
      可重入锁reentrant lock -> 能再次被选举为leader并自己投自己
    

keypoint

  • follower/observer越多, 读性能越好, 但是如果保证f/o是最新的?
    • 写操作并不保证更新被所有的f/o立即确认, 因此通过部分f/o读取数据并不能保证读到最新的数据, 仅部分f/o及leader可读到最新数据
    • 如果一定要保证单一系统镜像, 可在读操作前调用sync()19
  • log顺序性
  • committed log sync/replicate
  • leader一定拥有最大的zxid=epoch+count
  • request过来
    • 正常情况, leader commited且不挂
    • commited了, 但是后续leader挂了, 那么max zxid的follower会继任
    • 未commit, 那么client超时, 然后重发, 此时zk cluster继任leader再receive req

flow

image

workflow

paper18

Zookeeper has been built around a two-phase commit protocol that 
allows it to replicate all the transactions while keeping in mind 
all the design principles mentioned above. The leader node 
generates transactions and assigns sequel numbers to them 
upon receiving a client state change request. It then sends 
those transactions to all its follower nodes and waits for 
their acknowledgments.

When receiving ACKs from a quorum, commit calls are sent 
to the quorum for all the transactions. A follower checks 
the sequel number of the issued transaction and only 
commits it if it doesnt have any outstanding 
transactions in the queue.

A node can only be a leader/master node if it has the quorum 
number of nodes as followers.



An outstanding transaction is one that has been proposed 
but not yet delivered

The original Paxos protocol does not enable multiple outstanding transactions.
Paxos does not require FIFO channels for communication, so it tolerates message loss
and reordering. If two outstanding transactions have an order dependency, then Paxos
cannot have multiple outstanding transactions because FIFO order is not guaranteed.
This problem could be solved by batching multiple transactions into a single proposal
and allowing at most one proposal at a time, but this has performance drawbacks.

ZEN

zen discovery主要用于21,

  • discovering nodes/peers
      At startup, or when electing a new master, Elasticsearch tries to connect to each 
      `seed node` in its list, and holds a gossip-like conversation with them to find 
      other nodes and to build a complete picture of the cluster.
        
      PeerFinder.startProbe().peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
        
      - peers通过seed nodes这个中间节点来发现彼此
      - peers彼此交换master-eligible nodes
    
  • electing a master(quorum)
       /** master nodes go before other nodes, with a secondary sort by id **/
      private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
          if (o1.isMasterNode() && !o2.isMasterNode()) {
              return -1;
          }
          if (!o1.isMasterNode() && o2.isMasterNode()) { // 固定配置, master or data or ingest
              return 1;
          }
          return o1.getId().compareTo(o2.getId());
      }
         
      The responsibility of the master node is to maintain the global cluster state 
      and reassign shards when nodes join or leave the cluster. Each time the 
      cluster state is changed, the new state is published to all nodes in the 
      cluster as described above.
    
  • forming a cluster
  • publishing cluster state

quorum仅用于选主, 但是选主后的publishing就需要所有data nodes都返回ack而非仅quorum个就行, 这可能是es的特性所决定. 也因此不用raft而创造了zen.

flow

image

discovering

gossip

Cassandra/Scylla using gossip to maintain a masterless cluster, using the consistent hash ring to determine the key’s routing instead of decided by the master,

  • pros
    • scalable
    • masterless, fault-tolerant, without SPOF
    • masterless, without single master resource restriction(e.g., es master metadata max size)
  • cons
    • message duplicate(e.g., A->B, then C->B, B receive twice)
    • message delay, need propagate, but in master mode, master can send directly

model23

  • SI model/simple epidemics/anti-entropy, 以固定的概率传播所有的数据
    • push style, nodeA periodically send(push) all its current content to randomly selected nodeB, then nodeB update itself, A->B
    • pull style, nodeA periodically ask(pull) new updates from randomly selected nodeB, then nodeA update itself, A->B(new updates)->A
    • push-pull stype, combination of push and pull, A->B->A->B, i.e., A send its diff new updates to B at last
  • SIR model/complex epidemics/rumor-mongering, 引入removed标记仅传播新到达的数据
    • based on push style

flow

image

illustration of three types of group communication topologies: (a) Centralized approach; (b) Fully connected overlay; and (c) Gossip-based approach(random pick), credit: jisajournal

paper

gossip/convergence spreads is O(logN)

eventual consistency

A node in the network randomly selects a peer with which it will 
exchange some information.

Reference

  1. Paxos Made Simple
  2. Paxos wiki
  3. 漫话分布式系统共识协议: Paxos篇
  4. 图解分布式一致性协议Paxos
  5. Paxos算法
  6. Message Passing vs Shared Memory Process communication Models
  7. paxos github code
  8. Raft理论基础
  9. In Search of an Understandable Consensus Algorithm
  10. raft live
  11. Raft算法详解
  12. 理解Copy On Write技术
  13. raft github code
  14. Viewstamped Replication Revisited
  15. Want to learn how Viewstamped Replication works? Read this summary
  16. vr github code
  17. ZooKeeper’s atomic broadcast protocol: Theory and practice
  18. Zookeeper Atomic Broadcast Protocol (ZAB) and implementation of Zookeeper
  19. ZooKeeper Programmer’s Guide
  20. zab github code
  21. es discovery and cluster formation
  22. zen github code
  23. Gossip and Epidemic Protocols
  24. gossip github code