热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Quartz实现数据库动态配置定时任务

为什么80%的码农都做不了架构师?项目实战或许实现的方式跟之前的代码有点不一样的1.定时任务的配置信息ConfigurationpublicclassSche

为什么80%的码农都做不了架构师?>>>   hot3.png

项目实战

或许实现的方式跟之前的代码有点不一样的

1.定时任务的配置信息

@Configuration
public class ScheduleConfigration {@Autowiredprivate ScheduleInfoAction scheduleInfoAction;@Autowiredprivate ChannelSyncTask ChannelSyncTask;/*** 用于5分钟轮训检查cron修改(基本不需要修改)* @return*/@Bean(name = "jobDetail")public MethodInvokingJobDetailFactoryBean jobDetail(){MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();methodInvokingJobDetailFactoryBean.setConcurrent(false);methodInvokingJobDetailFactoryBean.setTargetObject(scheduleInfoAction);methodInvokingJobDetailFactoryBean.setTargetMethod("reScheduleJob");return methodInvokingJobDetailFactoryBean;}/*** 用于5分钟轮训检查cron修改(基本不需要修改)* @return*/@Bean(name = "cronTrigger")public CronTriggerFactoryBean cronTrigger(){CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();cronTriggerFactoryBean.setJobDetail(jobDetail().getObject());// 设置默认刷新croncronTriggerFactoryBean.setCronExpression(Properties.getString("refresh.default.cron"));return cronTriggerFactoryBean;}/*** dycChannel 任务,需要添加新的定时任务,需要重复配置JobDetail,CronTrigger* @return*/@Bean(name = "channelSyncJobCronJobDetail")public MethodInvokingJobDetailFactoryBean channelSyncJobCronJobDetail(){MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();methodInvokingJobDetailFactoryBean.setConcurrent(false);methodInvokingJobDetailFactoryBean.setTargetObject(channelSyncTask);methodInvokingJobDetailFactoryBean.setTargetMethod("doTask");return methodInvokingJobDetailFactoryBean;}@Bean(name = "channelSyncJobCronTrigger")public CronTriggerFactoryBean channelSyncJobCronTrigger(){CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();cronTriggerFactoryBean.setJobDetail(channelSyncJobCronJobDetail().getObject());cronTriggerFactoryBean.setCronExpression("0 0 1 * * ?");return cronTriggerFactoryBean;}/*** repeat code JobDetail and CronTrigger ...**//*** 调度工厂* @return*/@Bean(name = "schedulerFactory")public SchedulerFactoryBean schedulerFactoryBean(){SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();//在添加新的触发器时,增加相应的触发器schedulerFactoryBean.setTriggers(cronTrigger().getObject(),channelSyncJobCronTrigger().getObject());return schedulerFactoryBean;}
}

2.刷新的定时任务

@Component
public class ScheduleInfoAction{private Logger logger = LoggerFactory.getLogger(ScheduleInfoAction.class);private Scheduler scheduler;public void reScheduleJob() throws SchedulerException {scheduler = (Scheduler) SpringContextUtils.getBean("schedulerFactory");// 数据库取任务列表List jobs = yunyingDao.getJobCodesFromDB(null);if (jobs != null && !jobs.isEmpty()) {for (String job : jobs) {TriggerKey triggerKey = new TriggerKey(job + "CronTrigger", Scheduler.DEFAULT_GROUP);CronTriggerImpl trigger = (CronTriggerImpl) scheduler.getTrigger(triggerKey);String dbCronExpression = getCronExpressionFromDB(job);String originConExpression;if (StringUtils.isNotBlank(dbCronExpression)){if (trigger != null){originConExpression = trigger.getCronExpression();if(!dbCronExpression.equalsIgnoreCase(originConExpression)){try{trigger.setCronExpression(dbCronExpression);scheduler.rescheduleJob(triggerKey, trigger);logger.info("jobCode:{}, dbCron:{}, originCron:{}",job,dbCronExpression,originConExpression);} catch (Exception e) {logger.error("jobCode:{}, dbCron:{}, originCron:{}, refresh Cron Error:{}" , job,dbCronExpression, originConExpression, e);}}}else {trigger = (CronTriggerImpl)SpringContextUtils.getBean(job + "CronTrigger");if (trigger != null){scheduler.scheduleJob(trigger);logger.info("scheduleJob:{}", job);}}}else {if (trigger != null){scheduler.unscheduleJob(triggerKey);logger.info("unscheduleJob:{}", job);}}}}}// 数据库取最新的Cron值private String getCronExpressionFromDB(String jobName){return yunyingDao.getCronByCode(jobName);}
}

3.设置定时任务

