PhxPaxos是微信后台团队开源的一套基于Paxos实现的c++库, 用于实现分布式系统中一致性的共识. 应用开发者可以将它嵌入到自己的应用中, 使单节点应用可以扩展成为多节点分布式应用, 而且多节点之间的状态可以保持一致, 多节点之间也可以自动容灾切换.
PhxPaxos这个库使用方法很简单, 开发者只需要实现StateMachine接口定义自己的状态机, 并用这个状态机和其他选项(当前节点信息、节点列表)来初始化出来一个Node对象. 应用通过这个Node对象的Propose方法便可以发起提议. 提议在整个集群中达成多数派之后, PhxPaxos会在上述状态机中执行提议的内容.
PhxPaxos内部有若干个类, 分别对分布式系统中和Paxos算法中的不同模块/角色进行了抽象, 主要列举如下:
Node接口: 对外暴露的API, 比如上述的Propose方法便是该接口的方法之一.
PNode类: 实现了Node接口, 也是对系统中单个节点的抽象.
Group类: 对每个Paxos group的抽象.
Instance类: 对group中每轮Paxos实例的抽象.
Proposer/Acceptor/Learner类: 对Paxos实例中的各角色的抽象.
StateMachine接口: 定义了状态机需要实现的方法. 应用开发者需要实现.
Network接口: 定义了通信模块需要实现的方法. 可以是应用开发者实现, 也可以用PhxPaxos默认的通信模块(基于event loop模式).
LogStorage接口: 定义了log存储模块需要实现的方法. 可以是应用开发者实现, 也可以用PhxPaxos默认的存储模块(基于levelDB).
MsgTransport类: 封装了Network接口简化PhxPaxos内部和网络通信相关的逻辑, 比如实现消息广播.
PaxosLog类: 封装了LogStorage接口简化PhxPaxos内部和log相关的逻辑.
这些类之间的关系简要说明如下:
PNode对象是对系统中单个节点的抽象, 它可以拥有多个Group对象, 表示系统的节点支持多个Paxos group.
Group对象简单封装了Instance对象.
Instance对象是每一轮Paxos实例的抽象, 它组合了Proposer/Acceptor/Learner三个对象(角色). 它主要负责接收来自NetWork的消息并通过IOLoop线程来驱动各个角色.
Proposer/Acceptor/Learner: 分别实现了Paxos中对应角色的逻辑. 它们都继承自Base基类, 基类中维护了当前的InstanceId.
下面是自己简单画的UML类图(省略了Group、StateMachine、NetWork等数据结构):
图中除IOLoop类外刚才都已经有所介绍. 这里专门提一下IOLoop这个类, 它是对一个单独线程的抽象, 该线程主要作用是驱动Paxos实例中各角色的运转, 后面会提到这一点.
PhxPaxos中的Paxos算法部分是基于Paxos Made Simple中的论述进行实现的, 论文中的算法流程主要有prepare和accept两个阶段, 具体可以参考这篇文章: 一步一步推导Paxos算法
我们看看prepare阶段是怎么实现的.
开发者的应用程序提议一个新的value, 该value最终会在IOLoop线程中通过Proposer的NewValue()方法发起Paxos流程.
触发过程依次如下:
应用线程执行: PNode::Propose()
IOLoop线程执行: IOLoop::CheckNewValue()
IOLoop线程执行: Proposer::NewValue()
其中Proposer :: NewValue()代码如下:
int Proposer :: NewValue(const std::string & sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
// 以往的Paxos实例prepare阶段成功过 && 以往的Paxos实例没有被拒绝.,
// 才可以跳过prepare阶段
BP->GetProposerBP()->NewProposalSkipPrepare();
PLGHead("skip prepare, directly start accept");
Accept();
}
else
{
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}
开始提议这个新value之前会先判断这次是否可以直接进入accept阶段. 否则需要从prepare阶段开始.
什么情况可以跳过prepare阶段呢? 条件是: 以往的Paxos实例prepare阶段成功过 && 以往的Paxos实例没有被拒绝.
"以往的Paxos实例prepare阶段成功过" 说明多数派已经承诺了不会接受小于它的proposerID的请求, 也即是m_bCanSkipPrepare为true.
"以往的Paxos实例没有被拒绝"说明它的proposerID还没有过时, 也即是系统中还没有更大的proposerID.
Proposer::Prepare()代码如下:
void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeOne= false;
m_oProposerState.ResetHighestOtherPreAcceptBallot();
if (bNeedNewBallot) // 如果被拒绝过, 需要使用新的proposalID
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare); // 构造prepare请求
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
m_oMsgCounter.StartNewRound(); // 重置一些计数, 包括被接受的数量、被拒绝的数量
AddPrepareTimer();
PLGHead("END OK");
BroadcastMessage(oPaxosMsg); // 广播prepare请求
}
在Prepare()方法的逻辑如下:
重置一些标记.
根据是否被拒绝来判断是否使用更大的proposalID. 如果被拒绝过, 需要使用更大的proposalID.
开始构造prepare请求, 填充MsgType、InstanceID、NodeID、ProposalID字段.
重置一些计数信息, 包括被接受的数量、被拒绝的数量.
将构造好的请求广播到系统中所有节点.
proposer将消息广播出去后将会被acceptor收到, 触发acceptor的相关逻辑.
触发过程如下:
网络线程执行: PNode::OnReceiveMessage()
IOLoop线程执行: Instance::OnReceive()
IOLoop线程执行: Instance::OnReceivePaxosMsg()
IOLoop线程执行: Acceptor::OnPrepare()
Acceptor::OnPrepare()代码如下所示:
int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
// 如果proposeID 大于等于 promiseID, 那么需要返回acceptor已经accept过的proposeID和value
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
承诺不接受小于当前提议的proposeID的其他提议
m_oAcceptorState.SetPromiseBallot(oBallot);
// 持久化promiseID
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
else
{
// 提议的proposalID小于promiseID, 拒绝这个prepare请求
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
// 返回promiseID, 方便proposer更正自己的proposeID
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
// 回复proposer
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}
acceptor接到prepare请求时, 如果prepare的编号n大于等于promiseID, 那么他承诺永远也不会同意提议编号比这n值小的prepare请求, 记为promiseID=n, 需要进行持久化. 另外, 如果它已经accept了某个值, 需要把接受的值(acceptedValue)和当时的提议编号(acceptedBallot)返回给proposer.
如果n小于promiseID, 那么acceptor可以拒绝该请求, 同时要返回自己的promisedID以便proposer更新下一次的提议编号.
BallotNumber这个概念和proposeID类似, 它包含proposeID和nodeID. BallotNumber实现了比较操作符: 先比较proposeID, 如果proposeID相同那么进一步比较nodeID. 而 nodeID 是根据 ip 和 port 进行位移算出来的一个整形数字.
需要注意prepare过程中acceptor判断提议的Ballot和promiseBallot大小关系: 大于或等于的. 其中"等于"的情况是针对prepare因为超时而重试 的情况. 只有这种情况下nodeID和proposeID都一致. 所以, 对于这种prepare超时的情况, proposeID不递增也可以继续工作.
void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnPrepareReply();
if (!m_bIsPreparing)
{
BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
//PLGErr("Not preparing, skip this msg");
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
//PLGErr("ProposalID not same, skip this msg");
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{ // 如果没有被该acceptor拒绝
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
// 记录该acceptor曾经accept过的value(如果有的话)
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
else
{ // 如果被该acceptor拒绝
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeOne= true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{ // 如果已经被多数派接受, 设置m_bCanSkipPrepare 并进入accept阶段
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
m_bCanSkipPrepare = true;
Accept();
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{ // 若果被多数派拒绝 或者没有被多数派接受
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
proposer在发出prepare请求之如果收到多数派的acceptor的承诺, 那么可以进入accept阶段而且设置m_bCanSkipPrepare为true.
另外, proposer记录acceptor返回的acceptedValue(有的话), 在accept阶段时会使用proposeID最大的acceptedValue作为提议值. 如果这一轮Paxos实例还没有任何acceptor接受过某个value的话, proposer可以使用自己一开始的提议值.
如果没有被多数派接受, 说明本次提议已经"过时", 需要那么需要等待10~40ms的时间, 重新发起prepare请求.
接下来Paxos算法进入accept阶段.
void Proposer::Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
ExitPrepare();
m_bIsAccepting = true;
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
AddAcceptTimer();
PLGHead("END");
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}
proposer构造accept请求, 填充MsgType、InstanceID、NodeID、ProposeID、Value等字段.
构造好请求后, 将请求广播到系统中所有节点.
void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
// 如果proposeID 大于等于 promiseID
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
// 记录promiseID, acceptedBallot以及acceptedValue
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
// 持久化promiseID, acceptedBallot以及acceptedValue
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
else
{
// 提议的proposalID小于promiseID, 拒绝这个accept请求
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
// 返回promiseID, 方便proposer更正自己的proposeID
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}
acceptor接到accept请求后, 检查编号n是否大于等于promiseID, 如果是, 那么acceptor接受该提议, 更新promiseID、acceptedBallot和acceptedValue, 并需要进行持久化.
如果不是的话, acceptor就拒绝该请求, 同时要返回自己的promiseIDl以便proposer更新下一次的提议编号.
void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnAcceptReply();
if (!m_bIsAccepting)
{
//PLGErr("Not proposing, skip this msg");
BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//PLGErr("ProposalID not same, skip this msg");
BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{ // 如果没有被该acceptor拒绝
PLGDebug("[Accept]");
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
}
else
{ // 如果被该acceptor拒绝
PLGDebug("[Reject]");
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeOne= true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{ // 若果被多数派接受
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
// 通知learner学习提议结果
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{ // 若果被多数派拒绝 或者没有被多数派接受
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
如果proposer收到了过半数的acceptor的接受,那么它就可以确定自己提议的值被选定了, 此时通知learner学习提议结果.
如果没有被多数派的acceptor接受, 那么需要等待10~40ms的时间, 重新回到prepare阶段.
提议被选择后, learner需要学习提议结果, 具体逻辑如下:
void Learner :: ProposerSendSuccess(
const uint64_t llLearnInstanceID,
const uint64_t llProposalID)
{
BP->GetLearnerBP()->ProposerSendSuccess();
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosLearner_ProposerSendSuccess);
oPaxosMsg.set_instanceid(llLearnInstanceID);
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(llProposalID);
oPaxosMsg.set_lastchecksum(GetLastChecksum());
//run self first
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_First);
}
int Instance :: ReceiveMsgForLearner(const PaxosMsg & oPaxosMsg)
{
if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_AskforLearn)
{
m_oLearner.OnAskforLearn(oPaxosMsg);
}
else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendLearnValue)
{
m_oLearner.OnSendLearnValue(oPaxosMsg);
}
else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_ProposerSendSuccess)
{
// 学习提议结果,持久化提议到log中.
// 并且广播提议结果到其他节点的learner(因为目前只有proposer所在的learner才知道提议结果).
m_oLearner.OnProposerSendSuccess(oPaxosMsg);
}
...
...
...
if (m_oLearner.IsLearned())
{
...
...
...
// 在状态机中执行提议结果
// 注意bIsMyCommit这个变量, 提议结果可能不是自己一开始提议的, 可能是prepare阶段后以其他值进行的提议
if (!SMExecute(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), bIsMyCommit, poSMCtx))
{
BP->GetInstanceBP()->OnInstanceLearnedSMExecuteFail();
PLGErr("SMExecute fail, instanceid %lu, not increase instanceid", m_oLearner.GetInstanceID());
m_oCommitCtx.SetResult(PaxosTryCommitRet_ExecuteFail,
m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());
m_oProposer.CancelSkipPrepare();
return -1;
}
{
//this paxos instance end, tell proposal done
m_oCommitCtx.SetResult(PaxosTryCommitRet_OK
, m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());
if (m_iCommitTimerID > 0)
{
m_oIOLoop.RemoveTimer(m_iCommitTimerID);
}
}
PLGHead("[Learned] New paxos starting, Now.Proposer.InstanceID %lu "
"Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
PLGHead("[Learned] Checksum change, last checksum %u new checksum %u",
m_iLastChecksum, m_oLearner.GetNewChecksum());
m_iLastChecksum = m_oLearner.GetNewChecksum();
NewInstance(); // 开始新的一轮Paxos实例
PLGHead("[Learned] New paxos instance has started, Now.Proposer.InstanceID %lu "
"Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
m_oCheckpointMgr.SetMaxChosenInstanceID(m_oAcceptor.GetInstanceID());
BP->GetInstanceBP()->NewInstance();
}
return 0;
learner感知到被选择的提议后, 需要调用PaxosLog::WriteState()持久化提议结果(上面的代码没有体现, 具体请阅读完整源码).
同时还需要广播提议结果到acceptor所在的节点, 因为目前只有proposer所在节点才知道提议结果.
完成上述操作后, learner通知状态机执行本次提议结果.
需要注意的是, 提议结果可能不是自己一开始提议的, 可能是prepare阶段后以其他值进行的提议.
最后, 调用NewInstance()初始化新的Paxos实例.
其中NewInstance()的代码如下:
void Instance::NewInstance()
{
m_oAcceptor.NewInstance();
m_oLearner.NewInstance();
m_oProposer.NewInstance();
}
初始化新一轮Paxos实例中的proposer/acceptor/learner对象, 这些角色的初始化逻辑是:
递增instanceID
初始化ProposerState/AcceptorState/LearnerState, 但是保持proposer的proposalID、acceptor的promiseID不变.
本文粗略介绍了PhxPaxos的主要结构以及它的Paxos算法的实现, 除此之外的PhxPaxos中的其他模块比如Log Strorage、Master Election和Checkpoint机制在本文并没有覆盖.
从我的观点来看, PhxPaxos的实现是非常优雅的, 不管是对于分布式系统中模块/角色的抽象, 还是具体的代码风格. 另外, PhxPaxos在使用上还提供了很高的灵活度: 应用开发者可以只实现状态机, 也可以选择自己实现Log Strorage和Network接口. 当然, 如果应用开发者只需PhxPaxos的Master Election功能, 那他连状态机也无需实现.
但是, PhxPaxos的默认的Log Strorage在我的观点看来性能方面会比较差, 因为它使用了LevelDB作为存储引擎(instanceID作为key, 提议值作为value, 除此之外不包含其他信息了包括promiseID/acceptedID之类的), 一个基于LSM设计的、专门用于随机写场景的引擎. 硬币都有两个面, LevelDB为了优化随机写的性能, 也带来了写放大的问题, 一是它本身的WAL落盘, 二是SST文件的落盘, 三是SST文件的compaction. 对于Paxos Log Storage来说, 其实它本身就类似于一个WAL, 直接顺序写入硬盘应该是效率最高的做法.
总的来说, PhxPaxos是一个非常棒的项目, 也是非常值得学习研究的一个项目!