2019独角兽企业重金招聘Python工程师标准>>>
一、下载TBSchedule 源码
http://code.taobao.org/svn/tbschedule/
二、编译TBSchedule源码形成jar包
mvn package/直接mvn deploye/install 一步完成
三、安装到本地的maven仓库
mvn deploy/install
四、在项目pom配置文件中引用这个依赖文件
五、配置TBSchedule依赖的zookeeper配置,TBSchedule的调度依赖于zookeeper的节点配置和心跳链接。
# zooker service address
schedule.zookeeper.address=120.25.87.176:2181
# root path
schedule.root.path=/bbb/dd
# session timeout
schedule.session.timeout=60000
# userName
schedule.zookeeper.username=ScheduleAdmin
# password
schedule.zookeeper.password=password
init-method="init">
六、配置任务项和任务策略,将任务分片,查询任务
六-一、通过代码实现任务的分配
private Logger log = LoggerFactory
.getLogger(getClass());
TBScheduleManagerFactory scheduleManagerFactory;
/**
* 心跳连接时间
*/
private int heartBeatRate = 5*1000;
private int judgeDeadInterval = 1*60*1000;
private String taskParameter="";
private String strategyTaskParameter="";
/**
* jvm最大单线程数量
*/
private int numOfSingleServer = 2;
/**
* 最大线程组数量 总 最大线程数量 = jvm最大单线程数量 * 最大的线程组总量
* 将任务列表分成几个队列来处理
*/
private int assignNum = 2;
private String[] iPLists={"127.0.0.1"};
private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};
private String[] baseTaskTypeNames;
private String[] dealBeanNames;
/**
* 开始时间
*/
private String permitRunStartTime;
/**
* 结束时间
*/
private String permitRunEndTime;
/**
* 批处理时 每次处理任务的数量
*/
private int executeNumber = 1;
/**
* 处理完一批数据后的休息时间
*/
private int sleepTimeInterval = 0;
/**
* 采集不到数据的休眠时间
*/
private int sleepTimeNoData = 500;
/**
* 每次采集任务的数量
*/
private int fetchDataNumber = 500;
@Autowired
public void setScheduleManagerFactory(
TBScheduleManagerFactory tbScheduleManagerFactory) {
this.scheduleManagerFactory = tbScheduleManagerFactory;
}
@Test
public void initialConfigData() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
if(baseTaskTypeNames.length != dealBeanNames.length) {
throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");
}
String baseTaskTypeName;
String dealBeanName;
for (int i = 0; i baseTaskTypeName = baseTaskTypeNames[i]; dealBeanName= dealBeanNames[i]; configTasks(baseTaskTypeName, dealBeanName); } } private void configTasks(String baseTaskTypeName, String dealBeanName) throws Exception, InterruptedException { while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){ Thread.sleep(1000); } scheduleManagerFactory.stopServer(null); Thread.sleep(1000); try { this.scheduleManagerFactory.getScheduleDataManager() .deleteTaskType(baseTaskTypeName); } catch (Exception e) { } ScheduleTaskType baseTaskType = new ScheduleTaskType(); baseTaskType.setBaseTaskType(baseTaskTypeName); baseTaskType.setDealBeanName(dealBeanName); baseTaskType.setHeartBeatRate(heartBeatRate); baseTaskType.setJudgeDeadInterval(judgeDeadInterval); baseTaskType.setTaskParameter(taskParameter); baseTaskType.setTaskItems(taskItems); baseTaskType.setPermitRunStartTime(permitRunStartTime); baseTaskType.setPermitRunEndTime(permitRunEndTime); baseTaskType.setExecuteNumber(executeNumber); baseTaskType.setSleepTimeInterval(sleepTimeInterval); baseTaskType.setSleepTimeNoData(sleepTimeNoData); baseTaskType.setFetchDataNumber(fetchDataNumber); this.scheduleManagerFactory.getScheduleDataManager() .createBaseTaskType(baseTaskType); String taskName = baseTaskTypeName + "$TEST"; String strategyName = baseTaskTypeName +"-Strategy"; try { this.scheduleManagerFactory.getScheduleStrategyManager() .deleteMachineStrategy(strategyName,true); } catch (Exception e) { e.printStackTrace(); } ScheduleStrategy strategy = new ScheduleStrategy(); strategy.setStrategyName(strategyName); strategy.setKind(ScheduleStrategy.Kind.Schedule); strategy.setTaskName(taskName); strategy.setTaskParameter(strategyTaskParameter); strategy.setNumOfSingleServer(numOfSingleServer); strategy.setAssignNum(assignNum); strategy.setIPList(iPLists); this.scheduleManagerFactory.getScheduleStrategyManager() .createScheduleStrategy(strategy); log.info("创建调度任务成功" + strategy.toString()); } 六-二、通过TBSchedule自带的控制台来实现任务的配置 七、任务的理解 1、同一个jvm中,不同线程之间如何防止任务被重复执行?一个scheduleServer的内部线程间如何进行任务分片? 答复:1、数据分片是在不同的jvm,获知同一个jvm中不同的线程组间起作用。在同一个线程组内的10个线程,是通过一个同步的任务队列来实现的。2、每个线程从队列中取任务执行,如果没有任务了,则由一个线程负责调用selectTasks方法再获取一批新的任务。 主要是设置休眠时间:即selectTasks方法返回列表的size为0后,进入休眠。休眠完成之后重新执行该定时任务 2、任务项设置的意义和selectTasks方法的参数含义 答复:1、 任务项(0,1,2,3,4,5,6,7,8,9)就是任务分片的策略。这个配置就是把数据分成10片。可以表示 ID的最后一位,也可以是一个独立的字段。根据你的业务来定。 2、 如果只有1组线程,则所有的任务片都分配给他。这时selectTasks方法的参数:taskItemNum =10, queryCondition由10个元素,分别对应0,1,2,3,4,5,6,7,8,9。 a) 如果只有2组线程,则任务片被分成两份。这时 b) 一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(0,2,4,6,8) 3、 另外一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(1,3,5,7,9) 4、 如果有10个线程组。则每组线程只会获取到1个任务片。这时selectTasks方法的参数:taskItemNum =10, queryCondition只有一个元素,对应0到9中的一个。 1、 执行期间和时间的修改功能 a) 在创建任务和修改任务的时候,有两个属性(执行开始时间,执行结算时间)用于控制任务的执行时间。 b) 时间格式遵循标准的cron格式 http://dogstar.javaeye.com/blog/116130 还增强了原来不支持的倒数第几天的能力。 c) 当时间到底开始时间的时候,就开始执行任务,到达结束时间则终止调度(不管是否所有的任务都处理完)。如果没有设置执行结束时间。则一直运行,直到selectTasks返回的记录数为0,就终止执行。等待下个开始运行时间在启动。 d) 如果要动态修改任务的执行时间区间,则先 点击“暂停”按钮,等所有的服务器都停止完毕(大概需要几秒时间)。当再次单击任务,出现如下情形表示停止完毕。然后修改执行开始时间,执行结算时间。在恢复任务调度,就可以实现调度时间的修改 2、 任务处理的问题 a) Schedule主要是提供任务调度的分配管理。每一个任务是否执行成功,是通过业务方的bean来实现的。 b) 你需求的例子,我理解的解决方案如下: i. 你从云梯拉下来100万数据放到保险应用的数据库中。这个表中有两个关键字段USER_ID和STS(状态 0-未发送,1-已发送) ii. 在bean的selectTasks方法的查询sql中除了根据任务进行分片外,还需要增加状态条件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在发送完消息后,你还需要 修改数据状态 update table STS =1 where USER_ID =? 。这样下次就不会取到这条数据了。 iv. 这样就可以保障机器重新启动后,也不会出现问题。你可以参考DBDemoSingle.java的实现模式。你使用的接口应该是IScheduleTaskDealSingle。 如果旺旺的接口支持批量发送消息的时候,你才需要使用IScheduleTaskDealMulti接口。