热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PhxPaxos源码分析之Paxos实现

介绍

介绍

PhxPaxos是微信后台团队开源的一套基于Paxos实现的c++库, 用于实现分布式系统中一致性的共识. 应用开发者可以将它嵌入到自己的应用中, 使单节点应用可以扩展成为多节点分布式应用, 而且多节点之间的状态可以保持一致, 多节点之间也可以自动容灾切换.

PhxPaxos这个库使用方法很简单, 开发者只需要实现StateMachine接口定义自己的状态机, 并用这个状态机和其他选项(当前节点信息、节点列表)来初始化出来一个Node对象. 应用通过这个Node对象的Propose方法便可以发起提议. 提议在整个集群中达成多数派之后, PhxPaxos会在上述状态机中执行提议的内容.

内部结构

PhxPaxos内部有若干个类, 分别对分布式系统中和Paxos算法中的不同模块/角色进行了抽象, 主要列举如下:

  • Node接口: 对外暴露的API, 比如上述的Propose方法便是该接口的方法之一.

  • PNode类: 实现了Node接口, 也是对系统中单个节点的抽象.

  • Group类: 对每个Paxos group的抽象.

  • Instance类: 对group中每轮Paxos实例的抽象.

  • Proposer/Acceptor/Learner类: 对Paxos实例中的各角色的抽象.

  • StateMachine接口: 定义了状态机需要实现的方法. 应用开发者需要实现.

  • Network接口: 定义了通信模块需要实现的方法. 可以是应用开发者实现, 也可以用PhxPaxos默认的通信模块(基于event loop模式).

  • LogStorage接口: 定义了log存储模块需要实现的方法. 可以是应用开发者实现, 也可以用PhxPaxos默认的存储模块(基于levelDB).

  • MsgTransport类: 封装了Network接口简化PhxPaxos内部和网络通信相关的逻辑, 比如实现消息广播.

  • PaxosLog类: 封装了LogStorage接口简化PhxPaxos内部和log相关的逻辑.

这些类之间的关系简要说明如下:

  • PNode对象是对系统中单个节点的抽象, 它可以拥有多个Group对象, 表示系统的节点支持多个Paxos group.

  • Group对象简单封装了Instance对象.

  • Instance对象是每一轮Paxos实例的抽象, 它组合了Proposer/Acceptor/Learner三个对象(角色). 它主要负责接收来自NetWork的消息并通过IOLoop线程来驱动各个角色.

  • Proposer/Acceptor/Learner: 分别实现了Paxos中对应角色的逻辑. 它们都继承自Base基类, 基类中维护了当前的InstanceId.

下面是自己简单画的UML类图(省略了Group、StateMachine、NetWork等数据结构):



图中除IOLoop类外刚才都已经有所介绍. 这里专门提一下IOLoop这个类, 它是对一个单独线程的抽象, 该线程主要作用是驱动Paxos实例中各角色的运转, 后面会提到这一点.

Paxos算法实现

PhxPaxos中的Paxos算法部分是基于Paxos Made Simple中的论述进行实现的, 论文中的算法流程主要有prepare和accept两个阶段, 具体可以参考这篇文章: 一步一步推导Paxos算法

Prepare阶段

我们看看prepare阶段是怎么实现的.

开发者的应用程序提议一个新的value, 该value最终会在IOLoop线程中通过Proposer的NewValue()方法发起Paxos流程.
触发过程依次如下:

应用线程执行:         PNode::Propose()
IOLoop线程执行:    IOLoop::CheckNewValue()
IOLoop线程执行:    Proposer::NewValue()

Proposer :: NewValue()

其中Proposer :: NewValue()代码如下:

int Proposer :: NewValue(const std::string & sValue)
{
   BP->GetProposerBP()->NewProposal(sValue);
   if (m_oProposerState.GetValue().size() == 0)
   {  
       m_oProposerState.SetValue(sValue);
   }  
   m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
   m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
   if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
   {  
       // 以往的Paxos实例prepare阶段成功过  && 以往的Paxos实例没有被拒绝.,
       // 才可以跳过prepare阶段
       BP->GetProposerBP()->NewProposalSkipPrepare();
       PLGHead("skip prepare, directly start accept");
       Accept();
   }  
   else
   {  
       //if not reject by someone, no need to increase ballot
       Prepare(m_bWasRejectBySomeone);
   }  
   return 0;
}

开始提议这个新value之前会先判断这次是否可以直接进入accept阶段. 否则需要从prepare阶段开始.
什么情况可以跳过prepare阶段呢? 条件是: 以往的Paxos实例prepare阶段成功过  && 以往的Paxos实例没有被拒绝.

