热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式开源任务调度框架TBSchedule深度解析与应用实践

本文深入解析了分布式开源任务调度框架TBSchedule的核心原理与应用场景,并通过实际案例详细介绍了其部署与使用方法。首先,从源码下载开始,详细阐述了TBSchedule的安装步骤和配置要点。接着,探讨了该框架在大规模分布式环境中的性能优化策略,以及如何通过灵活的任务调度机制提升系统效率。最后,结合具体实例,展示了TBSchedule在实际项目中的应用效果,为开发者提供了宝贵的实践经验。

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

TBSchedule使用

一、下载TBSchedule 源码

http://code.taobao.org/svn/tbschedule/

二、编译TBSchedule源码形成jar包

mvn package/直接mvn deploye/install 一步完成

三、安装到本地的maven仓库

mvn deploy/install

四、在项目pom配置文件中引用这个依赖文件

com.taobao.pamirs.schedule

tbschedule

3.3.3.2

五、配置TBSchedule依赖的zookeeper配置,TBSchedule的调度依赖于zookeeper的节点配置和心跳链接。

 

# zooker service address

schedule.zookeeper.address=120.25.87.176:2181

# root path

schedule.root.path=/bbb/dd

# session timeout

schedule.session.timeout=60000

# userName

schedule.zookeeper.username=ScheduleAdmin

# password

schedule.zookeeper.password=password

 

init-method="init">

//在控制台连接zookeeper的路径要和该配置的路径一致,由于配置时默认要检查父节点是否为空,所以最好先检查该路径是否存在。

六、配置任务项和任务策略,将任务分片,查询任务

六-一、通过代码实现任务的分配

private Logger log = LoggerFactory

.getLogger(getClass());

TBScheduleManagerFactory scheduleManagerFactory;

/**

* 心跳连接时间

*/

private int heartBeatRate = 5*1000;

 

private int judgeDeadInterval = 1*60*1000;

 

private String taskParameter="";

 

private String strategyTaskParameter="";

 

/**

* jvm最大单线程数量

*/

private int numOfSingleServer = 2;

 

/**

* 最大线程组数量 总 最大线程数量 = jvm最大单线程数量 * 最大的线程组总量

* 将任务列表分成几个队列来处理

*/

private int assignNum = 2;

 

private String[] iPLists={"127.0.0.1"};

 

private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};

 

private String[] baseTaskTypeNames;

 

private String[] dealBeanNames;

 

/**

* 开始时间

*/

private String permitRunStartTime;

 

/**

* 结束时间

*/

private String permitRunEndTime;

 

/**

* 批处理时 每次处理任务的数量

*/

private int executeNumber = 1;

 

/**

* 处理完一批数据后的休息时间

*/

private int sleepTimeInterval = 0;

 

/**

* 采集不到数据的休眠时间

*/

private int sleepTimeNoData = 500;

 

/**

* 每次采集任务的数量

*/

private int fetchDataNumber = 500;

 

@Autowired

public void setScheduleManagerFactory(

TBScheduleManagerFactory tbScheduleManagerFactory) {

this.scheduleManagerFactory = tbScheduleManagerFactory;

}

 

@Test

public void initialConfigData() throws Exception {

}

 

@Override

public void afterPropertiesSet() throws Exception {

if(baseTaskTypeNames.length != dealBeanNames.length) {

throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");

}

String baseTaskTypeName;

String dealBeanName;

for (int i = 0; i

baseTaskTypeName = baseTaskTypeNames[i];

dealBeanName= dealBeanNames[i];

configTasks(baseTaskTypeName, dealBeanName);

}

}

 

private void configTasks(String baseTaskTypeName, String dealBeanName)

throws Exception, InterruptedException {

while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){

Thread.sleep(1000);

}

scheduleManagerFactory.stopServer(null);

Thread.sleep(1000);

try {

this.scheduleManagerFactory.getScheduleDataManager()

.deleteTaskType(baseTaskTypeName);

} catch (Exception e) {

 

}

ScheduleTaskType baseTaskType = new ScheduleTaskType();

baseTaskType.setBaseTaskType(baseTaskTypeName);

baseTaskType.setDealBeanName(dealBeanName);

baseTaskType.setHeartBeatRate(heartBeatRate);

baseTaskType.setJudgeDeadInterval(judgeDeadInterval);

baseTaskType.setTaskParameter(taskParameter);

baseTaskType.setTaskItems(taskItems);

baseTaskType.setPermitRunStartTime(permitRunStartTime);

baseTaskType.setPermitRunEndTime(permitRunEndTime);

baseTaskType.setExecuteNumber(executeNumber);

