- QuorumPeerMain
这是zookeeper服务的启动类,它提供了一个main()方法用来启动zookeeper服务,如下:
public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {main.initializeAndRun(args);} catch (IllegalArgumentException e) {LOG.error("Invalid arguments, exiting abnormally", e);LOG.info(USAGE);System.err.println(USAGE);System.exit(ExitCode.INVALID_INVOCATION.getValue());} catch (ConfigException e) {LOG.error("Invalid config, exiting abnormally", e);System.err.println("Invalid config, exiting abnormally");System.exit(ExitCode.INVALID_INVOCATION.getValue());} catch (DatadirException e) {LOG.error("Unable to access datadir, exiting abnormally", e);System.err.println("Unable to access datadir, exiting abnormally");System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());} catch (AdminServerException e) {LOG.error("Unable to start AdminServer, exiting abnormally", e);System.err.println("Unable to start AdminServer, exiting abnormally");System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());} catch (Exception e) {LOG.error("Unexpected exception, exiting abnormally", e);System.exit(ExitCode.UNEXPECTED_ERROR.getValue());}LOG.info("Exiting normally");System.exit(ExitCode.EXECUTION_FINISHED.getValue());}
它就做了两件事,一:初始化QuorumPeerMain,二:initializeAndRun(args)启动服务.接下来看看initializeAndRun(args)这个方法:
protected void initializeAndRun(String[] args)throws ConfigException, IOException, AdminServerException{QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {config.parse(args[0]);}DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start();if (args.length == 1 && config.isDistributed()) {runFromConfig(config);} else {LOG.warn("Either no config or no quorum defined in config, running "+ " in standalone mode");ZooKeeperServerMain.main(args);}}
这里做了三件事:
1.解析zookeeper的配置文件
2.创建并启动数据文件清理管理器(包括快照文件和事务日志)
3.判断是单机还是分布式部署方式来启动zookeeper
集群模式将会调用runFromConfig(config)这个方法启动zookeeper服务
public void runFromConfig(QuorumPeerConfig config)throws IOException, AdminServerException{try {ManagedUtil.registerLog4jMBeans();} catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);}LOG.info("Starting quorum peer");MetricsProvider metricsProvider;try {metricsProvider = MetricsProviderBootstrap.startMetricsProvider(config.getMetricsProviderClassName(),config.getMetricsProviderConfiguration());} catch (MetricsProviderLifeCycleException error) {throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(),error);}try {ServerMetrics.metricsProviderInitialized(metricsProvider);ServerCnxnFactory cnxnFactory = null;ServerCnxnFactory secureCnxnFactory = null;if (config.getClientPortAddress() != null) {cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns(),config.getClientPortListenBacklog(), false);}if (config.getSecureClientPortAddress() != null) {secureCnxnFactory = ServerCnxnFactory.createFactory();secureCnxnFactory.configure(config.getSecureClientPortAddress(),config.getMaxClientCnxns(),config.getClientPortListenBacklog(), true);}quorumPeer = getQuorumPeer();quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir()));quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());quorumPeer.setElectionType(config.getElectionAlg());quorumPeer.setMyid(config.getServerId());quorumPeer.setTickTime(config.getTickTime());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit());quorumPeer.setObserverMasterPort(config.getObserverMasterPort());quorumPeer.setConfigFileName(config.getConfigFilename());quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);if (config.getLastSeenQuorumVerifier()!=null) {quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);}quorumPeer.initConfigInZKDatabase();quorumPeer.setCnxnFactory(cnxnFactory);quorumPeer.setSecureCnxnFactory(secureCnxnFactory);quorumPeer.setSslQuorum(config.isSslQuorum());quorumPeer.setUsePortUnification(config.shouldUsePortUnification());quorumPeer.setLearnerType(config.getPeerType());quorumPeer.setSyncEnabled(config.getSyncEnabled());quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());if (config.sslQuorumReloadCertFiles) {quorumPeer.getX509Util().enableCertFileReloading();}quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);if(quorumPeer.isQuorumSaslAuthEnabled()){quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);}quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);quorumPeer.initialize();if(config.jvmPauseMonitorToRun) {quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));}quorumPeer.start();quorumPeer.join();} catch (InterruptedException e) {LOG.warn("Quorum Peer interrupted", e);} finally {if (metricsProvider != null) {try {metricsProvider.stop();} catch (Throwable error) {LOG.warn("Error while stopping metrics", error);}}}}
这个方法做了以下几件事:
1.注册log4j JMX控件
2.创建性能指标统计类MetricsProvider
3.创建服务端与客户端通信工厂类ServerCnxnFactory
4.初始化QuorumPeer并启动这个线程