"以往的Paxos实例prepare阶段成功过" 说明多数派已经承诺了不会接受小于它的proposerID的请求, 也即是m_bCanSkipPrepare为true.
"以往的Paxos实例没有被拒绝"说明它的proposerID还没有过时, 也即是系统中还没有更大的proposerID.

Proposer::Prepare()

Proposer::Prepare()代码如下:

void Proposer :: Prepare(const bool bNeedNewBallot)
{
   PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
           GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
           m_oProposerState.GetValue().size());
   BP->GetProposerBP()->Prepare();
   m_oTimeStat.Point();
   ExitAccept();
   m_bIsPreparing = true;
   m_bCanSkipPrepare = false;
   m_bWasRejectBySomeOne= false;
   m_oProposerState.ResetHighestOtherPreAcceptBallot();
   if (bNeedNewBallot) // 如果被拒绝过, 需要使用新的proposalID
   {  
       m_oProposerState.NewPrepare();
   }  
   PaxosMsg oPaxosMsg;
   oPaxosMsg.set_msgtype(MsgType_PaxosPrepare); // 构造prepare请求
   oPaxosMsg.set_instanceid(GetInstanceID());
   oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
   oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
   m_oMsgCounter.StartNewRound(); // 重置一些计数, 包括被接受的数量、被拒绝的数量
   AddPrepareTimer();
   PLGHead("END OK");
   BroadcastMessage(oPaxosMsg); // 广播prepare请求
}

在Prepare()方法的逻辑如下:

  1. 重置一些标记.

  2. 根据是否被拒绝来判断是否使用更大的proposalID. 如果被拒绝过, 需要使用更大的proposalID.

  3. 开始构造prepare请求, 填充MsgType、InstanceID、NodeID、ProposalID字段.

  4. 重置一些计数信息, 包括被接受的数量、被拒绝的数量.

  5. 将构造好的请求广播到系统中所有节点.

Acceptor::OnPrepare()

proposer将消息广播出去后将会被acceptor收到, 触发acceptor的相关逻辑.

触发过程如下:

网络线程执行:         PNode::OnReceiveMessage()
IOLoop线程执行:    Instance::OnReceive()
IOLoop线程执行:    Instance::OnReceivePaxosMsg()
IOLoop线程执行:    Acceptor::OnPrepare()

Acceptor::OnPrepare()代码如下所示:

int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
   PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
           oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
   BP->GetAcceptorBP()->OnPrepare();
   PaxosMsg oReplyPaxosMsg;
   oReplyPaxosMsg.set_instanceid(GetInstanceID());
   oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
   oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
   oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
   BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
   if (oBallot >= m_oAcceptorState.GetPromiseBallot())
   {
       PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
               "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
               m_oAcceptorState.GetPromiseBallot().m_llProposalID,
               m_oAcceptorState.GetPromiseBallot().m_llNodeID,
               m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
               m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
       // 如果proposeID 大于等于 promiseID, 那么需要返回acceptor已经accept过的proposeID和value
       oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
       oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
       if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
       {
           oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
       }
        承诺不接受小于当前提议的proposeID的其他提议
       m_oAcceptorState.SetPromiseBallot(oBallot);
       // 持久化promiseID
       int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
       if (ret != 0)
       {
           BP->GetAcceptorBP()->OnPreparePersistFail();
           PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                   GetInstanceID(), ret);
           return -1;
      }
       BP->GetAcceptorBP()->OnPreparePass();
   }
   else
   {
       // 提议的proposalID小于promiseID, 拒绝这个prepare请求
       BP->GetAcceptorBP()->OnPrepareReject();
       PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
               m_oAcceptorState.GetPromiseBallot().m_llProposalID,
               m_oAcceptorState.GetPromiseBallot().m_llNodeID);
       // 返回promiseID, 方便proposer更正自己的proposeID
       oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
   }
   nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
   PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
           GetInstanceID(), oPaxosMsg.nodeid());;
    // 回复proposer
   SendMessage(iReplyNodeID, oReplyPaxosMsg);
   return 0;
}

acceptor接到prepare请求时, 如果prepare的编号n大于等于promiseID, 那么他承诺永远也不会同意提议编号比这n值小的prepare请求, 记为promiseID=n, 需要进行持久化. 另外, 如果它已经accept了某个值, 需要把接受的值(acceptedValue)和当时的提议编号(acceptedBallot)返回给proposer.
如果n小于promiseID, 那么acceptor可以拒绝该请求, 同时要返回自己的promisedID以便proposer更新下一次的提议编号.

