节点 /ping/ipNode 中保存的数据是一个 IP 地址。PingZookeeperDemo 可以多实例部署,每个实例都会监听该节点;通过 ChangeNodeData、CreateNodeData、DeleteNodeData 分别修改、创建、删除该节点,从而改变节点中的数据值。
完整代码如下:
1. PingZookeeperDemo.java
package cn.east.hadoop.zookeeper.ping;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * Demo that "pings" the IP address stored in a ZooKeeper node.
 *
 * <p>An instance connects to ZooKeeper, makes sure the root node exists, reads the IP held in
 * the IP node ({@code ROOT_PATH + "/ipNode"}) and keeps a watch on that node, so that data
 * changes, deletion and re-creation of the node update the IP being probed.
 *
 * <p>Watch notifications are delivered through {@link #process(WatchedEvent)}, while the probe
 * loop runs in {@code main}; {@code probeIp} is therefore shared between two threads.
 *
 * @author East271536394
 * @version 2013-6-20 14:09:45
 */
public class PingZookeeperDemo implements Watcher {
    protected static final int SESSION_TIMEOUT = 10000;
    protected static final String CONNECTION_STRING = "202.173.9.57:4181";
    protected static String ROOT_PATH = "/ping";
    private static final String DEFAULT_IP = "202.173.10.58";

    protected ZooKeeper zookeeper;
    // Written from the watcher callback and read by the probe loop, hence volatile.
    protected volatile String probeIp = DEFAULT_IP;
    protected String ipNode = ROOT_PATH + "/ipNode";
    // Released by process() once the first SyncConnected event arrives.
    protected CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * Creates the ZooKeeper connection, blocks until the session is connected and then makes
     * sure the root node exists.
     *
     * @param connectString ZooKeeper server address list
     * @param sessionTimeout session timeout in milliseconds
     */
    public void createConnection(String connectString, int sessionTimeout) {
        try {
            zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
            // The latch itself records whether the connection is still pending, so a
            // SyncConnected event that arrives immediately after construction is not lost.
            connectedSemaphore.await();
            createRoot();
        } catch (InterruptedException e) {
            System.out.println("连接创建失败,发生 InterruptedException");
            e.printStackTrace();
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            System.out.println("连接创建失败,发生 IOException");
            e.printStackTrace();
        }
    }

    /**
     * Creates the root node if it does not exist yet. Failures are reported on stdout and
     * otherwise ignored.
     */
    private void createRoot() {
        try {
            Stat rootStat = zookeeper.exists(ROOT_PATH, false);
            if (rootStat == null) {
                zookeeper.create(ROOT_PATH, "rootPath".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                System.out.println("创建createRoot,已存在");
            }
        } catch (Exception se) {
            System.out.println("创建createRoot,发生 Exception");
            se.printStackTrace();
        }
    }

    /**
     * Initialises {@code probeIp} from the IP node. If the node does not exist it is created
     * with the current {@code probeIp}; otherwise its data replaces {@code probeIp}. In both
     * cases a watch is left on the node.
     *
     * @throws Exception if any ZooKeeper call fails
     */
    private void initIp() throws Exception {
        // exists(..., true) leaves a watch even when the node is absent, so the create below
        // is reported back to process() as NodeCreated.
        Stat ipStat = zookeeper.exists(ipNode, true);
        if (ipStat == null) {
            zookeeper.create(ipNode, probeIp.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("创建ipNode,已存在");
            probeIp = new String(zookeeper.getData(ipNode, true, null));
        }
        System.out.println("从zookeeper 中 " + ipNode + " 取值为: " + probeIp);
    }

    /**
     * Simulates probing the current IP by printing it together with a timestamp.
     */
    private void probe() {
        // "HH" (24-hour clock): the previous "hh" pattern printed 12-hour values without an
        // AM/PM marker, which made the timestamps ambiguous.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());
        System.out.println(date + ">>>>>>>>>>>>>正在探测 IP" + probeIp);
    }

    /**
     * Handles session and watch events: releases the connection latch on the first
     * SyncConnected event and reacts to data change, deletion and creation of the IP node.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("收到事件通知:" + event.getState() + ",type :" + event.getType() + ",path :" + event.getPath()
                + "\n");
        if (KeeperState.SyncConnected != event.getState()) {
            return;
        }
        // Only the initial connection is still waiting on the latch.
        if (connectedSemaphore.getCount() > 0) {
            System.out.println("connectedSemaphore.countDown()>>>>>>>>>>>>>>>>");
            connectedSemaphore.countDown();
        }
        // Simulated business logic: only events for the IP node are of interest.
        if (!ipNode.equals(event.getPath())) {
            return;
        }
        Event.EventType type = event.getType();
        try {
            if (Event.EventType.NodeDataChanged == type) {
                // 1. node data updated
                System.out.println(ipNode + "节点发生了数据变化>>>");
                changeIp();
            } else if (Event.EventType.NodeDeleted == type) {
                // 2. node deleted
                System.out.println(ipNode + "节点已被删除>>>");
                resetIp();
            } else if (Event.EventType.NodeCreated == type) {
                // 3. node created
                System.out.println(ipNode + "节点已被创建>>>");
                createIp();
            }
        } catch (Exception se) {
            se.printStackTrace();
        }
    }

    /**
     * Reads the IP from the newly created node and watches the node again.
     *
     * @throws Exception if any ZooKeeper call fails
     */
    private void createIp() throws Exception {
        probeIp = new String(zookeeper.getData(ipNode, false, null));
        System.out.println("createIp>>>>>>>>>probeIp:" + probeIp);
        // Keep watching the node after it has been created.
        zookeeper.exists(ipNode, true);
    }

    /**
     * Resets the probed IP to {@code 0.0.0.0} after the node was deleted and watches the node
     * again.
     *
     * @throws Exception if the ZooKeeper call fails
     */
    private void resetIp() throws Exception {
        probeIp = "0.0.0.0";
        System.out.println("resetIp>>>>>>>>>probeIp:" + probeIp);
        // Keep watching the node after it has been deleted.
        zookeeper.exists(ipNode, true);
    }

    /**
     * Reads the changed IP from the node, leaving a new watch on it.
     *
     * @throws Exception if the ZooKeeper call fails
     */
    private void changeIp() throws Exception {
        probeIp = new String(zookeeper.getData(ipNode, true, null));
    }

    /**
     * Connects, loads the initial IP and then probes it every five seconds, forever.
     *
     * @param args unused
     * @throws Exception if initialisation fails or the loop is interrupted
     */
    public static void main(String[] args) throws Exception {
        PingZookeeperDemo demo = new PingZookeeperDemo();
        demo.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
        demo.initIp();
        while (true) {
            Thread.sleep(5000);
            demo.probe();
        }
    }
}
2. ChangeNodeData.java
package cn.east.hadoop.zookeeper.ping;
/**
 * Command-line helper that overwrites the data of the IP node with a fixed new value.
 *
 * @author East271536394
 * @version 2013-6-20 14:55:53
 */
public class ChangeNodeData extends PingZookeeperDemo {
    /**
     * Writes {@code 202.173.10.88_change} into the IP node, then reads the node back and
     * prints the stored value. Any failure is reported on stdout.
     */
    public void changeNodeData() {
        try {
            // Version -1 is passed so the write is not tied to a specific node version.
            zookeeper.setData(ipNode, "202.173.10.88_change".getBytes(), -1);
            String ip = new String(zookeeper.getData(ipNode, false, null));
            System.out.println("新IP数据为:" + ip);
        } catch (Exception se) {
            System.out.println("设置" + ipNode + "节点数据异常");
            se.printStackTrace();
        }
    }

    /**
     * Connects to ZooKeeper, changes the node data and closes the session.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ChangeNodeData change = new ChangeNodeData();
        change.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
        try {
            change.changeNodeData();
        } finally {
            // Close the session explicitly instead of leaving it to expire on the server.
            if (change.zookeeper != null) {
                try {
                    change.zookeeper.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
3. CreateNodeData.java
package cn.east.hadoop.zookeeper.ping;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
/**
 * Command-line helper that creates the IP node with a fixed initial value.
 *
 * @author East271536394
 * @version 2013-6-20 14:55:53
 */
public class CreateNodeData extends PingZookeeperDemo {
    /**
     * Creates the IP node as a persistent node holding {@code 202.173.10.89_newCreate}, then
     * reads the node back and prints the stored value. Any failure is reported on stdout.
     */
    public void createNodeData() {
        try {
            zookeeper.create(ipNode, "202.173.10.89_newCreate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("成功创建" + ipNode);
            String ip = new String(zookeeper.getData(ipNode, false, null));
            System.out.println("新IP数据为:" + ip);
        } catch (Exception se) {
            System.out.println("创建" + ipNode + "节点数据异常");
            se.printStackTrace();
        }
    }

    /**
     * Connects to ZooKeeper, creates the node and closes the session.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CreateNodeData create = new CreateNodeData();
        create.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
        try {
            create.createNodeData();
        } finally {
            // Close the session explicitly instead of leaving it to expire on the server.
            if (create.zookeeper != null) {
                try {
                    create.zookeeper.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
4. DeleteNodeData.java
package cn.east.hadoop.zookeeper.ping;
/**
 * Command-line helper that deletes the IP node.
 *
 * @author East271536394
 * @version 2013-6-20 14:55:53
 */
public class DeleteNodeData extends PingZookeeperDemo {
    /**
     * Deletes the IP node and prints a confirmation. Any failure is reported on stdout.
     */
    public void deleteNodeData() {
        try {
            // Version -1 is passed so the delete is not tied to a specific node version.
            zookeeper.delete(ipNode, -1);
            System.out.println("删除成功"+ipNode);
        } catch (Exception se) {
            System.out.println("删除" + ipNode + "节点数据异常");
            se.printStackTrace();
        }
    }

    /**
     * Connects to ZooKeeper, deletes the node and closes the session.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DeleteNodeData delete = new DeleteNodeData();
        delete.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
        try {
            delete.deleteNodeData();
        } finally {
            // Close the session explicitly instead of leaving it to expire on the server.
            if (delete.zookeeper != null) {
                try {
                    delete.zookeeper.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
结果如下:
收到事件通知:SyncConnected,type :None,path :null
connectedSemaphore.countDown()>>>>>>>>>>>>>>>>
创建createRoot,已存在
收到事件通知:SyncConnected,type :NodeCreated,path :/ping/ipNode
/ping/ipNode节点已被创建>>>
从zookeeper 中 /ping/ipNode 取值为: 202.173.10.58
createIp>>>>>>>>>probeIp:202.173.10.58
2013-06-20 05:14:02>>>>>>>>>>>>>正在探测 IP202.173.10.58
2013-06-20 05:14:07>>>>>>>>>>>>>正在探测 IP202.173.10.58
2013-06-20 05:14:12>>>>>>>>>>>>>正在探测 IP202.173.10.58
收到事件通知:SyncConnected,type :NodeDataChanged,path :/ping/ipNode
/ping/ipNode节点发生了数据变化>>>
2013-06-20 05:14:17>>>>>>>>>>>>>正在探测 IP202.173.10.88_change
2013-06-20 05:14:22>>>>>>>>>>>>>正在探测 IP202.173.10.88_change
2013-06-20 05:14:27>>>>>>>>>>>>>正在探测 IP202.173.10.88_change
收到事件通知:SyncConnected,type :NodeDeleted,path :/ping/ipNode
/ping/ipNode节点已被删除>>>
resetIp>>>>>>>>>probeIp:0.0.0.0
2013-06-20 05:14:32>>>>>>>>>>>>>正在探测 IP0.0.0.0
2013-06-20 05:14:37>>>>>>>>>>>>>正在探测 IP0.0.0.0
收到事件通知:SyncConnected,type :NodeCreated,path :/ping/ipNode
/ping/ipNode节点已被创建>>>
createIp>>>>>>>>>probeIp:202.173.10.89_newCreate
2013-06-20 05:14:42>>>>>>>>>>>>>正在探测 IP202.173.10.89_newCreate
2013-06-20 05:14:47>>>>>>>>>>>>>正在探测 IP202.173.10.89_newCreate
2013-06-20 05:14:52>>>>>>>>>>>>>正在探测 IP202.173.10.89_newCreate
2013-06-20 05:14:57>>>>>>>>>>>>>正在探测 IP202.173.10.89_newCreate