热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ElasticJob2.1.5源码分布式环境下的Zookeeper节点监听机制

大家好,本文给大家介绍一下Elastic-Job分布式环境下的Zookeeper节点监

大家好,本文给大家介绍一下Elastic-Job 分布式环境下的Zookeeper节点监听机制,通过对常见监听器的分析和对基于Curator监听器的使用来为大家介绍,另外还提供了简单的观察者设计模式分享



Elastic-Job 分布式环境下的Zookeeper节点监听

文 | 宋小生




6.6.2 启动监听器

监听机制的实现是一种发布者/订阅者的观察者设计模式的实现。调度系统引入来Netflix实现的Curator组件,通过Curator提供的订阅机制来实现对节点事件订阅。

Curator 事件订阅有两种模式:

  • 一种是标准的观察模式

  • 一种是缓存监听模式

标准的监听模式是使用Watcher 监听器。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。

Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到Zookeeper集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。接下来我们来看下在调度系统中有哪些常见的监听器,首先来看下startAllListeners方法的源码:

/**
* 开启所有监听器.
*/

public void startAllListeners() {
    electionListenerManager.start();
    shardingListenerManager.start();
    failoverListenerManager.start();
    monitorExecutionListenerManager.start();
    shutdownListenerManager.start();
    triggerListenerManager.start();
    rescheduleListenerManager.start();
    guaranteeListenerManager.start();
    jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}

startAllListeners方法通过调用监听器管理对象的start方法来启动各个监听器进行节点信息的订阅。

6.6.3 监听器大全

这么多的监听器有什么作用,又会在什么场景进行触发呢,下面我们就详细看一下每个监听管理器中的监听器的作用,触发条件,和具体业务。这个可以当作参考表格,后面用来详细讲解每一块功能的时候做为参考。

选主监听管理器ElectionListenerManager
类型LeaderElectionJobListenerLeaderAbdicationJobListener
作用主节点选举监听器主节点退位监听器,当前机器状态被设置为不可见时候则删除主节
条件当前作业未关闭下 (leader节点不存在并且本地节点状态为启用状态)或者(leader节点被移除并且本地服务器状态为可用状态)作业未关闭,本地节点是主节点,并且监听到本机器被设置为了不可见状态
事件分布式锁执行主节点选举,主节点的选举过程各个作业实例会依次在节点 leader/election/latch下创建临时顺序节点来获取分布式锁,通过获取到分布式锁来执行选举过程,选举过程中写入当前实例IP 到leader/election/instance节点下用来标示当前作业的主节点删除主节点
分片监听管理器ShardingListenerManager
类型ShardingTotalCountChangedJobListenerListenServersChangedJobListener
作用监听作业分片总数配置变更,如果本地内存注册的分片总数与Zookeeper配置节点中存储的分片总数不一致则设置分片标记重新分片。服务器状态变更监听器
条件当前作业配置变更并且当前作业内存中分片总数不为0,并且内存中注册表中当前作业的分片总数和Zookeeper注册中心中最新分片总数不一致作业未关闭并且(作业实例节点或者服务器节点发生了变化)。
事件设置重新分片标记,创建分片标记节点leader/sharding/necessary更新内存中JobRegistry注册表中的分片总数。设置重新分片标记,创建节点leader/sharding/necessary。

幂等性监听管理器MonitorExecutionListenerManager
类型MonitorExecutionSettingsChangedJobListener
作用在作业配置monitorExecution幂等配置为false的时候,删除作业的运行状态标记。
条件作业配置发生来变更,并且将monitorExecution设置为了false。
事件清除所有分片项的运行状态节点(sharding/%s/running)。
实例关闭监听器管理器ShutdownListenerManager
类型InstanceShutdownStatusJobListener
作用实例关闭的时候做一些关闭资源的操作。
条件内存中作业注册表中作业调度控制器对象和实例对象还存在,作业触发状态不是暂停状态paused,监听到当前实例对应的节点被移除,并且当前作业对应的当前实例已经不存在了(不是重连导致的)
事件如果是主节点,则移除主节点标记。关闭监控服务(对外开放的socket端口)如果诊断服务正在运行,则关闭诊断服务定时任务。移除作业调度控制器注册表信息,关闭调度。移除注册中心注册表信息,删除zookeeper作业缓存。移除注册表中的作业实例对象。移除注册表中的作业运行对象。移除注册表中的作业分片总数 。
作业触发任务监听管理器TriggerListenerManager
类型

JobTriggerStatusJobListener

作用监听是否在作业的instances节点下设置了触发作业的标示节点TRIGGER
条件在当前实例对应的instances节点下新增了TRIGGER节点
事件删除触发标示,本地未运行作业的话则触发一次作业的执行。
 
重新调度监听管理器RescheduleListenerManager
类型CronSettingAndJobEventChangedJobListener
作用监听作业配置CRON表达式变更,执行rescheduleJob方法重新调度作业保证CRON表达式生效。
条件作业配置的CRON表达式发生了变更,并且作业未关闭 。
事件调用Quartz的rescheduleJob重新调度作业,来保证新的CRON表达式生效