    1)基础抽象类的实现,(由于可能处于分布式环境中,需要使用zookeeper的分布式锁)

public abstract class BaseTask {private Logger logger = LoggerFactory.getLogger(BaseTask.class);public abstract void execute();public void doTask(){String clazzName = this.getClass().getSimpleName();String className = clazzName.substring(0,1).toLowerCase() + clazzName.substring(1);String lockPath = "/oper/schedule/" + className + "/lock";ZooKeeperClient zkClient = (ZooKeeperClient)SpringContextUtils.getBean("zkClient");CronTriggerImpl cronTrigger = (CronTriggerImpl)SpringContextUtils.getBean(className.replace("Task","Job") + "CronTrigger");logger.info("lock path:{},zkClient:{},trigger cron:{}", lockPath, zkClient, cronTrigger.getCronExpression());Boolean isGetLocker = false;InterProcessMutex lock = new InterProcessMutex(zkClient.getClient(), lockPath);try {if (lock.acquire(2000, TimeUnit.MILLISECONDS)){isGetLocker = true;execute();}} catch (Exception e) {logger.error("BaseTask 执行锁任务时抛错:", e);}finally {try {if(isGetLocker){lock.release();}} catch (Exception e) {logger.error("释放锁出错",e);}}}
}

需要执行的定时任务的代码实现继承BaseTask,并实现业务代码。

不过,这里我们中间又使用了Groovy代码,由于Groovy代码在这里可以直接对数据库进行操作,所以这里继承BaseTask 的定时任务,我们是调用Groovy Class 的方法。

    2)定时任务的实现

@Component
public class DycChannelSyncTask extends BaseTask{private Logger logger = LoggerFactory.getLogger(DycChannelSyncTask.class);public void execute() {logger.info("渠道信息开始执行任务:" + TimeUtils.formatTime(new Date(), TimeUtils.FORMAT_YYYYMMDDHHMMSS));//进行数据抽取存储,这里调的是groovy的方法channelSyncJob.doJob();logger.info("渠道信息任务执行完成:" + TimeUtils.formatTime(new Date(), TimeUtils.FORMAT_YYYYMMDDHHMMSS));}
}

4.需要引入的Maven

org.quartz-schedulerquartz2.2.1org.codehaus.groovygroovy-all2.4.6mysqlmysql-connector-java5.1.31org.apache.zookeeperzookeeperorg.slf4jslf4j-log4j12log4jlog4jorg.apache.curatorcurator-frameworkorg.apache.curatorcurator-recipes

5.数据库中定时任务的表结构

CREATE TABLE `tb_job_config` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',`jobCode` varchar(32) DEFAULT NULL COMMENT '商户',`jobName` varchar(32) DEFAULT NULL COMMENT '渠道编号',`cron` varchar(128) DEFAULT NULL,`jobDetail` longtext COMMENT '渠道名称',`status` smallint(1) DEFAULT NULL COMMENT '1、有效 2 无效',PRIMARY KEY (`id`),UNIQUE KEY `merchantNo_channelCode_unique` (`jobCode`,`jobName`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=164 DEFAULT CHARSET=utf8 COMMENT='任务调度配置表';
INSERT INTO `tb_job_config` (`id`, `jobCode`, `jobName`, `cron`, `jobDetail`, `status`)
VALUES ('139', 'channelUserSyncJob', '渠道用户同步', '0 0 0/1 * * ? ', '{\"intervalHours\":\"2\"}', '1');// 定时任务执行完,记录一下任务执行情况,有助于排查问题等等
CREATE TABLE `tb_job_execute_log` (`id` bigint(64) NOT NULL AUTO_INCREMENT,`jobCode` varchar(255) DEFAULT NULL COMMENT '任务编号',`totalNum` bigint(64) DEFAULT NULL COMMENT '记录总数',`successNum` bigint(64) DEFAULT NULL COMMENT '记录成功数',`errorNum` bigint(64) DEFAULT NULL COMMENT '记录错误数',`recordTime` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '记录时间',PRIMARY KEY (`id`),KEY `Index_jobCode` (`jobCode`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=52810 DEFAULT CHARSET=utf8 COMMENT='任务执行情况记录表';

另外注意的是数据库中命名的Job,要与实际代码中的命名规则保持一致。

我觉得使用之前的Job,获取我们这边的代码可以简化很多。


转:https://my.oschina.net/itommy/blog/974039



推荐阅读
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有