热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式开源任务调度框架TBSchedule深度解析与应用实践

本文深入解析了分布式开源任务调度框架TBSchedule的核心原理与应用场景,并通过实际案例详细介绍了其部署与使用方法。首先,从源码下载开始,详细阐述了TBSchedule的安装步骤和配置要点。接着,探讨了该框架在大规模分布式环境中的性能优化策略,以及如何通过灵活的任务调度机制提升系统效率。最后,结合具体实例,展示了TBSchedule在实际项目中的应用效果,为开发者提供了宝贵的实践经验。

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

TBSchedule使用

一、下载TBSchedule 源码

http://code.taobao.org/svn/tbschedule/

二、编译TBSchedule源码形成jar包

mvn package/直接mvn deploye/install 一步完成

三、安装到本地的maven仓库

mvn deploy/install

四、在项目pom配置文件中引用这个依赖文件

com.taobao.pamirs.schedule

tbschedule

3.3.3.2

五、配置TBSchedule依赖的zookeeper配置,TBSchedule的调度依赖于zookeeper的节点配置和心跳链接。

 

# zooker service address

schedule.zookeeper.address=120.25.87.176:2181

# root path

schedule.root.path=/bbb/dd

# session timeout

schedule.session.timeout=60000

# userName

schedule.zookeeper.username=ScheduleAdmin

# password

schedule.zookeeper.password=password

 

init-method="init">

//在控制台连接zookeeper的路径要和该配置的路径一致,由于配置时默认要检查父节点是否为空,所以最好先检查该路径是否存在。

六、配置任务项和任务策略,将任务分片,查询任务

六-一、通过代码实现任务的分配

private Logger log = LoggerFactory

.getLogger(getClass());

TBScheduleManagerFactory scheduleManagerFactory;

/**

* 心跳连接时间

*/

private int heartBeatRate = 5*1000;

 

private int judgeDeadInterval = 1*60*1000;

 

private String taskParameter="";

 

private String strategyTaskParameter="";

 

/**

* jvm最大单线程数量

*/

private int numOfSingleServer = 2;

 

/**

* 最大线程组数量 总 最大线程数量 = jvm最大单线程数量 * 最大的线程组总量

* 将任务列表分成几个队列来处理

*/

private int assignNum = 2;

 

private String[] iPLists={"127.0.0.1"};

 

private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};

 

private String[] baseTaskTypeNames;

 

private String[] dealBeanNames;

 

/**

* 开始时间

*/

private String permitRunStartTime;

 

/**

* 结束时间

*/

private String permitRunEndTime;

 

/**

* 批处理时 每次处理任务的数量

*/

private int executeNumber = 1;

 

/**

* 处理完一批数据后的休息时间

*/

private int sleepTimeInterval = 0;

 

/**

* 采集不到数据的休眠时间

*/

private int sleepTimeNoData = 500;

 

/**

* 每次采集任务的数量

*/

private int fetchDataNumber = 500;

 

@Autowired

public void setScheduleManagerFactory(

TBScheduleManagerFactory tbScheduleManagerFactory) {

this.scheduleManagerFactory = tbScheduleManagerFactory;

}

 

@Test

public void initialConfigData() throws Exception {

}

 

@Override

public void afterPropertiesSet() throws Exception {

if(baseTaskTypeNames.length != dealBeanNames.length) {

throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");

}

String baseTaskTypeName;

String dealBeanName;

for (int i = 0; i

baseTaskTypeName = baseTaskTypeNames[i];

dealBeanName= dealBeanNames[i];

configTasks(baseTaskTypeName, dealBeanName);

}

}

 

private void configTasks(String baseTaskTypeName, String dealBeanName)

throws Exception, InterruptedException {

while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){

Thread.sleep(1000);

}

scheduleManagerFactory.stopServer(null);

Thread.sleep(1000);

try {

this.scheduleManagerFactory.getScheduleDataManager()

.deleteTaskType(baseTaskTypeName);

} catch (Exception e) {

 

}

ScheduleTaskType baseTaskType = new ScheduleTaskType();

baseTaskType.setBaseTaskType(baseTaskTypeName);

baseTaskType.setDealBeanName(dealBeanName);

