区块链教程Fabric1.0源代码分析流言算法Gossip服务端二
Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端)//conn.serviceConnection(),启动连接服务
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
//return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)func (c *commImpl) GetPKIid() common.PKIidType
//向指定节点发送消息
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//探测远程节点是否有响应,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
//握手验证远程节点,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()//创建并启动gRPC Server&#xff0c;以及注册GossipServer实例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
//将GossipServer实例注册至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error)
//创建gRPC Server&#xff0c;grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)//创建与服务端连接
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
//向指定节点发送消息
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
//return atomic.LoadInt32(&c.stopping) &#61;&#61; int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
//代码在gossip/comm/comm_impl.go
创建并启动gRPC Server&#xff0c;以及注册GossipServer实例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {var ll net.Listenervar s *grpc.Servervar certHash []byteif len(dialOpts) &#61;&#61; 0 {//peer.gossip.dialTimeout&#xff0c;gRPC连接拨号的超时dialOpts &#61; []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}}if port > 0 {//创建gRPC Server&#xff0c;grpc.NewServer(serverOpts...)s, ll, secureDialOpts, certHash &#61; createGRPCLayer(port)}commInst :&#61; &commImpl{selfCertHash: certHash,PKIID: idMapper.GetPKIidOfCert(peerIdentity),idMapper: idMapper,logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),peerIdentity: peerIdentity,opts: dialOpts,secureDialOpts: secureDialOpts,port: port,lsnr: ll,gSrv: s,msgPublisher: NewChannelDemultiplexer(),lock: &sync.RWMutex{},deadEndpoints: make(chan common.PKIidType, 100),stopping: int32(0),exitChan: make(chan struct{}, 1),subscriptions: make([]chan proto.ReceivedMessage, 0),}commInst.connStore &#61; newConnStore(commInst, commInst.logger)if port > 0 {commInst.stopWG.Add(1)go func() {defer commInst.stopWG.Done()s.Serve(ll) //启动gRPC Server}()//commInst注册到gRPC Serverproto.RegisterGossipServer(s, commInst)}return commInst, nil
}//代码在gossip/comm/comm_impl.go
将GossipServer实例注册至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error) {dialOpts &#61; append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))//构造commImplcommInst, err :&#61; NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)if cert !&#61; nil {inst :&#61; commInst.(*commImpl)inst.selfCertHash &#61; certHashFromRawCert(cert.Certificate[0])}proto.RegisterGossipServer(s, commInst.(*commImpl))return commInst, nil
}//代码在gossip/comm/comm_impl.go
//创建与服务端连接
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {var err errorvar cc *grpc.ClientConnvar stream proto.Gossip_GossipStreamClientvar pkiID common.PKIidTypevar connInfo *proto.ConnectionInfovar dialOpts []grpc.DialOptiondialOpts &#61; append(dialOpts, c.secureDialOpts()...)dialOpts &#61; append(dialOpts, grpc.WithBlock())dialOpts &#61; append(dialOpts, c.opts...)cc, err &#61; grpc.Dial(endpoint, dialOpts...)cl :&#61; proto.NewGossipClient(cc)if _, err &#61; cl.Ping(context.Background(), &proto.Empty{}); err !&#61; nil {cc.Close()return nil, err}ctx, cf :&#61; context.WithCancel(context.Background())stream, err &#61; cl.GossipStream(ctx)connInfo, err &#61; c.authenticateRemotePeer(stream)pkiID &#61; connInfo.IDconn :&#61; newConnection(cl, cc, stream, nil)conn.pkiID &#61; pkiIDconn.info &#61; connInfoconn.logger &#61; c.loggerconn.cancel &#61; cfh :&#61; func(m *proto.SignedGossipMessage) {c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{conn: conn,lock: conn,SignedGossipMessage: m,connInfo: connInfo,})}conn.handler &#61; hreturn conn, nil
}
//代码在gossip/comm/comm_impl.go
type connection struct {cancel context.CancelFuncinfo *proto.ConnectionInfooutBuff chan *msgSendinglogger *logging.Logger // loggerpkiID common.PKIidType // pkiID of the remote endpointhandler handler // function to invoke upon a message receptionconn *grpc.ClientConn // gRPC connection to remote endpointcl proto.GossipClient // gRPC stub of remote endpointclientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpointserverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpointstopFlag int32 // indicates whether this connection is in process of stoppingstopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routinesync.RWMutex // synchronizes access to shared variables
}//构造connection
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection
//关闭connection
func (conn *connection) close()
//atomic.LoadInt32(&(conn.stopFlag)) &#61;&#61; int32(1)
func (conn *connection) toDie() bool
//conn.outBuff <- m&#xff0c;其中m为msgSending{envelope: msg.Envelope,onErr: onErr,}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))
//go conn.readFromStream(errChan, msgChan)、go conn.writeToStream()&#xff0c;同时msg :&#61; <-msgChan&#xff0c;conn.handler(msg)
func (conn *connection) serviceConnection() error
//循环不间断从conn.outBuff取数据&#xff0c;然后stream.Send(m.envelope)
func (conn *connection) writeToStream()
//循环不间断envelope, err :&#61; stream.Recv()、msg, err :&#61; envelope.ToGossipMessage()、msgChan <- msg
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)
//获取conn.serverStream
func (conn *connection) getStream() stream
//代码在gossip/comm/conn.go
type connectionStore struct {logger *logging.Logger // loggerisClosing bool // whether this connection store is shutting downconnFactory connFactory // creates a connection to remote peersync.RWMutex // synchronize access to shared variablespki2Conn map[string]*connection //connection map, key为pkiID&#xff0c;value为connectiondestinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,// used to prevent concurrent connection establishment to the same remote endpoint
}//构造connectionStore
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore
//从connection map中获取连接&#xff0c;如无则创建并启动连接&#xff0c;并写入connection map中
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
//连接数量
func (cs *connectionStore) connNum() int
//关闭指定连接
func (cs *connectionStore) closeConn(peer *RemotePeer)
//关闭所有连接
func (cs *connectionStore) shutdown()
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection
//注册连接
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection
//关闭指定连接
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType)
//代码在gossip/comm/conn.go
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {cs.RLock()isClosing :&#61; cs.isClosingcs.RUnlock()pkiID :&#61; peer.PKIIDendpoint :&#61; peer.Endpointcs.Lock()destinationLock, hasConnected :&#61; cs.destinationLocks[string(pkiID)]if !hasConnected {destinationLock &#61; &sync.RWMutex{}cs.destinationLocks[string(pkiID)] &#61; destinationLock}cs.Unlock()destinationLock.Lock()cs.RLock()//从connection map中获取conn, exists :&#61; cs.pki2Conn[string(pkiID)]if exists {cs.RUnlock()destinationLock.Unlock()return conn, nil}cs.RUnlock()//创建连接createdConnection, err :&#61; cs.connFactory.createConnection(endpoint, pkiID)destinationLock.Unlock()conn &#61; createdConnectioncs.pki2Conn[string(createdConnection.pkiID)] &#61; conngo conn.serviceConnection() //启动连接的消息接收处理、以及向对方节点发送消息return conn, nil
}
//代码在gossip/comm/conn.go
type ChannelDeMultiplexer struct {channels []*channellock *sync.RWMutexclosed int32
}//构造ChannelDeMultiplexer
func NewChannelDemultiplexer() *ChannelDeMultiplexer
//atomic.LoadInt32(&m.closed) &#61;&#61; int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool
//关闭
func (m *ChannelDeMultiplexer) Close()
//添加通道
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{}
//挨个通道发送消息
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{})