热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于Zookeeper监听和回调分布式锁

zookeeper有本身的回调和监听机制,使用redis作分布式锁,如果分布式机器多的话,同步会造成一定的时间差,zookeeper的回调和监听,由zk主动进行业务代码调用,数据间

zookeeper有本身的回调和监听机制,使用redis作分布式锁,如果分布式机器多的话,同步会造成一定的时间差,zookeeper的回调和监听,由zk主动进行业务代码调用,数据间同步迅速快捷。且结合临时节点,可以在client网络断开的时候,清除节点,避免死锁,临时节点也可以设置存活的时长。

使用zookeeper的临时、序列节点

原理:

在同一个目录下,由最早的节点获取锁,如果监听整个目录,每次释放锁都,排查文件号最小的,占用的资源会比较多。

所以这里采用的方案是,每个节点给前面的节点添加监听,当前面的节点释放锁被删除之后,则,通知后续节点。即,拿到锁的是最小的节点,当它释放锁,删除节点,会根据删除事件通知后续的一个节点。

注:如果是中间节点被手动删除了,也不影响。

 

以下撰写的分布式锁,满足可重入的要求:

//maven lombok此处不写

org.apache.zookeeper
zookeeper
3.6.0


junit
junit
4.13.1

 

//zookeeper连接工具类
public class ZkUUtiles {
private static ZooKeeper zk;
private static CountDownLatch cc = new CountDownLatch(1);
public static ZooKeeper getZk(){
LDefaultWatcher defaultWatcher = new LDefaultWatcher();
defaultWatcher.setCc(cc);
try {
zk = new ZooKeeper("127.0.0.1:2181/testLock2", 3000, defaultWatcher);
cc.await();
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
}

//默认监听
@Data
public class LDefaultWatcher implements Watcher {
private CountDownLatch cc;
@Override
public void process(WatchedEvent event) {
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
cc.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
}

//分布式锁,回调机制阻塞
@Data
public class LDLockWatchBack implements AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback , Watcher {
private ZooKeeper zk;
private String threadName;
private String nodeName;
private CountDownLatch cc = new CountDownLatch(1);
private Integer counInTimes = 1;
public void tryLock(){
try {
//重入锁
if (isTwiceIn()) return;
zk.create("/lock", threadName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"");
cc.await();
} catch (Exception e) {
e.printStackTrace();
}
}
private boolean isTwiceIn() {
byte[] data = new byte[0];
try {
data = zk.getData("/", false, new Stat());
} catch (Exception e) {
e.printStackTrace();
}
if(threadName.equals(new String(data))){
counInTimes++;
return true;
}
return false;
}
public void unLock(){
try {
if(--counInTimes==0){
zk.delete(nodeName,-1);
zk.setData("/","no_thread_lock".getBytes(),-1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
//StringCallback
public void processResult(int rc, String path, Object ctx, String name) {
if(name !=null){
System.out.println(threadName+" create "+name);
nodeName = name;
zk.getChildren("/",false, this,"dfs");
}
}
@Override
//childCallback
public void processResult(int rc, String path, Object ctx, List children) {
//判断当前节点是不是第一个
//如果是,则拿到锁,可以继续运行下去
//如果不是,则给前面节点注册监听事件
//先排序
Collections.sort(children);
int i = children.indexOf(nodeName.substring(1));
if(i ==0 ){
try {
zk.setData("/",threadName.getBytes(),-1);
Thread.sleep(10);
//业务代码,肯定至少运行10ms,此处如果不sleep,会造成,第一个节点很快已经走完了,
// 而后续节点,已经拿到了children,还没有给前节点注册上监听,但已经能判断自己不是第一个
//这种情况,第一个节点挂了,但没有监听,所以不会进行后续节点的通知。即,第一条线程执行完后,不再走后续线程
} catch (Exception e) {
e.printStackTrace();
}
cc.countDown();
}else{
zk.exists("/"+children.get(i-1),this, this,"");
}
}
@Override
//StatCallback
public void processResult(int rc, String path, Object ctx, Stat stat) {
//待实现
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zk.getChildren("/",false, this,"dfs");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
}

//测试代码
public class PDLockTest {
private ZooKeeper zk;
@Before
public void getZK(){
zk = ZkUUtiles.getZk();
}
@After
public void closeZ(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void dLockTest(){
for (int i = 0; i <10; i++) {
new Thread(()->{
LDLockWatchBack ldLockWatchBack = new LDLockWatchBack();
ldLockWatchBack.setZk(zk);
ldLockWatchBack.setThreadName(Thread.currentThread().getName());
//抢锁
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
//工作
System.out.println(Thread.currentThread().getName()+":工作中。。。");
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//释放锁
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
},"thread-"+i).start();
}
while (true){
}
}
}

 



推荐阅读
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 标题: ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 使用圣杯布局模式实现网站首页的内容布局
    本文介绍了使用圣杯布局模式实现网站首页的内容布局的方法,包括HTML部分代码和实例。同时还提供了公司新闻、最新产品、关于我们、联系我们等页面的布局示例。商品展示区包括了车里子和农家生态土鸡蛋等产品的价格信息。 ... [详细]
  • 十大经典排序算法动图演示+Python实现
    本文介绍了十大经典排序算法的原理、演示和Python实现。排序算法分为内部排序和外部排序,常见的内部排序算法有插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、基数排序等。文章还解释了时间复杂度和稳定性的概念,并提供了相关的名词解释。 ... [详细]
  • 本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ... [详细]
  • 本文介绍了一个视频转换软件MyVideoConverter,该软件支持将mpg转换成swf格式,支持多种格式的转换,转换速度快,还能转换成3GP格式,同时具有音频分离提取功能。欢迎使用MyVideoConverter进行视频转换和音频提取。 ... [详细]
  • 在C#中,使用关键字abstract来定义抽象类和抽象方法。抽象类是一种不能被实例化的类,它只提供部分实现,但可以被其他类继承并创建实例。抽象类可以用于类、方法、属性、索引器和事件。在一个类声明中使用abstract表示该类倾向于作为其他类的基类成员被标识为抽象,或者被包含在一个抽象类中,必须由其派生类实现。本文介绍了C#中抽象类和抽象方法的基础知识,并提供了一个示例代码。 ... [详细]
author-avatar
Sunshine5585
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有