保证分布式服务全部开始结束监听管理器GuaranteeListenerManager
类型StartedNodeRemovedJobListenerCompletedNodeRemovedJobListener
作用当所有分片项机器准备业务做完,需要执行作业的时候会处于wait等待状态,这个监听器是用来唤醒所有处于等待状态任务线程来继续执行作业。所有分片项作业执行完毕,需要结束的作业线程会处于wait等待状态,这个监听器是用来唤醒所有处于等待结束的任务线程来结束作业。
条件当监听到保证启动节点移除(jobName/guarantee/started)。保证作业完成的节点被移除(jobName/guarantee/completed)
事件使用notifyAll方法唤醒所有处于wait状态的作业线程。使用notifyAll方法唤醒所有处于wait状态的作业线程。
 
重连注册中心监听器RegistryCenterConnectionStateListener
类型RegistryCenterConnectionStateListener
作用与注册中心的连接状态断开或者重新连上之后分别针对作业进行暂停和恢复逻辑
条件与注册中心的连接状态发生变更的时候
事件当与注册中心连接丢失或者挂起的时候暂停作业,当与注册中心连接恢复的时候则恢复作业执行
 
6.6.4 监听器使用

了解了监听器的作用,触发条件和具体业务事件,我们可以通过选主节点监听器来看下监听器是如何进行注册的。以ElectionListenerManager主节点监听管理器中的主节点选举监听器LeaderElectionJobListener为例,来看一下启动监听器的过程,在调用ElectionListenerManager的start方法时候会触发如下代码:

@Override
public void start() {
    addDataListener(new LeaderElectionJobListener());
    addDataListener(new LeaderAbdicationJobListener());
}


主要包含了两步:

  • 创建选主节点监听器

  • 添加数据监听器

(1)创建选主节点监听器

先来看下主节点选举监听器类型的必要实现

LeaderElectionJobListener类型与其他监听器一样通过继承AbstractJobListener作业监听器模版重写dataChanged模版方法来在节点发生变更的时候触发监听业务。而作业监听器模版通过实现TreeCacheListener接口,通过重写childEvent方法来实现节点的通知。

TreeCacheListener接口是Curator框架中用来进行对外监听通知的监听接口,主要包含childEvent方法在节点发生变化的时候,进行调用监听对象的childEvent方法,类型继承关系如下图所示

图6.3 调度系统选主监听UML

TreeCacheListener的childEvent一般在哪些场景会被触发呢,这个就需要看Curator包中的TreeCacheEvent.Type枚举类型,一般在添加节点,移除节点,节点数据变更,初始化连接,连接断开,连接重连,连接挂起时候被触发。

(2)添加数据监听器

了解了节点监听何时和如何被调用的我们还需要看一个节点如何才能注册成为订阅对象让Curator的监听事件触发。接下来我们就来看 addDataListener方法,在ElectionListenerManager类型中addDataListener方法主要通过调用jobNodeStorage方法的addDataListener如下代码所示:

protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
}

jobNodeStorage在第3章中我们提到过是用来封装对注册中心的逻辑操作,在注册中心对象针对节点的增删改查逻辑基础上进行二次封装,这里我们会看到jobNodeStorage类型中二次封装了添加数据监听的方法,逻辑如下所示:

/**
 * 注册数据监听器.
 * @param listener 数据监听器
 */

public void addDataListener(final TreeCacheListener listener) {
    TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
    cache.getListenable().addListener(listener);
}


  • 获取TreeCache对象

  • 向TreeCache监听对象中添加监听器

TreeCache尝试将ZooKeeper路径的所有子级中的所有数据保留在本地。此类将监视Zookeeper路径,响应更新/创建/删除事件,下拉数据等。可以通过获取TreeCache的Listener容器来将自定义监听器注册到容器中,当发生更改时,该自定义监听器将得到通知。

整理下监听配置一共经历了几个步骤:

  • 编写监听器实现TreeCacheListener接口

  • 向TreeCache的监听器容器中添加监听器对象

  • 节点发生变更则触发监听器的childEvent方法

  • childEvent方法调用dataChanged方法

  • 在dataChanged方法中处理具体的业务逻辑

6.6.5 观察者设计模式 

整理好了调度系统中使用Curator框架针对Zookeeper节点监听器的实现,为大家分享下监听器模式的底层设计模式思想,对于监听订阅,当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

举个观察者的案例,比如我们订阅报纸,我们可以订阅报纸,也可以取消订阅报纸,当我们订阅报纸时有新的报纸更新了就需要一个一个通知订阅了报纸的我们,当我们取消了订阅就无法获取报纸新的更新,再比如我们关注了中间件源码微信公众号,当有新的内容发布时候我们就可以收到更新的内容,当我们取消订阅的时候有新的优质内容发布的时候我们就无法感知,所以建议大家关注中间件源码微信公众号不要取消关注,

那观察者设计模式有什么优缺点呢?可以看如下表格:


优点
缺点
观察者和订阅主题(被观察者)是抽象耦合的观者者过多的时候就需要连续遍历,过多的时候就需要考虑算法的优化
适合触发机制观察者订阅者不能出现循环调用依赖,容易导致堆栈溢