baseTaskType.setHeartBeatRate(heartBeatRate);

baseTaskType.setJudgeDeadInterval(judgeDeadInterval);

baseTaskType.setTaskParameter(taskParameter);

baseTaskType.setTaskItems(taskItems);

baseTaskType.setPermitRunStartTime(permitRunStartTime);

baseTaskType.setPermitRunEndTime(permitRunEndTime);

baseTaskType.setExecuteNumber(executeNumber);

baseTaskType.setSleepTimeInterval(sleepTimeInterval);

baseTaskType.setSleepTimeNoData(sleepTimeNoData);

baseTaskType.setFetchDataNumber(fetchDataNumber);

this.scheduleManagerFactory.getScheduleDataManager()

.createBaseTaskType(baseTaskType);

String taskName = baseTaskTypeName + "$TEST";

String strategyName = baseTaskTypeName +"-Strategy";

try {

this.scheduleManagerFactory.getScheduleStrategyManager()

.deleteMachineStrategy(strategyName,true);

} catch (Exception e) {

e.printStackTrace();

}

ScheduleStrategy strategy = new ScheduleStrategy();

strategy.setStrategyName(strategyName);

strategy.setKind(ScheduleStrategy.Kind.Schedule);

strategy.setTaskName(taskName);

strategy.setTaskParameter(strategyTaskParameter);

strategy.setNumOfSingleServer(numOfSingleServer);

strategy.setAssignNum(assignNum);

strategy.setIPList(iPLists);

this.scheduleManagerFactory.getScheduleStrategyManager()

.createScheduleStrategy(strategy);

log.info("创建调度任务成功" + strategy.toString());

}

六-二、通过TBSchedule自带的控制台来实现任务的配置

 

七、任务的理解

1、同一个jvm中,不同线程之间如何防止任务被重复执行?一个scheduleServer的内部线程间如何进行任务分片?

答复:1、数据分片是在不同的jvm,获知同一个jvm中不同的线程组间起作用。在同一个线程组内的10个线程,是通过一个同步的任务队列来实现的。2、每个线程从队列中取任务执行,如果没有任务了,则由一个线程负责调用selectTasks方法再获取一批新的任务。

主要是设置休眠时间:即selectTasks方法返回列表的size为0后,进入休眠。休眠完成之后重新执行该定时任务

2、任务项设置的意义和selectTasks方法的参数含义

答复:1、 任务项(0,1,2,3,4,5,6,7,8,9)就是任务分片的策略。这个配置就是把数据分成10片。可以表示 ID的最后一位,也可以是一个独立的字段。根据你的业务来定。

2、 如果只有1组线程,则所有的任务片都分配给他。这时selectTasks方法的参数:taskItemNum =10, queryCondition由10个元素,分别对应0,1,2,3,4,5,6,7,8,9。

a) 如果只有2组线程,则任务片被分成两份。这时

b) 一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(0,2,4,6,8)

3、 另外一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(1,3,5,7,9)

4、 如果有10个线程组。则每组线程只会获取到1个任务片。这时selectTasks方法的参数:taskItemNum =10, queryCondition只有一个元素,对应0到9中的一个。

1、 执行期间和时间的修改功能 a) 在创建任务和修改任务的时候,有两个属性(执行开始时间,执行结算时间)用于控制任务的执行时间。 b) 时间格式遵循标准的cron格式 http://dogstar.javaeye.com/blog/116130 还增强了原来不支持的倒数第几天的能力。 c) 当时间到底开始时间的时候,就开始执行任务,到达结束时间则终止调度(不管是否所有的任务都处理完)。如果没有设置执行结束时间。则一直运行,直到selectTasks返回的记录数为0,就终止执行。等待下个开始运行时间在启动。 d) 如果要动态修改任务的执行时间区间,则先 点击“暂停”按钮,等所有的服务器都停止完毕(大概需要几秒时间)。当再次单击任务,出现如下情形表示停止完毕。然后修改执行开始时间,执行结算时间。在恢复任务调度,就可以实现调度时间的修改 2、 任务处理的问题 a) Schedule主要是提供任务调度的分配管理。每一个任务是否执行成功,是通过业务方的bean来实现的。 b) 你需求的例子,我理解的解决方案如下: i. 你从云梯拉下来100万数据放到保险应用的数据库中。这个表中有两个关键字段USER_ID和STS(状态 0-未发送,1-已发送) ii. 在bean的selectTasks方法的查询sql中除了根据任务进行分片外,还需要增加状态条件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在发送完消息后,你还需要 修改数据状态 update table STS =1 where USER_ID =? 。这样下次就不会取到这条数据了。 iv. 这样就可以保障机器重新启动后,也不会出现问题。你可以参考DBDemoSingle.java的实现模式。你使用的接口应该是IScheduleTaskDealSingle。 如果旺旺的接口支持批量发送消息的时候,你才需要使用IScheduleTaskDealMulti接口。