baseTaskType.setSleepTimeInterval(sleepTimeInterval);

baseTaskType.setSleepTimeNoData(sleepTimeNoData);

baseTaskType.setFetchDataNumber(fetchDataNumber);

this.scheduleManagerFactory.getScheduleDataManager()

.createBaseTaskType(baseTaskType);

String taskName = baseTaskTypeName + "$TEST";

String strategyName = baseTaskTypeName +"-Strategy";

try {

this.scheduleManagerFactory.getScheduleStrategyManager()

.deleteMachineStrategy(strategyName,true);

} catch (Exception e) {

e.printStackTrace();

}

ScheduleStrategy strategy = new ScheduleStrategy();

strategy.setStrategyName(strategyName);

strategy.setKind(ScheduleStrategy.Kind.Schedule);

strategy.setTaskName(taskName);

strategy.setTaskParameter(strategyTaskParameter);

strategy.setNumOfSingleServer(numOfSingleServer);

strategy.setAssignNum(assignNum);

strategy.setIPList(iPLists);

this.scheduleManagerFactory.getScheduleStrategyManager()

.createScheduleStrategy(strategy);

log.info("创建调度任务成功" + strategy.toString());

}

六-二、通过TBSchedule自带的控制台来实现任务的配置

 

七、任务的理解

1、同一个jvm中,不同线程之间如何防止任务被重复执行?一个scheduleServer的内部线程间如何进行任务分片?

答复:1、数据分片是在不同的jvm,获知同一个jvm中不同的线程组间起作用。在同一个线程组内的10个线程,是通过一个同步的任务队列来实现的。2、每个线程从队列中取任务执行,如果没有任务了,则由一个线程负责调用selectTasks方法再获取一批新的任务。

主要是设置休眠时间:即selectTasks方法返回列表的size为0后,进入休眠。休眠完成之后重新执行该定时任务

2、任务项设置的意义和selectTasks方法的参数含义

答复:1、 任务项(0,1,2,3,4,5,6,7,8,9)就是任务分片的策略。这个配置就是把数据分成10片。可以表示 ID的最后一位,也可以是一个独立的字段。根据你的业务来定。

2、 如果只有1组线程,则所有的任务片都分配给他。这时selectTasks方法的参数:taskItemNum =10, queryCondition由10个元素,分别对应0,1,2,3,4,5,6,7,8,9。

a) 如果只有2组线程,则任务片被分成两份。这时

b) 一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(0,2,4,6,8)

3、 另外一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(1,3,5,7,9)

4、 如果有10个线程组。则每组线程只会获取到1个任务片。这时selectTasks方法的参数:taskItemNum =10, queryCondition只有一个元素,对应0到9中的一个。

1、 执行期间和时间的修改功能 a) 在创建任务和修改任务的时候,有两个属性(执行开始时间,执行结算时间)用于控制任务的执行时间。 b) 时间格式遵循标准的cron格式 http://dogstar.javaeye.com/blog/116130 还增强了原来不支持的倒数第几天的能力。 c) 当时间到底开始时间的时候,就开始执行任务,到达结束时间则终止调度(不管是否所有的任务都处理完)。如果没有设置执行结束时间。则一直运行,直到selectTasks返回的记录数为0,就终止执行。等待下个开始运行时间在启动。 d) 如果要动态修改任务的执行时间区间,则先 点击“暂停”按钮,等所有的服务器都停止完毕(大概需要几秒时间)。当再次单击任务,出现如下情形表示停止完毕。然后修改执行开始时间,执行结算时间。在恢复任务调度,就可以实现调度时间的修改 2、 任务处理的问题 a) Schedule主要是提供任务调度的分配管理。每一个任务是否执行成功,是通过业务方的bean来实现的。 b) 你需求的例子,我理解的解决方案如下: i. 你从云梯拉下来100万数据放到保险应用的数据库中。这个表中有两个关键字段USER_ID和STS(状态 0-未发送,1-已发送) ii. 在bean的selectTasks方法的查询sql中除了根据任务进行分片外,还需要增加状态条件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在发送完消息后,你还需要 修改数据状态 update table STS =1 where USER_ID =? 。这样下次就不会取到这条数据了。 iv. 这样就可以保障机器重新启动后,也不会出现问题。你可以参考DBDemoSingle.java的实现模式。你使用的接口应该是IScheduleTaskDealSingle。 如果旺旺的接口支持批量发送消息的时候,你才需要使用IScheduleTaskDealMulti接口。


转:https://my.oschina.net/u/1867229/blog/850598



推荐阅读
author-avatar
手浪用户2602914837
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有