热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:读《分布式一致性原理》JAVA客户端API操作2

篇首语:本文由编程笔记#小编为大家整理,主要介绍了读《分布式一致性原理》JAVA客户端API操作2相关的知识,希望对你有一定的参考价值。创

篇首语:本文由编程笔记#小编为大家整理,主要介绍了读《分布式一致性原理》JAVA客户端API操作2相关的知识,希望对你有一定的参考价值。




创建节点

通过客户端API来创建一个数据节点,有一下两个接口:


public String create(final String path, byte data[], List acl,
CreateMode createMode)
public void create(final String path, byte data[], List acl,
CreateMode createMode, StringCallback cb, Object ctx)

这两个接口分别是同步和异步的方式创建节点

需要注意的是无论是同步还是异步创建节点,zookeeper都不支持递归创建,即在不存在父节点的情况下创建一个子节点

。另外如果一个节点已经存在了,那么再创建同名节点时会抛出异常:NodeExistException

目前,节点的内容只支持byte[]数组类型,也就是说zookeeper不负责对象序列化,需要开发者自己讲内容进行序列化与反序列化。

对已字符串直接调用getByte就行。对于其他复杂对象,可以使用序列化工具来进行。

 

关于权限控制,如果你的应用场景中没有复杂的权限要求,那么直接调用I Ids.OPEN_ACL_UNSAFE,这表明之后对这个节点的任何操作不受权限控制。

使用API创建一个节点:


package znode;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import session.CreateZookeeper;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class CreateZnode implements Watcher{
public static CountDownLatch cOnnectedSemaphore= new CountDownLatch(1);

@Override
public void process(WatchedEvent event) {
System.out.println(
"receive watched event:"+event);
if (KeeperState.SyncCOnnected==event.getState()) {
connectedSemaphore.countDown();
}

}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper
= new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper());
connectedSemaphore.await();

String path1
= zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(
"success create znode:"+path1);

String path2
= zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(
"success create znode:"+path2);
}
}

上面两个片段使用同步方式创建节点:可以看出创建临时节点返回值就是传入的路劲

使用临时顺序节点返回值会自动加上一个数字


使用异步API创建节点


package znode;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import session.CreateZookeeper;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class CreateZnode2 implements Watcher{
public static CountDownLatch cOnnectedSemaphore= new CountDownLatch(1);

@Override
public void process(WatchedEvent event) {
System.out.println(
"receive watched event:"+event);
if (KeeperState.SyncCOnnected==event.getState()) {
connectedSemaphore.countDown();
}

}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper
= new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper());
connectedSemaphore.await();

zooKeeper.create(
"/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL
,
new IStringCallback(),"I am context");
zooKeeper.create(
"/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL
,
new IStringCallback(),"I am context");
zooKeeper.create(
"/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL
,
new IStringCallback(),"I am context");
}
}
class IStringCallback implements AsyncCallback.StringCallback{
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// TODO Auto-generated method stub
System.out.println("create path result: ["+rc+","+path+","+ctx+","+"real path name:"+name+"]");
}

}

 

和同步接口最大的区别在于,节点在创建的过程(包含网络通信和服务端的创建过程),是异步的。而且我们需要注意的是

同步创建过程时我们需要关注接口抛出的异常,而在异步接口中,是不会抛出异常的,所有的异常都会在回调函数中通过Result Code来体现。


删除节点


public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
Object ctx)

 

这里列出的两个API是同步和异步的删除接口,API方法的参数说明如表5-5所示。

删除节点和更新节点的操作非常相似,在zookeeper中只允许删除叶子节点。也就是说,如果一个节点存在子节点的话

那么这个节点将无法直接删除,必须先删除其所有子节点。


 

读取数据

读取数据,包含子节点列表的获取和节点数据的获取。

1.getChildren

 


 

首先我们先看看注册watcher。如果zookeeper客户端获取到指定节点的子节点列表后,还需要订阅这个子节点列表的变化通知,

那么就可以通过注册一个Watcher来实现。当有子节点添加或删除时,服务端就会向客户端发送一个NodeChildrenChange的事件。

需要注意的是服务端向客户端发送事件通知时是不包含最新的节点列表的。是需要客户端主动重新获取的。

 

Stat,stat记录一个节点的基本属性信息。创建时的事务ID(cZxid),最后一次修改的事务ID(mZxid)和节点数据内容的长度

dataLength,我们可以将一个旧的stat变量传入,该stat会在执行过程中,被来自服务端响应的心的stat的替换掉。