BallotNumber这个概念和proposeID类似, 它包含proposeID和nodeID. BallotNumber实现了比较操作符: 先比较proposeID, 如果proposeID相同那么进一步比较nodeID. 而 nodeID 是根据 ip 和 port 进行位移算出来的一个整形数字.

需要注意prepare过程中acceptor判断提议的Ballot和promiseBallot大小关系: 大于或等于的. 其中"等于"的情况是针对prepare因为超时而重试 的情况. 只有这种情况下nodeID和proposeID都一致. 所以, 对于这种prepare超时的情况, proposeID不递增也可以继续工作.

Proposer::OnPrepareReply()

void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
   PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
           oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
           oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
   BP->GetProposerBP()->OnPrepareReply();
   if (!m_bIsPreparing)
   {
       BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
       //PLGErr("Not preparing, skip this msg");
       return;
   }
   if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
   {
       BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
       //PLGErr("ProposalID not same, skip this msg");
       return;
   }
   m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
   if (oPaxosMsg.rejectbypromiseid() == 0)
   {  // 如果没有被该acceptor拒绝
       BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
       PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
               oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
       m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
       // 记录该acceptor曾经accept过的value(如果有的话)
       m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
   }
   else
   {  // 如果被该acceptor拒绝
       PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
       m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
       m_bWasRejectBySomeOne= true;
       m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
   }
   if (m_oMsgCounter.IsPassedOnThisRound())
   {  // 如果已经被多数派接受, 设置m_bCanSkipPrepare 并进入accept阶段
       int iUseTimeMs = m_oTimeStat.Point();
       BP->GetProposerBP()->PreparePass(iUseTimeMs);
       PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
       m_bCanSkipPrepare = true;
       Accept();
   }
   else if (m_oMsgCounter.IsRejectedOnThisRound()
           || m_oMsgCounter.IsAllReceiveOnThisRound())
   {  // 若果被多数派拒绝 或者没有被多数派接受
       BP->GetProposerBP()->PrepareNotPass();
       PLGImp("[Not Pass] wait 30ms and restart prepare");
       AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
   }
   PLGHead("END");
}

proposer在发出prepare请求之如果收到多数派的acceptor的承诺, 那么可以进入accept阶段而且设置m_bCanSkipPrepare为true.
另外, proposer记录acceptor返回的acceptedValue(有的话), 在accept阶段时会使用proposeID最大的acceptedValue作为提议值. 如果这一轮Paxos实例还没有任何acceptor接受过某个value的话, proposer可以使用自己一开始的提议值.

如果没有被多数派接受, 说明本次提议已经"过时", 需要那么需要等待10~40ms的时间, 重新发起prepare请求.

Accept阶段

Proposer::Accept()

接下来Paxos算法进入accept阶段.

void Proposer::Accept()
{
   PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
           m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
   BP->GetProposerBP()->Accept();
   m_oTimeStat.Point();
   ExitPrepare();
   m_bIsAccepting = true;
   PaxosMsg oPaxosMsg;
   oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
   oPaxosMsg.set_instanceid(GetInstanceID());
   oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
   oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
   oPaxosMsg.set_value(m_oProposerState.GetValue());
   oPaxosMsg.set_lastchecksum(GetLastChecksum());
   m_oMsgCounter.StartNewRound();
   AddAcceptTimer();
   PLGHead("END");
   BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}

proposer构造accept请求, 填充MsgType、InstanceID、NodeID、ProposeID、Value等字段.
构造好请求后, 将请求广播到系统中所有节点.

Acceptor::OnAccept()

void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
   PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
           oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
   BP->GetAcceptorBP()->OnAccept();
   PaxosMsg oReplyPaxosMsg;
   oReplyPaxosMsg.set_instanceid(GetInstanceID());
   oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
   oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
   oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
   BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
   if (oBallot >= m_oAcceptorState.GetPromiseBallot())
   {
       // 如果proposeID 大于等于 promiseID
       PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
               "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
               m_oAcceptorState.GetPromiseBallot().m_llProposalID,
               m_oAcceptorState.GetPromiseBallot().m_llNodeID,
               m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
               m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
       // 记录promiseID, acceptedBallot以及acceptedValue
       m_oAcceptorState.SetPromiseBallot(oBallot);
       m_oAcceptorState.SetAcceptedBallot(oBallot);
       m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
       // 持久化promiseID, acceptedBallot以及acceptedValue
       int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
       if (ret != 0)
       {
           BP->GetAcceptorBP()->OnAcceptPersistFail();
           PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                   GetInstanceID(), ret);
           return;
       }
       BP->GetAcceptorBP()->OnAcceptPass();
    }
   else
   {
       // 提议的proposalID小于promiseID, 拒绝这个accept请求
       BP->GetAcceptorBP()->OnAcceptReject();
       PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
               m_oAcceptorState.GetPromiseBallot().m_llProposalID,
               m_oAcceptorState.GetPromiseBallot().m_llNodeID);
       // 返回promiseID, 方便proposer更正自己的proposeID      
       oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
   }
   nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
   PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
           GetInstanceID(), oPaxosMsg.nodeid());
   SendMessage(iReplyNodeID, oReplyPaxosMsg);
}