那如何实现一个简单的观察者设计模式呢

我们可以建立如下模型:

  • 主题(被观察者)用来订阅

  •  抽象观察者,用来与主题耦合

  • 具体观察者通过继承抽象观察者,用来订阅主题


主题(被观察者)类型的设计Subject
成员变量集合,用来存储订阅了主题的观察者
观察者订阅主题方法
取消订阅主题的观察者方法
题更新方法(主题更新方法会触发所有订阅了主题的观察者)


抽象的观察者设计
观察者主题更新的通知方法(主题更新的时候需要触发观察者的通知方法


具体的观察者设计
实现抽象的观察者,并重写通知方法


使用UML图可以如下标示:

图 6.4 观察者设计模式UML


在UML示例中一个主题(被观察者)包含了多个抽象观察者,主题可以动态添加(注册)观察者对象,也可以移除,当有事件产生的时候可以通知所有观察者触发监听notify方法。

接下来我们就来看下主题(被观察者)的代码实现:

import java.util.ArrayList;
import java.util.List;

public class Subject {

    /**
     * 订阅了主题的观察者集合
     */

    private List observerList = new ArrayList<>();

    /**
     * 添加观察者
     * @param observer
     */

    public void addObserver(Observer observer) {
        observerList.add(observer);
    }


    /**
     * 移除观察者     
     * @param observer
     */

    public void removeObserver(Observer observer) {
        observerList.remove(observer);
    }

    /**
     * 通知触发函数     
     * @param msg
     */

    public void notify(String msg) {
        observerList.forEach(observer -> {
            observer.notify(msg);
        });
    }
}


抽象的观察者:

public abstract class Observer {

   public abstract void notify(String msg);

}

具体的观察者:

public class MyObserver extends Observer {

    @Override
    public void notify(String msg) {
        System.out.println(msg);
    }
}


运行测试

public class DemoMain {
    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.addObserver(new MyObserver());
        subject.addObserver(new MyObserver());
        subject.notify("hell world");
    }
}

 

输出结果:


在Demo中我们执行了如下的流程:

  • 创建了一个订阅主题

  • 创建观察者1对象同时添加到主题对象中(观察者订阅主题

  • 创建观察者2对象同时添加到主题对象中(观察者订阅主题)

  • 执行主题的通知方法

  • 最后可以看到两个控制台可以打印两条数据,这两条数据分别是两个观察者打印的


通过对调度监听器的了解和对观察者设计模式的学习,后面如果有订阅主题的业务场景可以尝试使用观察者模式尝试一下,观察者设计模式在页面控件触发机制,网络请求回调机制,消息订阅等功能里面我们都会再遇到 。

- END -




推荐阅读
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 解决Sharepoint 2013运行状况分析出现的“一个或多个服务器未响应”问题的方法
    本文介绍了解决Sharepoint 2013运行状况分析中出现的“一个或多个服务器未响应”问题的方法。对于有高要求的客户来说,系统检测问题的存在是不可接受的。文章详细描述了解决该问题的步骤,包括删除服务器、处理分布式缓存留下的记录以及使用代码等方法。同时还提供了相关关键词和错误提示信息,以帮助读者更好地理解和解决该问题。 ... [详细]
  • LVS实现负载均衡的原理LVS负载均衡负载均衡集群是LoadBalance集群。是一种将网络上的访问流量分布于各个节点,以降低服务器压力,更好的向客户端 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Oracle seg,V$TEMPSEG_USAGE与Oracle排序的关系及使用方法
    本文介绍了Oracle seg,V$TEMPSEG_USAGE与Oracle排序之间的关系,V$TEMPSEG_USAGE是V_$SORT_USAGE的同义词,通过查询dba_objects和dba_synonyms视图可以了解到它们的详细信息。同时,还探讨了V$TEMPSEG_USAGE的使用方法。 ... [详细]
  • 单页面应用 VS 多页面应用的区别和适用场景
    本文主要介绍了单页面应用(SPA)和多页面应用(MPA)的区别和适用场景。单页面应用只有一个主页面,所有内容都包含在主页面中,页面切换快但需要做相关的调优;多页面应用有多个独立的页面,每个页面都要加载相关资源,页面切换慢但适用于对SEO要求较高的应用。文章还提到了两者在资源加载、过渡动画、路由模式和数据传递方面的差异。 ... [详细]
  • php缓存ri,浅析ThinkPHP缓存之快速缓存(F方法)和动态缓存(S方法)(日常整理)
    thinkPHP的F方法只能用于缓存简单数据类型,不支持有效期和缓存对象。S()缓存方法支持有效期,又称动态缓存方法。本文是小编日常整理有关thinkp ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • 近期,某用户在重启RAC一个节点的数据库实例时,发现启动速度非常慢。同时业务部门反馈连接RAC存活节点的业务也受影响。通过对日志的分析, ... [详细]
  • {moduleinfo:{card_count:[{count_phone:1,count:1}],search_count:[{count_phone:4 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 本文整理了Java中com.evernote.android.job.JobRequest.getTransientExtras()方法的一些代码示例,展示了 ... [详细]
author-avatar
Ymgif影像--阿雅XX_506
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有