1.scheduled Դ??
2.不提你可能不知道,spring定时任务的数字星期域不符合常规的cron定义
3.SpringBoot动态定时任务的实现
4.scheduledthreadpoolexecutor åå§åå¤å°ä¸ª
5.å¸¦ä½ å¦ä¼åºåScheduledThreadPoolExecutorä¸Timer
6.可动态配置的Schedule设计
scheduled Դ??
本文介绍 Java 实现定时任务的三种方法:sleep、Timer 和 ScheduledExecutorService。
第一种方法是使用 sleep,通过在死循环中添加 sleep 休眠逻辑,实现按照固定频率运行的用户随机锁源码定时任务。这种方式比较直接,但只能按固定频率运行,且在 JDK 8 中使用了 Lambda 表达式。
第二种方法是使用 Timer 类,它在 JDK 1.3 中内置。可以设置首次执行的延迟时间、首次执行的具体日期时间,以及执行频率。虽然比较简单,但 Timer 是线程安全的,且有一些缺陷需要注意,不推荐在复杂业务中使用。
第三种方法是使用 ScheduledExecutorService,它是 Timer 的替代者,基于线程池设计。可以避免 Timer 的一些问题,且任务支持并发调度执行,适用于实际复杂业务的需求。
总结,这三种方法在实现简单定时任务时都比较实用,但实际业务中还需考虑分布式、故障转移恢复等因素。推荐使用 ScheduledExecutorService 这种方法实现定时任务。
本文提供了参考,在不用框架的前提下实现定时任务。在小而美的场景下,这种方法效果不错。源码医院his系统Java 系列教程会继续更新,关注Java技术栈第一时间推送。
所有实战源码已上传至 GitHub 仓库,希望对读者有所帮助。
如果你觉得文章对你有帮助,请给个在看、转发,原创不易,你的鼓励将是我继续写作的动力。
本文版权属于 "Java技术栈",请遵循原创规则,禁止抄袭、洗稿。
不提你可能不知道,spring定时任务的数字星期域不符合常规的cron定义
了解Spring定时任务的基本配置后,许多开发者会发现其与cron表达式的某些不寻常之处。本文将深入探讨Spring定时任务的数字星期域与传统cron定义之间的差异。
在配置Spring定时任务时,使用@Scheduled(cron = "* * 1 * * *")可以轻松实现每天1点定时执行任务。但若尝试构建特定于星期一中午点的定时任务,您会发现cron表达式的应用与预期不符。
在cron表达式中,星期一对应的数字是2,表示从星期天(数字1)开始的一周循环。然而,当将此类cron表达式应用于Spring定时任务时,任务实际上会在下一次星期二的同一时间执行,而非预期的星期一。
这一现象同样存在于直接使用Spring的CronTask类,并传递cron表达式时。究其原因,Spring内部源码的io流框架源码处理逻辑导致了这一不一致性。在生成CronTrigger时,解析cron表达式的过程存在差异。
解析过程涉及对数字星期域进行特殊转换,将其从英文缩写转换为数字,并对特定值进行处理。其中的关键在于对daysOfWeek位数组的操作,该数组用于存储解析后的星期信息。
具体而言,解析过程首先将英文缩写转换为对应的数字表示,然后将数字域中"?"替换为"*",接着使用基础解析算法处理。最后,对daysOfWeek数组的第0位和第7位进行逻辑或操作,并将结果保存在第0位,同时清除第7位。这一处理方式导致了数字星期域与传统cron表达式之间的一天偏差。
尽管如此,网络上关于Spring定时任务的教程和文章多聚焦于cron表达式的基础解释,较少提及此类问题的详细原因。然而,解决方法相对简单且有效:在cron表达式中使用英文缩写的星期表示,而非数字。这样做能够避免因数字转换导致的定时任务执行时间偏移。
春代码设计人员选择这种处理方式可能与与Crontab中的cron表达式格式以及Linux计划任务的兼容性有关。Crontab采用0-7的数字表示星期,同时其格式在秒域的处理上与cron表达式有所不同。
综上所述,对于在Spring中使用cron表达式配置定时任务的场景,推荐使用英文缩写来表示星期域。这样可以确保任务执行时间的准确性,并避免由于数字转换而导致的时间偏移。
SpringBoot动态定时任务的视频源码提取地址实现
1. Spring 定时任务的简单实现
在Spring Boot中使用定时任务,只需要@EnableScheduling开启定时任务支持,在需要调度的方法上添加@Scheduled注解。这样就能够在项目中开启定时调度功能了,支持通过cron、fixedRate、fixedDelay等灵活的控制执行周期和频率。
1.1 缺点周期一旦指定,想要更改必须要重启应用
1.2 需求热更新定时任务的执行周期,基于cron表达式并支持外部存储,如数据库,nacos等
最小改造兼容现有的定时任务(仅需添加一个注解)
动态增加定时任务
2.Spring 定时任务源码分析2.1 @EnableScheduling 引入了配置类 SchedulingConfiguration
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(SchedulingConfiguration.class)@Documentedpublic@interfaceEnableScheduling{ }2.2 SchedulingConfiguration只配置了一个bean,ScheduledAnnotationBeanPostProcessor从名字就知道该类实现BeanPostProcessor接口
@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicclassSchedulingConfiguration{ @Bean(name=TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicScheduledAnnotationBeanPostProcessorscheduledAnnotationProcessor(){ returnnewScheduledAnnotationBeanPostProcessor();}}2.3 ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization实现,可见具体处理@Scheduled实现定时任务的是processScheduled方法
@OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName){ if(beaninstanceofAopInfrastructureBean||beaninstanceofTaskScheduler||beaninstanceofScheduledExecutorService){ //IgnoreAOPinfrastructuresuchasscopedproxies.returnbean;}Class<?>targetClass=AopProxyUtils.ultimateTargetClass(bean);if(!this.nonAnnotatedClasses.contains(targetClass)&&AnnotationUtils.isCandidateClass(targetClass,Arrays.asList(Scheduled.class,Schedules.class))){ //获取bean的方法及@Scheduled映射关系Map<Method,Set<Scheduled>>annotatedMethods=MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>)method->{ Set<Scheduled>scheduledMethods=AnnotatedElementUtils.getMergedRepeatableAnnotations(method,Scheduled.class,Schedules.class);return(!scheduledMethods.isEmpty()?scheduledMethods:null);});if(annotatedMethods.isEmpty()){ this.nonAnnotatedClasses.add(targetClass);if(logger.isTraceEnabled()){ logger.trace("No@Scheduledannotationsfoundonbeanclass:"+targetClass);}}else{ //Non-emptysetofmethodsannotatedMethods.forEach((method,scheduledMethods)->//处理@Scheduled注解scheduledMethods.forEach(scheduled->processScheduled(scheduled,method,bean)));if(logger.isTraceEnabled()){ logger.trace(annotatedMethods.size()+"@Scheduledmethodsprocessedonbean'"+beanName+"':"+annotatedMethods);}}}returnbean;}2.4 以下仅贴出ScheduledAnnotationBeanPostProcessor.processScheduled处理cron表达式的关键实现,
privatefinalScheduledTaskRegistrarregistrar;publicScheduledAnnotationBeanPostProcessor(){ this.registrar=newScheduledTaskRegistrar();}protectedvoidprocessScheduled(Scheduledscheduled,Methodmethod,Objectbean){ try{ //将定时任务方法,转为RunnableRunnablerunnable=createRunnable(bean,method);booleanprocessedSchedule=false;Set<ScheduledTask>tasks=newLinkedHashSet<>(4);//Determineinitialdelay//处理scheduled.initialDelay()的值,略过...//CheckcronexpressionStringcron=scheduled.cron();if(StringUtils.hasText(cron)){ Stringzone=scheduled.zone();if(this.embeddedValueResolver!=null){ //${ }变量值表达式的转换cron=this.embeddedValueResolver.resolveStringValue(cron);zone=this.embeddedValueResolver.resolveStringValue(zone);}if(StringUtils.hasLength(cron)){ Assert.isTrue(initialDelay==-1,"'initialDelay'notsupportedforcrontriggers");processedSchedule=true;if(!Scheduled.CRON_DISABLED.equals(cron)){ TimeZonetimeZone;if(StringUtils.hasText(zone)){ timeZone=StringUtils.parseTimeZoneString(zone);}else{ timeZone=TimeZone.getDefault();}//创建cron触发器CronTrigger对象,并注册CronTasktasks.add(this.registrar.scheduleCronTask(newCronTask(runnable,newCronTrigger(cron,timeZone))));}}}//处理fixedDelay和fixedRate,及ScheduledTask保存用于销毁,略过...}//略过catchException...}以上通过this.registrar.scheduleCronTask实现cron定时任务注册或初始化
3.动态定时任务的实现实现思路: 重写ScheduledAnnotationBeanPostProcessor.processScheduled方法,修改处理cron的部分代码,使用this.registrar.scheduleTriggerTask注册或初始化定时任务
3.1 相关类图classDiagramDisposableBean<|--DynamicCronScheduleTaskManagerEnvironmentAware<|--EnvironmentDynamicCronHandlerAbstractDynamicCronHandler<|--EnvironmentDynamicCronHandlerTrigger<|--DynamicCronTriggerEnvironmentAware:+setEnvironment()DisposableBean:+destroy()voidTrigger:+nextExecutionTime(TriggerContexttriggerContext)DateclassDynamicCronScheduleTaskManager{ +Map<String,ScheduledTask>dynamicScheduledTaskMap-ScheduledTaskRegistrarregistrar+addTriggerTask(StringcronName,TriggerTasktask)ScheduledTask+contains(StringcronName)boolean+updateTriggerTask(StringcronName)void+removeTriggerTask(StringcronName)void}classAbstractDynamicCronHandler{ -DynamicCronScheduleTaskManagerdynamicCronScheduleTaskManager;+getCronExpression(StringcronName)String+updateTriggerTash(StringcronName)void}classEnvironmentDynamicCronHandler{ +Environmentenvironment+environmentChangeEvent(EnvironmentChangeEventevent)void}classDynamicCronTrigger{ -StringcronName-AbstractDynamicCronHandlerdynamicCronHandler-StringcronExpression-CronSequenceGeneratorsequenceGenerator}classScheduledDynamicCron{ +value()String+cronName()String+handler()Class<?extendsAbstractDynamicCronHandler>}3.2 DynamicCronScheduleTaskManagerimportorg.springframework.beans.factory.DisposableBean;importorg.springframework.scheduling.config.ScheduledTask;importorg.springframework.scheduling.config.ScheduledTaskRegistrar;importorg.springframework.scheduling.config.TriggerTask;importjava.util.HashMap;importjava.util.Map;/***@authorHuangJS*@date--:下午*/publicclassDynamicCronScheduleTaskManagerimplementsDisposableBean{ privateMap<String,ScheduledTask>dynamicScheduledTaskMap=newHashMap<>();ScheduledTaskRegistrarregistrar;//添加定时任务publicScheduledTaskaddTriggerTask(StringcronName,TriggerTasktask){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.get(cronName);if(scheduledTask!=null){ scheduledTask.cancel();}scheduledTask=this.registrar.scheduleTriggerTask(task);dynamicScheduledTaskMap.put(cronName,scheduledTask);returnscheduledTask;}publicbooleancontains(StringcronName){ returnthis.dynamicScheduledTaskMap.containsKey(cronName);}//更新定时任务的触发时机publicvoidupdateTriggerTask(StringcronName){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.get(cronName);if(scheduledTask==null){ thrownewIllegalStateException("InvalidcronName""+cronName+"",nofundScheduledTask");}scheduledTask.cancel();scheduledTask=this.registrar.scheduleTriggerTask((TriggerTask)scheduledTask.getTask());dynamicScheduledTaskMap.put(cronName,scheduledTask);}//移除定时任务publicvoidremoveTriggerTask(StringcronName){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.remove(cronName);if(scheduledTask!=null){ scheduledTask.cancel();}}@Overridepublicvoiddestroy()throwsException{ for(ScheduledTaskvalue:dynamicScheduledTaskMap.values()){ value.cancel();}this.dynamicScheduledTaskMap.clear();}}3.3 AbstractDynamicCronHandlerpublicabstractclassAbstractDynamicCronHandler{ @AutowiredprivateDynamicCronScheduleTaskManagerdynamicCronScheduleTaskManager;/***获取cron表达式*@return*/publicabstractStringgetCronExpression(StringcronName);/***更新cronName对应的定时任务的触发时机*@paramcronName*/publicvoidupdateTriggerTask(StringcronName){ dynamicCronScheduleTaskManager.updateTriggerTask(cronName);}}3.4 EnvironmentDynamicCronHandler基于Environment,在刷新配置时,自动刷新定时任务的触发时机,支持分布式多节点集群部署。
如,cron表达式配置在nacos,更新nacos上的配置时由于监听了EnvironmentChangeEvent事件实现了定时任务的触发时机的更新
importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.cloud.context.environment.EnvironmentChangeEvent;importorg.springframework.context.EnvironmentAware;importorg.springframework.context.event.EventListener;importorg.springframework.core.env.Environment;/***@authorHuangJS*@date--:上午*/publicclassEnvironmentDynamicCronHandlerextendsAbstractDynamicCronHandlerimplementsEnvironmentAware{ privatefinalLoggerlogger=LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);privateEnvironmentenvironment;@OverridepublicStringgetCronExpression(StringcronName){ try{ returnenvironment.getProperty(cronName);}catch(Exceptione){ logger.error(e.getMessage(),e);}returnnull;}@OverridepublicvoidsetEnvironment(Environmentenvironment){ this.environment=environment;}@EventListenerpublicvoidenvironmentChangeEvent(EnvironmentChangeEventevent){ for(Stringkey:event.getKeys()){ if(this.dynamicCronScheduleTaskManager.contains(key)){ this.dynamicCronScheduleTaskManager.updateTriggerTask(key);}}}}3.5 DynamicCronTriggerpublicclassDynamicCronTriggerimplementsTrigger{ privatefinalstaticLoggerLOGGER=LoggerFactory.getLogger(DynamicCronTrigger.class);privateStringcronName;privateAbstractDynamicCronHandlerdynamicCronHandler;privateStringcronExpression;privateCronSequenceGeneratorsequenceGenerator;publicDynamicCronTrigger(StringcronName,AbstractDynamicCronHandlerdynamicCronHandler){ this.cronName=cronName;this.dynamicCronHandler=dynamicCronHandler;}@OverridepublicDatenextExecutionTime(TriggerContexttriggerContext){ StringcronExpression=dynamicCronHandler.getCronExpression(cronName);if(cronExpression==null){ returnnull;}if(this.sequenceGenerator==null||!cronExpression.equals(this.cronExpression)){ try{ this.sequenceGenerator=newCronSequenceGenerator(cronExpression);this.cronExpression=cronExpression;}catch(Exceptione){ LOGGER.error(e.getMessage(),e);}}Datedate=triggerContext.lastCompletionTime();if(date!=null){ Datescheduled=triggerContext.lastScheduledExecutionTime();if(scheduled!=null&&date.before(scheduled)){ //Previoustaskapparentlyexecutedtooearly...//Let'ssimplyusethelastcalculatedexecutiontimethen,//inordertopreventaccidentalre-firesinthesamesecond.date=scheduled;}}else{ date=newDate();}returnthis.sequenceGenerator.next(date);}}3.6 注解类ScheduledDynamicCron@Target({ ElementType.METHOD,ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceScheduledDynamicCron{ /***动态cron名称*@return*/@AliasFor("cronName")Stringvalue()default"";/***动态crscheduledthreadpoolexecutor åå§åå¤å°ä¸ª
éè¿Executors,å¯ä»¥å建3ç§ç±»åçThreadPoolExecutorã
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
1.FixedThreadPool
FixedThreadPool被称为å¯éç¨åºå®çº¿ç¨æ°ç线ç¨æ± ãä¸é¢æ¯FixedThreadPoolçæºä»£ç å®ç°ã
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
FixedThreadPoolä¸å¤ä½ç空é²çº¿ç¨ä¼è¢«ç«å³ç»æ¢ã
FixedThreadPoolçexecute()è¿è¡ç¤ºæå¾å¦ä¸æ示ã
å¦æå½åè¿è¡ç线ç¨æ°å°äºcorePoolSize,åå建æ°çº¿ç¨æ¥æ§è¡ä»»å¡ã
å½åè¿è¡ç线ç¨æ°çäºcorePoolSize,å°ä»»å¡å å ¥LinkedBlockingQueueã
线ç¨æ§è¡å®1ä¸çä»»å¡åï¼ä¼åå¤ä»é»å¡éåä¸åä»»å¡æ§è¡ã
å¸¦ä½ å¦ä¼åºåScheduledThreadPoolExecutorä¸Timer
æè¦ï¼æ¬æç®åä»ç»ä¸ScheduledThreadPoolExecutorç±»ä¸Timerç±»çåºå«ï¼ScheduledThreadPoolExecutorç±»ç¸æ¯äºTimerç±»æ¥è¯´ï¼ç©¶ç«æåªäºä¼å¿ï¼ä»¥åäºè åå«å®ç°ä»»å¡è°åº¦çç®å示ä¾ãJDK1.5å¼å§æä¾ScheduledThreadPoolExecutorç±»ï¼ScheduledThreadPoolExecutor类继æ¿ThreadPoolExecutorç±»éç¨çº¿ç¨æ± å®ç°äºä»»å¡çå¨ææ§è°åº¦åè½ãå¨JDK1.5ä¹åï¼å®ç°ä»»å¡çå¨ææ§è°åº¦ä¸»è¦ä½¿ç¨çæ¯Timerç±»åTimerTaskç±»ãæ¬æï¼å°±ç®åä»ç»ä¸ScheduledThreadPoolExecutorç±»ä¸Timerç±»çåºå«ï¼ScheduledThreadPoolExecutorç±»ç¸æ¯äºTimerç±»æ¥è¯´ï¼ç©¶ç«æåªäºä¼å¿ï¼ä»¥åäºè åå«å®ç°ä»»å¡è°åº¦çç®å示ä¾ã
äºè çåºå«çº¿ç¨è§åº¦Timeræ¯å线ç¨æ¨¡å¼ï¼å¦ææ个TimerTaskä»»å¡çæ§è¡æ¶é´æ¯è¾ä¹ ï¼ä¼å½±åå°å ¶ä»ä»»å¡çè°åº¦æ§è¡ã
ScheduledThreadPoolExecutoræ¯å¤çº¿ç¨æ¨¡å¼ï¼å¹¶ä¸éç¨çº¿ç¨æ± ï¼æ个ScheduledFutureTaskä»»å¡æ§è¡çæ¶é´æ¯è¾ä¹ ï¼ä¸ä¼å½±åå°å ¶ä»ä»»å¡çè°åº¦æ§è¡ã
ç³»ç»æ¶é´ææ度Timerè°åº¦æ¯åºäºæä½ç³»ç»çç»å¯¹æ¶é´çï¼å¯¹æä½ç³»ç»çæ¶é´ææï¼ä¸æ¦æä½ç³»ç»çæ¶é´æ¹åï¼åTimerçè°åº¦ä¸å精确ã
ScheduledThreadPoolExecutorè°åº¦æ¯åºäºç¸å¯¹æ¶é´çï¼ä¸åæä½ç³»ç»æ¶é´æ¹åçå½±åã
æ¯å¦æè·å¼å¸¸Timerä¸ä¼æè·TimerTaskæåºçå¼å¸¸ï¼å ä¸Timeråæ¯å线ç¨çãä¸æ¦æ个è°åº¦ä»»å¡åºç°å¼å¸¸ï¼åæ´ä¸ªçº¿ç¨å°±ä¼ç»æ¢ï¼å ¶ä»éè¦è°åº¦çä»»å¡ä¹ä¸åæ§è¡ã
ScheduledThreadPoolExecutoråºäºçº¿ç¨æ± æ¥å®ç°è°åº¦åè½ï¼æ个任å¡æåºå¼å¸¸åï¼å ¶ä»ä»»å¡ä»è½æ£å¸¸æ§è¡ã
ä»»å¡æ¯å¦å ·å¤ä¼å 级Timerä¸æ§è¡çTimerTaskä»»å¡æ´ä½ä¸æ²¡æä¼å 级çæ¦å¿µï¼åªæ¯æç §ç³»ç»çç»å¯¹æ¶é´æ¥æ§è¡ä»»å¡ã
ScheduledThreadPoolExecutorä¸æ§è¡çScheduledFutureTaskç±»å®ç°äºjava.lang.Comparableæ¥å£åjava.util.concurrent.Delayedæ¥å£ï¼è¿ä¹å°±è¯´æäºScheduledFutureTaskç±»ä¸å®ç°äºä¸¤ä¸ªé常éè¦çæ¹æ³ï¼ä¸ä¸ªæ¯java.lang.Comparableæ¥å£çcompareToæ¹æ³ï¼ä¸ä¸ªæ¯java.util.concurrent.Delayedæ¥å£çgetDelayæ¹æ³ãå¨ScheduledFutureTaskç±»ä¸compareToæ¹æ³å®ç°äºä»»å¡çæ¯è¾ï¼è·ç¦»ä¸æ¬¡æ§è¡çæ¶é´é´éççä»»å¡ä¼æå¨åé¢ï¼ä¹å°±æ¯è¯´ï¼è·ç¦»ä¸æ¬¡æ§è¡çæ¶é´é´éççä»»å¡çä¼å 级æ¯è¾é«ãègetDelayæ¹æ³åè½å¤è¿åè·ç¦»ä¸æ¬¡ä»»å¡æ§è¡çæ¶é´é´éã
æ¯å¦æ¯æ对任å¡æåºTimerä¸æ¯æ对任å¡çæåºã
ScheduledThreadPoolExecutorç±»ä¸å®ä¹äºä¸ä¸ªéæå é¨ç±»DelayedWorkQueueï¼DelayedWorkQueueç±»æ¬è´¨ä¸æ¯ä¸ä¸ªæåºéåï¼ä¸ºéè¦è°åº¦çæ¯ä¸ªä»»å¡æç §è·ç¦»ä¸æ¬¡æ§è¡æ¶é´é´éç大å°æ¥æåº
è½å¦è·åè¿åçç»æTimerä¸æ§è¡çTimerTaskç±»åªæ¯å®ç°äºjava.lang.Runnableæ¥å£ï¼æ æ³ä»TimerTaskä¸è·åè¿åçç»æã
ScheduledThreadPoolExecutorä¸æ§è¡çScheduledFutureTask类继æ¿äºFutureTaskç±»ï¼è½å¤éè¿Futureæ¥è·åè¿åçç»æã
éè¿ä»¥ä¸å¯¹ScheduledThreadPoolExecutorç±»åTimerç±»çåæ对æ¯ï¼ç¸ä¿¡å¨JDK1.5ä¹åï¼å°±æ²¡æ使ç¨Timeræ¥å®ç°å®æ¶ä»»å¡è°åº¦çå¿ è¦äºã
äºè ç®åç示ä¾è¿éï¼ç»åºä½¿ç¨TimeråScheduledThreadPoolExecutorå®ç°å®æ¶è°åº¦çç®å示ä¾ï¼ä¸ºäºç®ä¾¿ï¼æè¿éå°±ç´æ¥ä½¿ç¨å¿åå é¨ç±»çå½¢å¼æ¥æ交任å¡ã
Timerç±»ç®å示ä¾æºä»£ç 示ä¾å¦ä¸æ示ã
packageio.binghe.concurrent.lab;importjava.util.Timer;importjava.util.TimerTask;/***@authorbinghe*@version1.0.0*@descriptionæµè¯Timer*/publicclassTimerTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ Timertimer=newTimer();timer.scheduleAtFixedRate(newTimerTask(){ @Overridepublicvoidrun(){ System.out.println("æµè¯Timerç±»");}},,);Thread.sleep();timer.cancel();}}è¿è¡ç»æå¦ä¸æ示ã
æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»ScheduledThreadPoolExecutorç±»ç®å示ä¾æºä»£ç 示ä¾å¦ä¸æ示ã
packageio.binghe.concurrent.lab;importjava.util.concurrent.*;/***@authorbinghe*@version1.0.0*@descriptionæµè¯ScheduledThreadPoolExecutor*/publicclassScheduledThreadPoolExecutorTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(3);scheduledExecutorService.scheduleAtFixedRate(newRunnable(){ @Overridepublicvoidrun(){ System.out.println("æµè¯æµè¯ScheduledThreadPoolExecutor");}},1,1,TimeUnit.SECONDS);//主线ç¨ä¼ç ç§Thread.sleep();System.out.println("æ£å¨å ³é线ç¨æ± ...");//å ³é线ç¨æ± scheduledExecutorService.shutdown();booleanisClosed;//çå¾ çº¿ç¨æ± ç»æ¢do{ isClosed=scheduledExecutorService.awaitTermination(1,TimeUnit.DAYS);System.out.println("æ£å¨çå¾ çº¿ç¨æ± ä¸çä»»å¡æ§è¡å®æ");}while(!isClosed);System.out.println("ææ线ç¨æ§è¡ç»æï¼çº¿ç¨æ± å ³é");}}è¿è¡ç»æå¦ä¸æ示ã
æµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræ£å¨å ³é线ç¨æ± ...æµè¯æµè¯ScheduledThreadPoolExecutoræ£å¨çå¾ çº¿ç¨æ± ä¸çä»»å¡æ§è¡å®æææ线ç¨æ§è¡ç»æï¼çº¿ç¨æ± å ³é注æï¼å ³äºTimeråScheduledThreadPoolExecutorè¿æå ¶ä»ç使ç¨æ¹æ³ï¼è¿éï¼æå°±ç®åååºä»¥ä¸ä¸¤ä¸ªä½¿ç¨ç¤ºä¾ï¼æ´å¤ç使ç¨æ¹æ³å¤§å®¶å¯ä»¥èªè¡å®ç°ã
æ¬æå享èªå为äºç¤¾åºããé«å¹¶åãScheduledThreadPoolExecutorä¸Timerçåºå«åç®å示ä¾ãï¼ä½è ï¼å°æ²³ã
可动态配置的Schedule设计
1.背景
定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。
你可以使用Linux的Crontab启动应用程序进行处理,或者直接使用Spring的Schedule对任务进行调度,还可以使用分布式调度系统,营销利器广告源码如果xxl-job等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:
1.新建一组任务,周期性的执行指定SQL并将结果以邮件的方式发送给特定人群;2.比较方便的对任务进行管理,比如启动、停止,修改调度周期等;3.动态添加、移除任务,不需要频繁的修改、发布程序;
停顿几分钟,简单思考一下,有哪几种实现思路呢?
本篇文章将从以下几部分进行讨论:
1.SpringSchedule配置和使用。首先我们将介绍Demo的骨架,并基于Spring-Boot完成Schedule的配置;2.数据库定时轮询方案。使用SpringSchedule定时轮询数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;3.基于TaskScheduler动态配置方案。基于数据库轮询或配置中心两种方案动态的对SpringTaskScheduler进行配置,以实现动态管理任务的目的;4.我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;
2.SpringScheduleSpringBoot上的Schedule的使用非常简单,无需增加新的依赖,只需简单配置即可。
1.使用@EnableScheduling启用Schedule;2.在要调度的方法上增加@Scheduled;
首先,我们需要在启动类上添加@EnableScheduling注解,该注解将启用SchedulingConfiguration配置类帮我们完成最基本的配置。
@SpringBootApplication@EnableSchedulingpublicclassConfigurableScheduleDemoApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(ConfigurableScheduleDemoApplication.class,args);}}启用Schedule配置之后,在需要被调度的方法上增加@Scheduled注解。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}runTask任务延迟1s进行初始化,并以5s为间隔进行调度。
Scheduled注解类的详细配置如下:
配置含义样例cronlinuxcrontab表达式@Scheduled(cron="*/5****MON-FRI")工作日,每5s调度一次fixedDelay固定间隔,上次运行结束,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=)运行结束后,5S后启动一次调度fixedDelayString与fixedDelay一致fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=)前后两个任务,间隔5秒fixedRateString与fixedRate一致initialDelay第一次执行,间隔时间@Scheduled(initialDelay=,fixedRate=)第一次执行,延时1秒,以后以5秒为周期进行调度initialDelayString与initialDelay一致环境搭建完成,让我们开始第一个方案。
3.数据库定时轮询使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。
3.1.串行执行方案整体思路非常简单,流程如下:
主要分如下几步:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.根据数据库的任务配置信息,依次遍历并执行任务;3.任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;4.等待下一次任务调度。
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载需要运行的任务://1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//依次遍历待运行任务,执行对于的任务for(TaskDefinitionV2task:shouldRunTasks){ //DoubleCheckif(task.shouldRun(now)){ //执行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}方案简单但非常有效,那该方案存在哪些问题呢?最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被delay。
例如,依次加载了待执行任务task1、task2、task3。其中task1耗时5秒,task2耗时5秒,task3耗时1秒,由于三个任务串行执行,task2将延时5秒,task3延时秒;下一轮检查距上次启动相差秒。
究其根本,核心问题是调度线程和运行线程是同一个线程,调度的运行和任务的运行相互影响。
让我们看一个改进方案:并行执行方案。
3.2.并行执行方案整体执行流程如下:
相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:
1.步骤一不变,在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;3.任务在线程池中运行,结束后更新下一次的运行时间;4.调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;
核心代码如下:
Spring调度任务,每1秒运行一次:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载所有待运行的任务//1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//遍历待运行任务for(TaskDefinitionV2task:shouldRunTasks){ //1.根据TaskId获取任务对应的线程池//2.将任务提交至线程池中this.executorServiceForTask(task.getId()).submit(newTaskRunner(task.getId()));}}自定义线程池,每个线程池最多只有一个线程,空闲超过秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:
privateExecutorServiceexecutorServiceForTask(LongtaskId){ returnthis.executorServiceRegistry.computeIfAbsent(taskId,id->{ BasicThreadFactorythreadFactory=newBasicThreadFactory.Builder()//指定线程池名称.namingPattern("Async-Task-"+taskId+"-Thread-%d")//设置线程为后台线程.daemon(true).build();//线程池核心配置://1.每个线程池最多只有一个线程//2.线程空闲超过秒进行自动回收//3.直接使用交互器,线程空闲进行任务交互//4.使用指定的线程工厂,设置线性名称//5.线程池饱和,自动丢弃最老的任务returnnewThreadPoolExecutor(0,1,L,TimeUnit.SECONDS,newSynchronousQueue<>(),threadFactory,newThreadPoolExecutor.DiscardOldestPolicy());});}最后,在线程池中运行的Task如下:
privateclassTaskRunnerimplementsRunnable{ privatefinalDatenow=newDate();privatefinalLongtaskId;publicTaskRunner(LongtaskId){ this.taskId=taskId;}@Overridepublicvoidrun(){ //重新加载任务,保持最新的任务状态TaskDefinitionV2task=definitionV2Repository.findById(this.taskId).orElse(null);if(task!=null&&task.shouldRun(now)){ //运行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}4.TaskScheduler配置方案该方案的核心为:绕过@Schedule注解,直接对Spring底层核心类TaskScheduler进行配置。
TaskScheduler接口是Spring对调度任务的一个抽象,更是@Schedule背后默默的支持者,首先我们看下这个接口定义。
publicinterfaceTaskScheduler{ ScheduledFutureschedule(Runnabletask,Triggertrigger);ScheduledFutureschedule(Runnabletask,InstantstartTime);ScheduledFutureschedule(Runnabletask,DatestartTime);ScheduledFuturescheduleAtFixedRate(Runnabletask,InstantstartTime,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,DatestartTime,longperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,longperiod);ScheduledFuturescheduleWithFixedDelay(Runnabletask,InstantstartTime,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,DatestartTime,longdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,longdelay);}满满的都是schedule接口,其他的比较简单就不过多叙述了,重点说下Trigger这个接口,首先看下这个接口的定义:
publicinterfaceTrigger{ DatenextExecutionTime(TriggerContexttriggerContext);}只有一个方法,获取下次执行的时间。在任务执行完成后,会调用Trigger的nextExecutionTime获取下一次运行时间,从而实现周期性调度。
CronTrigger是Trigger的最常见实现,以linuxcrontab的方式配置调度任务,如:
scheduler.schedule(task,newCronTrigger("-**MON-FRI"));基础部分简单介绍到这,让我们看下数据库动态配置方案。
4.1数据库动态配置方案整体设计如下:
仍旧是轮询数据库方式,详细流程如下:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取所有任务;2.依次遍历任务,与内存中的TaskEntry(任务与状态)进行比对,动态的向TaskScheduler中添加或取消调度任务;3.由TaskScheduler负责实际的任务调度;
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndConfig(){ //加载所有的任务信息List<TaskDefinitionV3>tasks=repository.findAll();//遍历任务进行任务检查for(TaskDefinitionV3task:tasks){ //获取内存任务状态TaskEntrytaskEntry=this.taskEntry.computeIfAbsent(task.getId(),TaskEntry::new);if(task.isEnable()&&taskEntry.isStop()){ //任务为可用,运行状态为停止,则重新进行schedule注册ScheduledFuture<?>scheduledFuture=this.taskScheduler.scheduleWithFixedDelay(newTaskRunner(task),task.getDelay()*);taskEntry.setScheduledFuture(scheduledFuture);log.info("successtostartscheduletaskfor{ }",task);}elseif(task.isDisable()&&taskEntry.isRunning()){ //任务为禁用,运行状态为运行中,停止正在运行在任务taskEntry.stop();log.info("successtostopscheduletaskfor{ }",task);}}}核心辅助类:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}0有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。
4.2配置中心通知方案整体设计如下:
核心流程如下:
1.应用启动时,从配置中心中获取调度的配置信息,并完成对TaskScheduler的配置;2.当配置发送变化时,配置中心会主动将配置推送到应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;3.任务的实际调度仍旧由TaskScheduler完成。
由于手底下没有配置中心,暂时没有coding,思路很简单,有条件的同学可以自行完成。
5.分布式环境下应用以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现冗余部署和自动化容灾。
以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。
为了简单,我们使用Redis的分布式锁。
5.1.环境搭建Redisson是Redis的一个富客户端,提供了很多高级的数据结构。本次,我们将使用RLock对应用进行保护。
首先,在pom中引入RedissonStarter。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}1然后,在application.properties文件中增加Redis配置,具体如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}.2引入分布式锁最后,就可以直接使用分布式锁对任务执行进行保护了,代码如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}3备注:
Redis是典型的AP应用,而分布式锁严格意义上来说是CP。所以基于Redis的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用CP实现,如Zookeeper或etcd等。
6.小结本文从Spring的Schedule出发,依次对数据库轮询方案、TaskScheduler配置方案进行详细讲解,以实现对调度任务的可配置化。最后,使用Redis分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。
References[1]源码:/litao/books/tree/master/configurable-schedule