acceptor接到accept请求后, 检查编号n是否大于等于promiseID, 如果是, 那么acceptor接受该提议, 更新promiseID、acceptedBallot和acceptedValue, 并需要进行持久化.
如果不是的话, acceptor就拒绝该请求, 同时要返回自己的promiseIDl以便proposer更新下一次的提议编号.

Proposer::OnAcceptReply()

void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
   PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
           oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
           oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
   BP->GetProposerBP()->OnAcceptReply();
   if (!m_bIsAccepting)
   {
       //PLGErr("Not proposing, skip this msg");
       BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
       return;
   }
   if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
   {
       //PLGErr("ProposalID not same, skip this msg");
       BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
       return;
   }
   m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
   if (oPaxosMsg.rejectbypromiseid() == 0)
   {  // 如果没有被该acceptor拒绝
       PLGDebug("[Accept]");
       m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
   }
   else
   {  // 如果被该acceptor拒绝
       PLGDebug("[Reject]");
       m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
       m_bWasRejectBySomeOne= true;
       m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
   }
   if (m_oMsgCounter.IsPassedOnThisRound())
   { // 若果被多数派接受
       int iUseTimeMs = m_oTimeStat.Point();
       BP->GetProposerBP()->AcceptPass(iUseTimeMs);
       PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
       ExitAccept();
       // 通知learner学习提议结果
       m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
   }
   else if (m_oMsgCounter.IsRejectedOnThisRound()
           || m_oMsgCounter.IsAllReceiveOnThisRound())
   { // 若果被多数派拒绝 或者没有被多数派接受
       BP->GetProposerBP()->AcceptNotPass();
       PLGImp("[Not pass] wait 30ms and Restart prepare");
       AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
   }
   PLGHead("END");
}

如果proposer收到了过半数的acceptor的接受,那么它就可以确定自己提议的值被选定了, 此时通知learner学习提议结果.
如果没有被多数派的acceptor接受, 那么需要等待10~40ms的时间, 重新回到prepare阶段.

Learner::ProposerSendSuccess()

提议被选择后, learner需要学习提议结果, 具体逻辑如下:

void Learner :: ProposerSendSuccess(
       const uint64_t llLearnInstanceID,
       const uint64_t llProposalID)
{
   BP->GetLearnerBP()->ProposerSendSuccess();
   PaxosMsg oPaxosMsg;
   oPaxosMsg.set_msgtype(MsgType_PaxosLearner_ProposerSendSuccess);
   oPaxosMsg.set_instanceid(llLearnInstanceID);
   oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
   oPaxosMsg.set_proposalid(llProposalID);
   oPaxosMsg.set_lastchecksum(GetLastChecksum());
   //run self first
   BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_First);
}
int Instance :: ReceiveMsgForLearner(const PaxosMsg & oPaxosMsg)
{
   if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_AskforLearn)
   {
       m_oLearner.OnAskforLearn(oPaxosMsg);
   }
   else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendLearnValue)
   {
       m_oLearner.OnSendLearnValue(oPaxosMsg);
   }
   else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_ProposerSendSuccess)
   {
       // 学习提议结果,持久化提议到log中.
       // 并且广播提议结果到其他节点的learner(因为目前只有proposer所在的learner才知道提议结果).
       m_oLearner.OnProposerSendSuccess(oPaxosMsg);
   }
   ...
   ...
   ...
   if (m_oLearner.IsLearned())
   {
       ...
       ...
       ...
       // 在状态机中执行提议结果
       // 注意bIsMyCommit这个变量, 提议结果可能不是自己一开始提议的, 可能是prepare阶段后以其他值进行的提议
       if (!SMExecute(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), bIsMyCommit, poSMCtx))
       {
           BP->GetInstanceBP()->OnInstanceLearnedSMExecuteFail();
           PLGErr("SMExecute fail, instanceid %lu, not increase instanceid", m_oLearner.GetInstanceID());
           m_oCommitCtx.SetResult(PaxosTryCommitRet_ExecuteFail,
                   m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());
           m_oProposer.CancelSkipPrepare();
           return -1;
       }
       {
           //this paxos instance end, tell proposal done
           m_oCommitCtx.SetResult(PaxosTryCommitRet_OK
                   , m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());
           if (m_iCommitTimerID > 0)
           {
               m_oIOLoop.RemoveTimer(m_iCommitTimerID);
           }
       }
       PLGHead("[Learned] New paxos starting, Now.Proposer.InstanceID %lu "
               "Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
               m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
       PLGHead("[Learned] Checksum change, last checksum %u new checksum %u",
               m_iLastChecksum, m_oLearner.GetNewChecksum());
       m_iLastChecksum = m_oLearner.GetNewChecksum();
       NewInstance(); // 开始新的一轮Paxos实例
       PLGHead("[Learned] New paxos instance has started, Now.Proposer.InstanceID %lu "
               "Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
               m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
       m_oCheckpointMgr.SetMaxChosenInstanceID(m_oAcceptor.GetInstanceID());
       BP->GetInstanceBP()->NewInstance();
   }
   return 0;

learner感知到被选择的提议后, 需要调用PaxosLog::WriteState()持久化提议结果(上面的代码没有体现, 具体请阅读完整源码).
同时还需要广播提议结果到acceptor所在的节点, 因为目前只有proposer所在节点才知道提议结果.
完成上述操作后, learner通知状态机执行本次提议结果.
需要注意的是, 提议结果可能不是自己一开始提议的, 可能是prepare阶段后以其他值进行的提议.
最后, 调用NewInstance()初始化新的Paxos实例.

Instance::NewInstance()

其中NewInstance()的代码如下:

void Instance::NewInstance()
{
   m_oAcceptor.NewInstance();
   m_oLearner.NewInstance();
   m_oProposer.NewInstance();
}

初始化新一轮Paxos实例中的proposer/acceptor/learner对象, 这些角色的初始化逻辑是:

  1. 递增instanceID

  2. 初始化ProposerState/AcceptorState/LearnerState, 但是保持proposer的proposalID、acceptor的promiseID不变.

总结

本文粗略介绍了PhxPaxos的主要结构以及它的Paxos算法的实现, 除此之外的PhxPaxos中的其他模块比如Log Strorage、Master Election和Checkpoint机制在本文并没有覆盖.
从我的观点来看, PhxPaxos的实现是非常优雅的, 不管是对于分布式系统中模块/角色的抽象, 还是具体的代码风格. 另外, PhxPaxos在使用上还提供了很高的灵活度: 应用开发者可以只实现状态机, 也可以选择自己实现Log Strorage和Network接口. 当然, 如果应用开发者只需PhxPaxos的Master Election功能, 那他连状态机也无需实现.

但是, PhxPaxos的默认的Log Strorage在我的观点看来性能方面会比较差, 因为它使用了LevelDB作为存储引擎(instanceID作为key, 提议值作为value, 除此之外不包含其他信息了包括promiseID/acceptedID之类的), 一个基于LSM设计的、专门用于随机写场景的引擎. 硬币都有两个面, LevelDB为了优化随机写的性能, 也带来了写放大的问题, 一是它本身的WAL落盘, 二是SST文件的落盘, 三是SST文件的compaction. 对于Paxos Log Storage来说, 其实它本身就类似于一个WAL, 直接顺序写入硬盘应该是效率最高的做法.

总的来说, PhxPaxos是一个非常棒的项目, 也是非常值得学习研究的一个项目!




推荐阅读
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 解决Cydia数据库错误:could not open file /var/lib/dpkg/status 的方法
    本文介绍了解决iOS系统中Cydia数据库错误的方法。通过使用苹果电脑上的Impactor工具和NewTerm软件,以及ifunbox工具和终端命令,可以解决该问题。具体步骤包括下载所需工具、连接手机到电脑、安装NewTerm、下载ifunbox并注册Dropbox账号、下载并解压lib.zip文件、将lib文件夹拖入Books文件夹中,并将lib文件夹拷贝到/var/目录下。以上方法适用于已经越狱且出现Cydia数据库错误的iPhone手机。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 拥抱Android Design Support Library新变化(导航视图、悬浮ActionBar)
    转载请注明明桑AndroidAndroid5.0Loollipop作为Android最重要的版本之一,为我们带来了全新的界面风格和设计语言。看起来很受欢迎࿰ ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • 本文介绍了如何使用iptables添加非对称的NAT规则段,以实现内网穿透和端口转发的功能。通过查阅相关文章,得出了解决方案,即当匹配的端口在映射端口的区间内时,可以成功进行端口转发。详细的操作步骤和命令示例也在文章中给出。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
author-avatar
秀珍冠秋晓雯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有