转:https://my.oschina.net/u/1867229/blog/850598



推荐阅读
  • Python与R语言在功能和应用场景上各有优势。尽管R语言在统计分析和数据可视化方面具有更强的专业性,但Python作为一种通用编程语言,适用于更广泛的领域,包括Web开发、自动化脚本和机器学习等。对于初学者而言,Python的学习曲线更为平缓,上手更加容易。此外,Python拥有庞大的社区支持和丰富的第三方库,使其在实际应用中更具灵活性和扩展性。 ... [详细]
  • 在CentOS上部署和配置FreeSWITCH
    在CentOS系统上部署和配置FreeSWITCH的过程涉及多个步骤。本文详细介绍了从源代码安装FreeSWITCH的方法,包括必要的依赖项安装、编译和配置过程。此外,还提供了常见的配置选项和故障排除技巧,帮助用户顺利完成部署并确保系统的稳定运行。 ... [详细]
  • Spring Boot 实战(一):基础的CRUD操作详解
    在《Spring Boot 实战(一)》中,详细介绍了基础的CRUD操作,涵盖创建、读取、更新和删除等核心功能,适合初学者快速掌握Spring Boot框架的应用开发技巧。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • 在软件开发领域,“池”技术被广泛应用,如数据库连接池、线程池等。本文重点探讨Java中的线程池ThreadPoolExecutor,通过详细解析其内部机制,帮助开发者理解如何高效利用线程池管理任务执行。线程池不仅能够显著减少系统资源的消耗,提高响应速度,还能通过合理的配置,如饱和策略,确保在高负载情况下系统的稳定性和可靠性。文章还将结合实际案例,展示线程池在不同应用场景下的具体实现与优化技巧。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • Java集合框架特性详解与开发实践笔记
    Java集合框架特性详解与开发实践笔记 ... [详细]
  • 浅析Java泛型及其应用
    Java泛型是自JDK 5引入的一项重要特性,旨在增强代码的类型安全性和复用性。通过泛型,开发人员可以在编译阶段进行类型检查,有效避免运行时的类型转换错误。本文将探讨Java泛型的基本概念、实现机制及其在实际开发中的应用场景,帮助读者深入理解并灵活运用这一强大工具。 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • 探讨 `org.openide.windows.TopComponent.componentOpened()` 方法的应用及其代码实例分析 ... [详细]
  • 在 Linux 系统中,`/proc` 目录实现了一种特殊的文件系统,称为 proc 文件系统。与传统的文件系统不同,proc 文件系统主要用于提供内核和进程信息的动态视图,通过文件和目录的形式呈现。这些信息包括系统状态、进程细节以及各种内核参数,为系统管理员和开发者提供了强大的诊断和调试工具。此外,proc 文件系统还支持实时读取和修改某些内核参数,增强了系统的灵活性和可配置性。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • Go语言中Goroutine与通道机制及其异常处理深入解析
    在Go语言中,Goroutine可视为一种轻量级的并发执行单元,其资源消耗远低于传统线程,初始栈大小仅为2KB,而普通线程则通常需要几MB。此外,Goroutine的调度由Go运行时自动管理,能够高效地支持成千上万个并发任务。本文深入探讨了Goroutine的工作原理及其与通道(channel)的配合使用,特别是在异常处理方面的最佳实践,为开发者提供了一套完整的解决方案,以确保程序的稳定性和可靠性。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
author-avatar
手浪用户2602914837
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有