作者:Sunshine5585 | 来源:互联网 | 2023-08-31 18:56
zookeeper有本身的回调和监听机制,使用redis作分布式锁,如果分布式机器多的话,同步会造成一定的时间差,zookeeper的回调和监听,由zk主动进行业务代码调用,数据间
zookeeper有本身的回调和监听机制,使用redis作分布式锁,如果分布式机器多的话,同步会造成一定的时间差,zookeeper的回调和监听,由zk主动进行业务代码调用,数据间同步迅速快捷。且结合临时节点,可以在client网络断开的时候,清除节点,避免死锁,临时节点也可以设置存活的时长。
使用zookeeper的临时、序列节点
原理:
在同一个目录下,由最早的节点获取锁,如果监听整个目录,每次释放锁都,排查文件号最小的,占用的资源会比较多。
所以这里采用的方案是,每个节点给前面的节点添加监听,当前面的节点释放锁被删除之后,则,通知后续节点。即,拿到锁的是最小的节点,当它释放锁,删除节点,会根据删除事件通知后续的一个节点。
注:如果是中间节点被手动删除了,也不影响。
以下撰写的分布式锁,满足可重入的要求:
//maven lombok此处不写
org.apache.zookeeper
zookeeper
3.6.0
junit
junit
4.13.1
//zookeeper连接工具类
public class ZkUUtiles {
private static ZooKeeper zk;
private static CountDownLatch cc = new CountDownLatch(1);
public static ZooKeeper getZk(){
LDefaultWatcher defaultWatcher = new LDefaultWatcher();
defaultWatcher.setCc(cc);
try {
zk = new ZooKeeper("127.0.0.1:2181/testLock2", 3000, defaultWatcher);
cc.await();
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
}
//默认监听
@Data
public class LDefaultWatcher implements Watcher {
private CountDownLatch cc;
@Override
public void process(WatchedEvent event) {
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
cc.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
}
//分布式锁,回调机制阻塞
@Data
public class LDLockWatchBack implements AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback , Watcher {
private ZooKeeper zk;
private String threadName;
private String nodeName;
private CountDownLatch cc = new CountDownLatch(1);
private Integer counInTimes = 1;
public void tryLock(){
try {
//重入锁
if (isTwiceIn()) return;
zk.create("/lock", threadName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"");
cc.await();
} catch (Exception e) {
e.printStackTrace();
}
}
private boolean isTwiceIn() {
byte[] data = new byte[0];
try {
data = zk.getData("/", false, new Stat());
} catch (Exception e) {
e.printStackTrace();
}
if(threadName.equals(new String(data))){
counInTimes++;
return true;
}
return false;
}
public void unLock(){
try {
if(--counInTimes==0){
zk.delete(nodeName,-1);
zk.setData("/","no_thread_lock".getBytes(),-1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
//StringCallback
public void processResult(int rc, String path, Object ctx, String name) {
if(name !=null){
System.out.println(threadName+" create "+name);
nodeName = name;
zk.getChildren("/",false, this,"dfs");
}
}
@Override
//childCallback
public void processResult(int rc, String path, Object ctx, List children) {
//判断当前节点是不是第一个
//如果是,则拿到锁,可以继续运行下去
//如果不是,则给前面节点注册监听事件
//先排序
Collections.sort(children);
int i = children.indexOf(nodeName.substring(1));
if(i ==0 ){
try {
zk.setData("/",threadName.getBytes(),-1);
Thread.sleep(10);
//业务代码,肯定至少运行10ms,此处如果不sleep,会造成,第一个节点很快已经走完了,
// 而后续节点,已经拿到了children,还没有给前节点注册上监听,但已经能判断自己不是第一个
//这种情况,第一个节点挂了,但没有监听,所以不会进行后续节点的通知。即,第一条线程执行完后,不再走后续线程
} catch (Exception e) {
e.printStackTrace();
}
cc.countDown();
}else{
zk.exists("/"+children.get(i-1),this, this,"");
}
}
@Override
//StatCallback
public void processResult(int rc, String path, Object ctx, Stat stat) {
//待实现
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zk.getChildren("/",false, this,"dfs");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
}//测试代码
public class PDLockTest {
private ZooKeeper zk;
@Before
public void getZK(){
zk = ZkUUtiles.getZk();
}
@After
public void closeZ(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void dLockTest(){
for (int i = 0; i <10; i++) {
new Thread(()->{
LDLockWatchBack ldLockWatchBack = new LDLockWatchBack();
ldLockWatchBack.setZk(zk);
ldLockWatchBack.setThreadName(Thread.currentThread().getName());
//抢锁
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
ldLockWatchBack.tryLock();
//工作
System.out.println(Thread.currentThread().getName()+":工作中。。。");
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//释放锁
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
ldLockWatchBack.unLock();
},"thread-"+i).start();
}
while (true){
}
}
}