热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

zookeeper的客户端操作以及集群介绍

上一篇文章介绍了zookeeper(以下简称zk)的基本概念以及用法,通过zk自带的客户

上一篇文章介绍了zookeeper(以下简称zk)的基本概念以及用法,通过zk自带的客户端进行了简单的操作,但平时很少直接使用zk自带的客户端,因为实在是不直观,今天介绍下实际开发时候是怎么使用zk的,然后再说下zk的集群。
  • 一、ZooInspector

ZooInspector是一个图形化客户端,通过它可以很直观的看到zk里的数据。
下载链接:
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
用法也很简单,解压后进入到build目录打开cmd,然后运行:
java -jar zookeeper-dev-ZooInspector.jar

创建一个节点试试,点击Add Node:

新增按钮后面那个垃圾桶状的就是删除,同理,鼠标定位的哪个节点就是删除的哪个节点,删除就不演示了,很简单。
  • 二、通过java客户端操作zk

  • 2.1、基于zk的原生客户端进行操作

添加依赖:


    org.apache.zookeeper
    zookeeper
    3.5.5

    java代码:

      package com.ayo.zk;


      import org.apache.zookeeper.KeeperException;
      import org.apache.zookeeper.WatchedEvent;
      import org.apache.zookeeper.Watcher;
      import org.apache.zookeeper.ZooKeeper;
      import org.junit.Before;
      import org.junit.Test;


      import java.io.IOException;
      import java.util.List;


      public class ZkTest {


      ZooKeeper zooKeeper = null;


      **
      * 初始化方法,构造zk客户端
      */
      @Before
      public void init() throws IOException {
      zk的地址,多个用,隔开
      String cOnnectString= "192.168.209.129:2181";
      **
      * 会话超时时间,单位毫秒
      */
      int sessiOnTimeout= 60000;
      全局监听,缺点,如果多个节点同时发生变化,就不太好处理了
      zooKeeper = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> System.out.println("全局监听: " + watchedEvent.getPath()));
      }


      **
      * 运行前看下linux中的防火墙是否开启,开启的话需要关闭,或者在防火墙中打开2181端口也可以
      *
      * 测试不添加监听
      * @throws KeeperException
      * @throws InterruptedException
      */
      @Test
      public void testGet() throws KeeperException, InterruptedException {
      **
      * watch:是否监听,true:是,false:否
      * 注意,从zk中取出来的数据都是字节数组,需要自己转换为想要的格式
      */
      byte[] data = zooKeeper.getData("/ayo", false, null);
      String s = new String(data);
      System.out.println(s);
      }


      **
      * 测试添加监听
      * @throws KeeperException
      * @throws InterruptedException
      */
      @Test
      public void testGet2() throws KeeperException, InterruptedException {
      **
      * watch:是否监听,true:是,false:否,这里触发的是全局监听
      * 注意,从zk中取出来的数据都是字节数组,需要自己转换为想要的格式
      */
      byte[] data = zooKeeper.getData("/ayo", true, null);
      String s = new String(data);
      System.out.println(s);
      休眠一分钟,为了观察监听器打印
      Thread.sleep(60000);
      }


      **
      * 测试添加自定义的监听
      * @throws KeeperException
      * @throws InterruptedException
      */
      @Test
      public void testGet3() throws KeeperException, InterruptedException {
      **
      * watch:直接new Watcher()
      */
      byte[] data = zooKeeper.getData("/ayo", new Watcher() {
      public void process(WatchedEvent watchedEvent) {
      try {
      因为监听一次就失效,所以需要重新添加监听,类似递归
      zooKeeper.getData(watchedEvent.getPath(), this, null);
      } catch (KeeperException e) {
      e.printStackTrace();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      System.out.println("自定义的监听: " + watchedEvent.getPath());
      }
      }, null);
      String s = new String(data);
      System.out.println(s);
      休眠一分钟,为了观察监听器打印
      Thread.sleep(60000);
      }


      **
      * 获取子节点,无监听
      * @throws KeeperException
      * @throws InterruptedException
      */
      @Test
      public void testGet4() throws KeeperException, InterruptedException {
      List children = zooKeeper.getChildren("/ayo", false);
      children.stream().forEach(System.out::println);
      }


      **
      * 获取子节点, 同时监听子节点的变化
      * @throws KeeperException
      * @throws InterruptedException
      */
      @Test
      public void testGet5() throws KeeperException, InterruptedException {
      List children = zooKeeper.getChildren("/ayo", watchedEvent -> System.out.println(watchedEvent.getPath()));
      children.stream().forEach(System.out::println);
      }
      }

      以上代码我就不再演示了,有兴趣的,可以自己跑跑,实际开发中基本上也用不着,因为zk原生的API有很多问题:
      watcher机制是一次性的,每次触发过后需要重新添加监听
      session超时后不会自动重连,需要手动
      异常信息繁琐,开发人员不知道如何处理
      如果在创建节点时发生异常,需要自己再去检查节点是否创建成功
      不能递归创建和删除节点
      等等。
      • 2.2、Apache curator

      实际项目里通常使用的是第三方的开源客户端zkclient和curator,那这俩又有啥区别呢?个人认为curator要更优雅,通俗的说就是curator适合被开发人员使用,而zkclient适合被开源框架使用,比如dubbo和kafka。今天就以curator为例来操作下zk。
      java代码:
      • 2.2.1、初始化客户端

      2.2.2、创建节点并写入数据

      • 2.2.3、监听本节点的数据变化

      用ZooInspector修改数据即可

      • 2.2.4、监听子节点的数据变化

      用ZooInspector修改数据即可,删除子节点自己操作下,会打印子节点里的内容。

      • 2.2.5、删除节点

      可以看到,已经/zhubajie这个节点已经没有了。
      • 2.2.6、附录完整代码


        org.apache.curator
        curator-x-discovery
        2.5.0

          package com.ayo.zk;


          import org.apache.curator.RetryPolicy;
          import org.apache.curator.framework.CuratorFramework;
          import org.apache.curator.framework.CuratorFrameworkFactory;
          import org.apache.curator.framework.recipes.cache.*;
          import org.apache.curator.retry.ExponentialBackoffRetry;
          import org.junit.Before;
          import org.junit.Test;


          /**
          * 测试Zookeeper CuratorFramework 框架
          */
          public class CuratorFrameworkTest {


          CuratorFramework curator = null;


          /**
          * 初始化客户端
          *
          * retryPolicy参数是指在连接ZK服务过程中重新连接测策略.
          * 在它的实现类ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)中,
          * baseSleepTimeMs参数代表两次连接的等待时间,maxRetries参数表示最大的尝试连接次数
          */
          @Before
          public void init(){
          RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);
          /**
          * CuratorFramework示例创建完成,代表ZooKeeper已经连接成功
          */
          curator = CuratorFrameworkFactory.builder().connectString("192.168.209.129:2181")
          .sessionTimeoutMs(2000).retryPolicy(policy).build();
          /**
          * 调用start()方法打开连接,在使用完毕后调用close()方法关闭连接
          */
          curator.start();
          }


          /**
          * 创建节点并给节点写入数据
          */
          @Test
          public void createNode() throws Exception {
          curator.create().forPath("/zhubajie");
          curator.setData().forPath("/zhubajie", "八戒老帅了".getBytes("GBK"));
          curator.create().forPath("/zhubajie/son");
          curator.setData().forPath("/zhubajie/son", "所以它儿子也很帅啊".getBytes("GBK"));
          }


          /**
          * 删除节点
          */
          @Test
          public void deleteNode() throws Exception {
          curator.delete().forPath("/zhubajie");
          }




          /**
          * 监听本节点的数据变化
          */
          @Test
          public void testWatch() throws Exception {
          final NodeCache nodeCache = new NodeCache(curator, "/zhubajie", false);
          nodeCache.getListenable().addListener(new NodeCacheListener() {
          public void nodeChanged() throws Exception {
          /**
          * 在这个地方实现你自己的逻辑,这里以打印节点信息为例
          */
          System.out.println("本节点的数据发生变化,路径为"
          + nodeCache.getCurrentData().getPath()
          + ",数据变为" + new String(nodeCache.getCurrentData().getData(), "GBK"));
          }
          });


          nodeCache.start();
          /**
          * 让程序休眠,观察打印情况
          */
          Thread.sleep(60000);


          }


          /**
          * 监听子节点的增加或者删除或者数据变化
          */
          @Test
          public void testWatchChild() throws Exception {
          PathChildrenCache pathChildrenCache = new PathChildrenCache(curator, "/zhubajie", true);
          pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
          /**
          * 在这个地方实现你自己的逻辑,这里以打印节点信息为例
          */
          System.out.println("子节点的数据发生变化,路径为"
          + pathChildrenCacheEvent.getData().getPath()
          + ",数据变为" + new String(pathChildrenCacheEvent.getData().getData(), "GBK"));
          }
          });
          pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
          /**
          * 让程序休眠,观察打印情况
          */
          Thread.sleep(60000);
          }
          }

          • 三、zk集群

          • 3.1、在另外两台机器上也安装下zk,步骤看上一篇文章即可,传送门初识zookeeper

          • 3.2、配置文件修改

          执行:
          cd /usr/local/apache-zookeeper-3.5.5-bin/(三台机器一样的操作)
          mkdir data(三台机器一样的操作)
          echo "0" > data/myid(另外两台机器上分别执行
          echo "1" > data/myid
          echo "2" > data/myid,但要注意和server.后面的数字对应

          • 3.3、分别启动三个zk并查看状态

          在三台机器上分别执行:
          cd /usr/local/apache-zookeeper-3.5.5-bin/bin/
          ./zkServer start
          ./zkServer.sh status

          把leader干掉:

          可以看到马上选举出了新的leader,达成高可用。
          • 3.4、查看数据同步情况

          分别连接上三个zk的客户端
          执行:
          cd /usr/local/apache-zookeeper-3.5.5-bin/bin/
          ./zkCli.sh
          在一个zk中创建一个节点,查看另外两个zk中是否也有即可:

          • 3.5、zk集群中的角色

          角色

          描述

          leader

          主节点,又名领导者。用于写入数据,通过选举产生,如果宕机将会选举新的主节点。

          follower

          子节点,又名追随者。用于实现数据的读取。同时他也是主节点的备选节点,并用拥有投票权。

          observer

          次级子节点,又名观察者。用于读取数据,与fllower区别在于没有投票权,不能选为主节点。并且在计算集群可用状态时不会将observer计算入内。

          注:想要成为observer只需要在集群配置中加上observer后缀即可,如:server.2=192.168.209.130:2889:3889:observer

          • 3.6、zk的选举时机和选举流程

          选举时机
          服务节点初始化启动(当节点初始起动时会在集群中寻找Leader节点,如果找到则与Leader建立连接,其自身状态变化follower或observer。如果没有找到Leader,当前节点状态将变化LOOKING,进入选举流程)
          半数以上的节点无法和Leader建立连接(在集群运行其间如果有follower或observer节点宕机只要不超过半数并不会影响整个集群服务的正常运行。但如果leader宕机,将暂停对外服务,所有follower将进入LOOKING 状态,进入选举流程)
          选举流程
          第一轮投票投给自己
          第二轮投票给myid比自己大的相邻节点
          如果得票超过半数,选举结束

          好了,zk集群也差不多就这些了,还有的就是zk的数据同步机制,包括它是怎么保证一致性的,这些就比较难了,后面专门开文章说。




          推荐阅读
          • 深入浅出TensorFlow数据读写机制
            本文详细介绍TensorFlow中的数据读写操作,包括TFRecord文件的创建与读取,以及数据集(dataset)的相关概念和使用方法。 ... [详细]
          • 本文介绍了Kettle资源库的基本概念、类型及其管理方法,同时探讨了Kettle的不同运行方式,包括图形界面、命令行以及API调用,并详细说明了日志记录的相关配置。 ... [详细]
          • 深入解析Java虚拟机(JVM)架构与原理
            本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
          • 深入解析SpringMVC核心组件:DispatcherServlet的工作原理
            本文详细探讨了SpringMVC的核心组件——DispatcherServlet的运作机制,旨在帮助有一定Java和Spring基础的开发人员理解HTTP请求是如何被映射到Controller并执行的。文章将解答以下问题:1. HTTP请求如何映射到Controller;2. Controller是如何被执行的。 ... [详细]
          • 主调|大侠_重温C++ ... [详细]
          • 本文档汇总了Python编程的基础与高级面试题目,涵盖语言特性、数据结构、算法以及Web开发等多个方面,旨在帮助开发者全面掌握Python核心知识。 ... [详细]
          • 本文将详细介绍通过CAS(Central Authentication Service)实现单点登录的原理和步骤。CAS由耶鲁大学开发,旨在为多应用系统提供统一的身份认证服务。文中不仅涵盖了CAS的基本架构,还提供了具体的配置实例,帮助读者更好地理解和应用这一技术。 ... [详细]
          • 本文详细探讨了Java命令行参数的概念、使用方法及在实际编程中的应用,包括如何通过命令行传递参数给Java程序,以及如何在Java程序中解析这些参数。 ... [详细]
          • 本文详细介绍了如何在Python3环境中配置Appium1.4.6,并指导如何连接模拟器进行自动化测试。通过本文,您将了解从环境搭建到模拟器连接的完整流程。 ... [详细]
          • Zookeeper面试常见问题解析
            本文详细介绍了Zookeeper中的ZAB协议、节点类型、ACL权限控制机制、角色分工、工作状态、Watch机制、常用客户端、分布式锁实现、默认通信框架以及消息广播和领导选举的流程。 ... [详细]
          • 离线安装Grafana Cloudera Manager插件并监控CDH集群
            本文详细介绍如何离线安装Cloudera Manager (CM) 插件,并通过Grafana监控CDH集群的健康状况和资源使用情况。该插件利用CM提供的API接口进行数据获取和展示。 ... [详细]
          • 本文详细介绍了如何在Kendo UI for jQuery的数据管理组件中,将行标题字段呈现为锚点(即可点击链接),帮助开发人员更高效地实现这一功能。通过具体的代码示例和解释,即使是新手也能轻松掌握。 ... [详细]
          • 本文回顾了2017年的转型和2018年的收获,分享了几家知名互联网公司提供的工作机会及面试体验。 ... [详细]
          • window下kafka的安装以及测试
            目录一、安装JDK(需要安装依赖javaJDK)二、安装Kafka三、测试参考在Windows系统上安装消息队列kafka一、安装JDKÿ ... [详细]
          • 本文详细介绍了使用ZooKeeper构建高可用集群的方法,包括必要的软件环境准备、配置文件调整及集群启动等关键步骤。通常,一个ZooKeeper集群由奇数个节点组成,以确保Leader选举的有效性。 ... [详细]
          author-avatar
          手机用户2502937527
          这个家伙很懒,什么也没留下!
          PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
          Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有