package getchildren;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class GetChildren1 implements Watcher {

public static CountDownLatch cOnnectedSemaphore= new CountDownLatch(1);

private static ZooKeeper zk = null;
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncCOnnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}
else if (event.getType()==EventType.NodeChildrenChanged) {
try {
System.out.println(
"ReGetChild:"+zk.getChildren(event.getPath(), true));
}
catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path
= "/zk-book";
zk
= new ZooKeeper("192.168.64.60:2181", 5000, new GetChildren1());
connectedSemaphore.await();

zk.create(path,
"".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path
+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

List
children = zk.getChildren(path, true);
System.out.println(children);

zk.create(path
+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);;
}
}

 

 

 

 

使用异步API获取子节点列表

 


package getchildren;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class GetChildren2 implements Watcher {

public static CountDownLatch cOnnectedSemaphore= new CountDownLatch(1);

private static ZooKeeper zk = null;
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncCOnnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}
else if (event.getType()==EventType.NodeChildrenChanged) {
try {
System.out.println(
"ReGetChild:"+zk.getChildren(event.getPath(), true));
}
catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path
= "/zk-book";
zk
= new ZooKeeper("192.168.64.60:2181", 5000, new GetChildren2());
connectedSemaphore.await();

zk.create(path,
"".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path
+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

zk.getChildren(path,
true, new IChildren2Callback(),"i am context");

zk.create(path
+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);;
}
}
class IChildren2Callback implements AsyncCallback.Children2Callback{
@Override
public void processResult(int rc, String path, Object ctx, List children, Stat stat) {
// TODO Auto-generated method stub
System.out.println("Get Children znode result: "+rc+","+path+","+ctx+","+children+","+stat);
}

}

 

 


 getData

getData接口和上下文中的getChildren接口的用法相同,Watcher注册后,一旦节点的内容状态发生改变,zookeeper服务端会

向客户端发送一个NodeDataChanged的事件。API返回的结果类型时byte[].

 

使用同步AIP获取数据节点内容


package getdata;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import getchildren.GetChildren1;
public class GetData1 implements Watcher {
public static CountDownLatch cOnnectedSemaphore= new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();

@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncCOnnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}
else if (event.getType()==EventType.NodeDataChanged) {
try {
byte[] data = zk.getData(event.getPath(), true, stat);
System.out.println(
new String(data));
}
catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path
= "/zk-book";
zk
= new ZooKeeper("192.168.64.60:2181", 5000, new GetData1());
connectedSemaphore.await();

zk.create(path,
"123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(zk.getData(path,
true, stat));

zk.setData(path,
"456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);;
}
}

 

数据内容或是数据版本发生变化,都胡出发服务端的NodeDataChanged通知。

 

异步API获取


package getdata;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class GetData2 implements Watcher {
public static CountDownLatch cOnnectedSemaphore= new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();

@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncCOnnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}
else if (event.getType()==EventType.NodeDataChanged) {
zk.getData(event.getPath(),
true, new IDataback(),null);
}

}

}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path
= "/zk-book";
zk
= new ZooKeeper("192.168.64.60:2181", 5000, new GetData2());
connectedSemaphore.await();

zk.create(path,
"123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL );
zk.getData(path,
true,new IDataback(),null);

zk.setData(path,
"456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);;
}
}
class IDataback implements AsyncCallback.DataCallback{
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println(
new String(data));
System.out.println(stat.getCzxid());
System.out.println(stat.getMzxid());
System.out.println(stat.getVersion());

}

}

 



推荐阅读
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 本文详细解析了Python中的os和sys模块,介绍了它们的功能、常用方法及其在实际编程中的应用。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 技术分享:从动态网站提取站点密钥的解决方案
    本文探讨了如何从动态网站中提取站点密钥,特别是针对验证码(reCAPTCHA)的处理方法。通过结合Selenium和requests库,提供了详细的代码示例和优化建议。 ... [详细]
  • Java 类成员初始化顺序与数组创建
    本文探讨了Java中类成员的初始化顺序、静态引入、可变参数以及finalize方法的应用。通过具体的代码示例,详细解释了这些概念及其在实际编程中的使用。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 本文介绍了如何在C#中启动一个应用程序,并通过枚举窗口来获取其主窗口句柄。当使用Process类启动程序时,我们通常只能获得进程的句柄,而主窗口句柄可能为0。因此,我们需要使用API函数和回调机制来准确获取主窗口句柄。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
author-avatar
WLII庾斌_787
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有