Not much of Solr startup touches SolrCloud; the cloud-specific work is mainly maintaining cluster state through ZooKeeper, in the following places:
1. When loading solr.xml: it is read from ZooKeeper only if the system property solr.solrxml.location=zookeeper is set; otherwise it is still loaded from local disk.
2. In CoreContainer.load, ZooKeeper is initialized through ZkContainer.
3. When cores are created (on multiple threads), their configuration files are loaded from ZooKeeper.
4. Finally, if the node is running in SolrCloud mode, each core's state is registered in zk.
The rest of the work is essentially no different from a plain local-file startup (a typical cloud-mode invocation is sketched below).
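All of these hooks are driven by system properties. As a typical Solr 4.x example (the paths and config name are placeholders), a node that runs an embedded ZooKeeper and bootstraps its configuration is started roughly like:

java -DzkRun -Dbootstrap_confdir=./solr/collection1/conf -Dcollection.configName=myconf -jar start.jar

A node joining an external ensemble passes -DzkHost=zk1:2181,zk2:2181,zk3:2181 instead (optionally with a chroot suffix such as /solr); adding -Dsolr.solrxml.location=zookeeper additionally makes solr.xml itself come from zk.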
Let's first look at the code that loads the solr.xml file; the entry point is the same as for a local startup:
private ConfigSolr loadConfigSolr(SolrResourceLoader loader) {
String solrxmlLocation = System.getProperty("solr.solrxml.location", "solrhome");
// unless solr.solrxml.location is set to zookeeper, load from the local disk as before
if (solrxmlLocation == null || "solrhome".equalsIgnoreCase(solrxmlLocation))
return ConfigSolr.fromSolrHome(loader, loader.getInstanceDir());
// nothing special here: create a SolrZkClient, then fetch the data from zk
if ("zookeeper".equalsIgnoreCase(solrxmlLocation)) {
String zkHost = System.getProperty("zkHost");
log.info("Trying to read solr.xml from " + zkHost);
if (StringUtils.isEmpty(zkHost))
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not load solr.xml from zookeeper: zkHost system property not set");
SolrZkClient zkClient = new SolrZkClient(zkHost, 30000);
try {
if (!zkClient.exists("/solr.xml", true))
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper: node not found");
byte[] data = zkClient.getData("/solr.xml", null, null, true);
return ConfigSolr.fromInputStream(loader, new ByteArrayInputStream(data));
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper", e);
} finally {
zkClient.close();
}
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Bad solr.solrxml.location set: " + solrxmlLocation
+ " - should be 'solrhome' or 'zookeeper'");
}
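SolrZkClient here is just a thin convenience wrapper around the plain ZooKeeper client (more on it below). For comparison, a minimal sketch of the same read using the raw org.apache.zookeeper API (connect string hard-coded, error handling trimmed) would look like:

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class RawZkRead {
  public static void main(String[] args) throws Exception {
    final CountDownLatch connected = new CountDownLatch(1);
    // the watcher fires once the session is established
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
          connected.countDown();
        }
      }
    });
    connected.await();
    Stat stat = zk.exists("/solr.xml", false);
    if (stat == null) throw new IllegalStateException("/solr.xml not found");
    byte[] data = zk.getData("/solr.xml", false, stat);
    System.out.println(new String(data, "UTF-8"));
    zk.close();
  }
}

Everything SolrZkClient layers on top of this — waiting for the connection, retrying on connection loss, reconnect callbacks — is what the ConnectionManager shown further below is for. Next, step 2 from the list above: during CoreContainer.load, ZkContainer.initZooKeeper brings up the ZooKeeper side: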
public void initZooKeeper(final CoreContainer cc, String solrHome, String zkHost, int zkClientTimeout, String hostPort,
String hostContext, String host, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames) {
ZkController zkController = null;
// if zkHost sys property is not set, we are not using ZooKeeper
String zookeeperHost;
if(zkHost == null) {
zookeeperHost = System.getProperty("zkHost");
} else {
zookeeperHost = zkHost;
}
String zkRun = System.getProperty("zkRun");
if (zkRun == null && zookeeperHost == null)
return; // not in zk mode
// BEGIN: SOLR-4622: deprecated hardcoded defaults for hostPort & hostContext
if (null == hostPort) {
// throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
// "'hostPort' must be configured to run SolrCloud");
log.warn("Solr 'hostPort' has not be explicitly configured, using hardcoded default of " + DEFAULT_HOST_PORT + ". This default has been deprecated and will be removed in future versions of Solr, please configure this value explicitly");
hostPort = DEFAULT_HOST_PORT;
}
if (null == hostContext) {
// throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
// "'hostContext' must be configured to run SolrCloud");
log.warn("Solr 'hostContext' has not be explicitly configured, using hardcoded default of " + DEFAULT_HOST_CONTEXT + ". This default has been deprecated and will be removed in future versions of Solr, please configure this value explicitly");
hostContext = DEFAULT_HOST_CONTEXT;
}
// END: SOLR-4622
// zookeeper in quorum mode currently causes a failure when trying to
// register log4j mbeans. See SOLR-2369
// TODO: remove after updating to an slf4j based zookeeper
System.setProperty("zookeeper.jmx.log4j.disable", "true");
// when zkRun is set, start an embedded zk via SolrZkServer; start() spawns a daemon thread inside
if (zkRun != null) {
String zkDataHome = System.getProperty("zkServerDataDir", solrHome + "zoo_data");
String zkConfHome = System.getProperty("zkServerConfDir", solrHome);
zkServer = new SolrZkServer(zkRun, zookeeperHost, zkDataHome, zkConfHome, hostPort);
zkServer.parseConfig();
zkServer.start();
// set client from server config if not already set
if (zookeeperHost == null) {
zookeeperHost = zkServer.getClientString();
}
}
int zkClientConnectTimeout = 30000;
if (zookeeperHost != null) {
// we are ZooKeeper enabled
try {
// If this is an ensemble, allow for a long connect time for other servers to come up
if (zkRun != null && zkServer.getServers().size() > 1) {
zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded ensemble
log.info("Zookeeper client=" + zookeeperHost + " Waiting for a quorum.");
} else {
log.info("Zookeeper client=" + zookeeperHost);
}
String confDir = System.getProperty("bootstrap_confdir");
boolean boostrapConf = Boolean.getBoolean("bootstrap_conf");
// whether zookeeperHost contains "/" decides whether a chroot is in use; if so, checkChrootPath returns whether the znode exists (or, when it is asked to create it, whether creation succeeded)
if(!ZkController.checkChrootPath(zookeeperHost, (confDir!=null) || boostrapConf)) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"A chroot was specified in ZkHost but the znode doesn't exist. ");
}
zkController = new ZkController(cc, zookeeperHost, zkClientTimeout,
zkClientConnectTimeout, host, hostPort, hostContext,
leaderVoteWait, leaderConflictResolveWait, genericCoreNodeNames,
new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
List<CoreDescriptor> descriptors = new ArrayList<CoreDescriptor>(
cc.getCoreNames().size());
Collection<SolrCore> cores = cc.getCores();
for (SolrCore core : cores) {
descriptors.add(core.getCoreDescriptor());
}
return descriptors;
}
});
if (zkRun != null && zkServer.getServers().size() > 1 && cOnfDir== null && boostrapCOnf== false) {
// we are part of an ensemble and we are not uploading the config - pause to give the config time
// to get up
Thread.sleep(10000);
}
// if bootstrap_confdir is set, upload the local configuration files to the zk config directory named by collection.configName
if(confDir != null) {
File dir = new File(confDir);
if(!dir.isDirectory()) {
throw new IllegalArgumentException("bootstrap_confdir must be a directory of configuration files");
}
String confName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX + ZkController.CONFIGNAME_PROP, "configuration1");
zkController.uploadConfigDir(dir, confName);
}
// if bootstrap_conf is set, upload the configuration files of every configured core to zk
if(boostrapConf) {
ZkController.bootstrapConf(zkController.getZkClient(), cc, solrHome);
}
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (TimeoutException e) {
log.error("Could not connect to ZooKeeper", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (IOException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
}
this.zkController = zkController;
}
Now let's look at each of the methods used inside it, one at a time:
/**
* Validates if the chroot exists in zk (or if it is successfully created).
* Optionally, if create is set to true this method will create the path in
* case it doesn't exist
*
* @return true if the path exists or is created, false if the path doesn't
* exist and 'create' = false
*/
public static boolean checkChrootPath(String zkHost, boolean create)
throws KeeperException, InterruptedException {
if (!containsChroot(zkHost)) {
return true;
}
log.info("zkHost includes chroot");
String chrootPath = zkHost.substring(zkHost.indexOf("/"), zkHost.length());
SolrZkClient tmpClient = new SolrZkClient(zkHost.substring(0,
zkHost.indexOf("/")), 60 * 1000);
boolean exists = tmpClient.exists(chrootPath, true);
if (!exists && create) {
tmpClient.makePath(chrootPath, false, true);
exists = true;
}
tmpClient.close();
return exists;
}
After that, a ZkController instance is created. ZkController takes care of much of the cluster-state maintenance work, such as registering shards, leader election, and so on:
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect)
throws InterruptedException, TimeoutException, IOException
{
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
this.cc = cc;
this.genericCoreNodeNames = genericCoreNodeNames;
// be forgiving and strip this off leading/trailing slashes
// this allows us to support users specifying hostContext="/" in
// solr.xml to indicate the root context, instead of hostContext=""
// which means the default of "solr"
localHostContext = trimLeadingAndTrailingSlashes(localHostContext);
this.zkServerAddress = zkServerAddress;
this.localHostPort = locaHostPort;
this.localHostContext = localHostContext;
this.hostName = normalizeHostName(localHost);
// nodeName is assembled in the form "$hostName:$hostPort_$hostContext", e.g. "127.0.0.1:8983_solr"
this.nodeName = generateNodeName(this.hostName,
this.localHostPort,
this.localHostContext);
this.leaderVoteWait = leaderVoteWait;
this.leaderConflictResolveWait = leaderConflictResolveWait;
this.clientTimeout = zkClientTimeout;
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
zkClientConnectTimeout, new DefaultConnectionStrategy(),
// on reconnect, reload cloud info
new OnReconnect() {
@Override
public void command() {
try {
// this is troublesome - we dont want to kill anything the old
// leader accepted
// though I guess sync will likely get those updates back? But
// only if
// he is involved in the sync, and he certainly may not be
// ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
// we need to create all of our lost watches
// seems we dont need to do this again...
// Overseer.createClientNodes(zkClient, getNodeName());
cc.cancelCoreRecoveries();
registerAllCoresAsDown(registerOnReconnect, false);
ElectionContext context = new OverseerElectionContext(zkClient,
overseer, getNodeName());
ElectionContext prevContext = overseerElector.getContext();
if (prevContext != null) {
prevContext.cancelElection();
}
overseerElector.setup(context);
overseerElector.joinElection(context, true);
zkStateReader.createClusterStateWatchersAndUpdate();
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
// re register all descriptors
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it
// was
// a leader that was expired - as well as what to do about
// leaders/overseers
// with connection loss
try {
register(descriptor.getName(), descriptor, true, true);
} catch (Exception e) {
SolrException.log(log, "Error registering SolrCore", e);
}
}
}
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (Exception e) {
SolrException.log(log, "", e);
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
}
}
}, new BeforeReconnect() {
@Override
public void command() {
try {
ZkController.this.overseer.close();
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
markAllAsNotLeader(registerOnReconnect);
}
});
// creates /overseer/queue
this.overseerJobQueue = Overseer.getInQueue(zkClient);
// creates /overseer/collection-queue-work
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
init(registerOnReconnect);
}
The ZkController constructor mainly instantiates the objects it will need, including a SolrZkClient. SolrZkClient is functionally similar to the basic ZooKeeper class — it creates zk path nodes, checks whether a node exists, sets and gets node data, and so on — the difference being that most of its operations add management of the connection and of reconnection. The piece that actually talks to zk is a SolrZooKeeper instance, a subclass of ZooKeeper. Here is SolrZkClient's constructor:
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
this.zkClientConnectionStrategy = strat;
this.zkClientTimeout = zkClientTimeout;
// we must retry at least as long as the session timeout
zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
// the connection manager, which handles connect/reconnect events
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect);
try {
strat.connect(zkServerAddress, zkClientTimeout, connManager,
new ZkUpdate() {
@Override
public void update(SolrZooKeeper zooKeeper) {
SolrZooKeeper oldKeeper = keeper;
keeper = zooKeeper;
try {
closeKeeper(oldKeeper);
} finally {
if (isClosed) {
// we may have been closed
closeKeeper(SolrZkClient.this.keeper);
}
}
}
});
} catch (Exception e) {
connManager.close();
if (keeper != null) {
try {
keeper.close();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
try {
connManager.waitForConnected(clientConnectTimeout);
} catch (Exception e) {
connManager.close();
try {
keeper.close();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
numOpens.incrementAndGet();
}
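To make the wrapper concrete, here is a hedged usage sketch of SolrZkClient, reusing only calls already seen in loadConfigSolr and checkChrootPath above (the address and path are placeholders):

void solrZkClientExample() throws Exception {
  SolrZkClient zkClient = new SolrZkClient("zk1:2181,zk2:2181/solr", 30000);
  try {
    // exists(path, retryOnConnLoss)
    if (!zkClient.exists("/solr.xml", true)) {
      // makePath(path, failOnExists, retryOnConnLoss): creates intermediate nodes as needed
      zkClient.makePath("/solr.xml", false, true);
    }
    // getData(path, watcher, stat, retryOnConnLoss)
    byte[] data = zkClient.getData("/solr.xml", null, null, true);
    System.out.println("read " + data.length + " bytes");
  } finally {
    zkClient.close();
  }
}

When retryOnConnLoss is true, the call goes through ZkCmdExecutor, which keeps retrying for at least the session timeout — the "connection management" difference from the raw client mentioned above.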
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
try {
boolean createdWatchesAndUpdated = false;
// if /live_nodes exists, build a ClusterState from the /clusterstate.json already in zk
if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) {
// build the ClusterState
zkStateReader.createClusterStateWatchersAndUpdate();
createdWatchesAndUpdated = true;
publishAndWaitForDownStates();
}
// makes nodes zkNode
// 确认"/live_nodes"存在,不存在新创建
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
// create the ephemeral live node "/live_nodes/$nodeName"; if it already exists it is deleted and recreated, to clean up what was left behind by an unclean shutdown
createEphemeralLiveNode();
// 确认"/collections”存在,不存在创建
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
ShardHandler shardHandler;
String adminPath;
shardHandler = cc.getShardHandlerFactory().getShardHandler();
// the default adminPath: /admin/cores
adminPath = cc.getAdminPath();
// the leader elector
overseerElector = new LeaderElector(zkClient);
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context, false);
// if the ClusterState wasn't built above, build it here
if (!createdWatchesAndUpdated) {
zkStateReader.createClusterStateWatchersAndUpdate();
}
} catch (IOException e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Can't create ZooKeeperController", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
}
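Note that the live node is an ephemeral znode, so ZooKeeper deletes it automatically when the node's session dies — which is what makes "/live_nodes" a reliable picture of which Solr nodes are actually alive. A minimal sketch of the idea with the raw ZooKeeper API (not Solr's actual createEphemeralLiveNode; zk and nodeName are assumed to come from the surrounding code):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

void createLiveNode(ZooKeeper zk, String nodeName) throws Exception {
  // EPHEMERAL: the znode disappears as soon as this session expires,
  // so a crashed node drops out of /live_nodes without any cleanup
  zk.create("/live_nodes/" + nodeName, null,
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}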
When "/live_nodes" already exists, there are live nodes in the cluster, and ZkStateReader is initialized from the properties already stored in zk, mainly through the following two methods. First, zkStateReader.createClusterStateWatchersAndUpdate:
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException {
// We need to fetch the current cluster state and the set of live nodes
synchronized (getUpdateLock()) {
// verify or create "/clusterstate.json"
cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
// verify or create "/aliases.json"
cmdExecutor.ensureExists(ALIASES, zkClient);
log.info("Updating cluster state from ZooKeeper... ");
// register an exists watcher on "/clusterstate.json"; when it fires, update ZkStateReader's clusterState from the new clusterstate.json in zk
zkClient.exists(CLUSTER_STATE, new Watcher() {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event) , ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
try {
// delayed approach
// ZkStateReader.this.updateClusterState(false, false);
synchronized (ZkStateReader.this.getUpdateLock()) {
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
// the watch is re-armed here, this time via getData
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln, ZkStateReader.this);
// update volatile
ZkStateReader.this.clusterState = clusterState;
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
return;
}
}
}, true);
}
// 在"/live_node”上注册getChildren的watcher,触发时,用"/live_node”的children属性更新clusterState
synchronized (ZkStateReader.this.getUpdateLock()) {
List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
new Watcher() {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
try {
// delayed approach
// ZkStateReader.this.updateClusterState(false, true);
synchronized (ZkStateReader.this.getUpdateLock()) {
List<String> liveNodes = zkClient.getChildren(
LIVE_NODES_ZKNODE, this, true);
log.info("Updating live nodes... ({})", liveNodes.size());
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
ClusterState clusterState = new ClusterState(
ZkStateReader.this.clusterState.getZkClusterStateVersion(),
liveNodesSet, ZkStateReader.this.clusterState
.getCollectionStates());
ZkStateReader.this.clusterState = clusterState;
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
return;
}
}
}, true);
// load a fresh ClusterState
Set<String> liveNodeSet = new HashSet<String>();
liveNodeSet.addAll(liveNodes);
// still built from the clusterstate.json values already in zk; no watcher is registered this time
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState;
// register an exists watcher on "/aliases.json"; when it fires, update ZkStateReader's aliases from the new data in "/aliases.json"
zkClient.exists(ALIASES,
new Watcher() {
@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
try {
synchronized (ZkStateReader.this.getUpdateLock()) {
log.info("Updating aliases... ");
// remake watch
final Watcher thisWatch = this;
Stat stat = new Stat();
byte[] data = zkClient.getData(ALIASES, thisWatch, stat ,
true);
Aliases aliases = ClusterState.load(data);
ZkStateReader.this.aliases = aliases;
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
log.error("", e);
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
return;
}
}
}, true);
}
updateAliases();
}
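One detail worth calling out in the method above: ZooKeeper watches are one-shot, which is why every watcher re-registers itself from inside process() (the "remake watch" lines pass this back into getData/getChildren). Stripped of the Solr specifics, the idiom looks roughly like this (a sketch; the state-updating part is elided):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class SelfRenewingWatcher implements Watcher {
  private final ZooKeeper zk;
  private final String path;

  SelfRenewingWatcher(ZooKeeper zk, String path) {
    this.zk = zk;
    this.path = path;
  }

  @Override
  public void process(WatchedEvent event) {
    // session events are not change events and do not consume the watch
    if (Event.EventType.None.equals(event.getType())) return;
    try {
      Stat stat = new Stat();
      // passing 'this' again arms a fresh one-shot watch
      byte[] data = zk.getData(path, this, stat);
      // ... apply data to the in-memory state ...
    } catch (Exception e) {
      // on session expiry / connection loss, give up here; the reconnect
      // logic (the OnReconnect callback above) re-creates all watches
    }
  }
}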
After that there is one more method, publishAndWaitForDownStates():
public void publishAndWaitForDownStates() throws KeeperException,
InterruptedException {
ClusterState clusterState = zkStateReader.getClusterState();
// returns an immutable keySet()
Set<String> collections = clusterState.getCollections();
List<String> updatedNodes = new ArrayList<String>();
for (String collectionName : collections) {
DocCollection collection = clusterState.getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(getNodeName())
&& !(replica.getStr(ZkStateReader.STATE_PROP)
.equals(ZkStateReader.DOWN))) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
ZkStateReader.CORE_NAME_PROP,
replica.getStr(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.ROLES_PROP,
replica.getStr(ZkStateReader.ROLES_PROP),
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.SHARD_ID_PROP,
replica.getStr(ZkStateReader.SHARD_ID_PROP),
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replica.getName());
updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
}
}
}
// now wait till the updates are in our state
long now = System.nanoTime();
long timeout = now + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
boolean foundStates = false;
while (System.nanoTime() < timeout) {
clusterState = zkStateReader.getClusterState();
collections = clusterState.getCollections();
for (String collectionName : collections) {
DocCollection collection = clusterState.getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (replica.getStr(ZkStateReader.STATE_PROP).equals(
ZkStateReader.DOWN)) {
updatedNodes.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
}
}
}
}
if (updatedNodes.size() == 0) {
foundStates = true;
Thread.sleep(1000);
break;
}
Thread.sleep(1000);
}
if (!foundStates) {
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
}
}
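For reference, the state message offered onto /overseer/queue serializes via ZkStateReader.toJSON into something along these lines (the keys are the Solr 4.x values of the constants used above — "operation", "state", "base_url", "core", and so on — and every value here is made up for illustration):

{
  "operation":"state",
  "state":"down",
  "base_url":"http://127.0.0.1:8983/solr",
  "core":"collection1",
  "roles":null,
  "node_name":"127.0.0.1:8983_solr",
  "shard":"shard1",
  "collection":"collection1",
  "core_node_name":"core_node1"
}

The Overseer consumes these messages and folds them into clusterstate.json, which is why the method then polls the local ClusterState until every one of this node's replicas shows up as "down".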
What comes next is Solr's election process, in which the nodes vote to elect the Overseer leader; the logic of Solr's leader election will be analyzed next time.