欢迎来到【iapp棋牌源码】【flink源码框架】【怎么屏蔽源码】scheduled 源码-皮皮网网站!!!

皮皮网

【iapp棋牌源码】【flink源码框架】【怎么屏蔽源码】scheduled 源码-皮皮网 扫描左侧二维码访问本站手机端

【iapp棋牌源码】【flink源码框架】【怎么屏蔽源码】scheduled 源码

2024-11-30 02:33:45 来源:{typename type="name"/} 分类:{typename type="name"/}

1.scheduled Դ??
2.不提你可能不知道,spring定时任务的数字星期域不符合常规的cron定义
3.SpringBoot动态定时任务的实现
4.scheduledthreadpoolexecutor 初始化多少个
5.带你学会区分ScheduledThreadPoolExecutor与Timer
6.可动态配置的Schedule设计

scheduled 源码

scheduled Դ??

       本文介绍 Java 实现定时任务的三种方法:sleep、Timer 和 ScheduledExecutorService。

       第一种方法是使用 sleep,通过在死循环中添加 sleep 休眠逻辑,实现按照固定频率运行的iapp棋牌源码定时任务。这种方式比较直接,但只能按固定频率运行,且在 JDK 8 中使用了 Lambda 表达式。

       第二种方法是使用 Timer 类,它在 JDK 1.3 中内置。可以设置首次执行的延迟时间、首次执行的具体日期时间,以及执行频率。虽然比较简单,但 Timer 是线程安全的,且有一些缺陷需要注意,不推荐在复杂业务中使用。

       第三种方法是使用 ScheduledExecutorService,它是 Timer 的替代者,基于线程池设计。可以避免 Timer 的一些问题,且任务支持并发调度执行,适用于实际复杂业务的需求。

       总结,这三种方法在实现简单定时任务时都比较实用,但实际业务中还需考虑分布式、故障转移恢复等因素。推荐使用 ScheduledExecutorService 这种方法实现定时任务。

       本文提供了参考,在不用框架的前提下实现定时任务。在小而美的场景下,这种方法效果不错。flink源码框架Java 系列教程会继续更新,关注Java技术栈第一时间推送。

       所有实战源码已上传至 GitHub 仓库,希望对读者有所帮助。

       如果你觉得文章对你有帮助,请给个在看、转发,原创不易,你的鼓励将是我继续写作的动力。

       本文版权属于 "Java技术栈",请遵循原创规则,禁止抄袭、洗稿。

不提你可能不知道,spring定时任务的数字星期域不符合常规的cron定义

       了解Spring定时任务的基本配置后,许多开发者会发现其与cron表达式的某些不寻常之处。本文将深入探讨Spring定时任务的数字星期域与传统cron定义之间的差异。

       在配置Spring定时任务时,使用@Scheduled(cron = "* * 1 * * *")可以轻松实现每天1点定时执行任务。但若尝试构建特定于星期一中午点的定时任务,您会发现cron表达式的应用与预期不符。

       在cron表达式中,星期一对应的数字是2,表示从星期天(数字1)开始的一周循环。然而,当将此类cron表达式应用于Spring定时任务时,任务实际上会在下一次星期二的同一时间执行,而非预期的星期一。

       这一现象同样存在于直接使用Spring的CronTask类,并传递cron表达式时。究其原因,Spring内部源码的怎么屏蔽源码处理逻辑导致了这一不一致性。在生成CronTrigger时,解析cron表达式的过程存在差异。

       解析过程涉及对数字星期域进行特殊转换,将其从英文缩写转换为数字,并对特定值进行处理。其中的关键在于对daysOfWeek位数组的操作,该数组用于存储解析后的星期信息。

       具体而言,解析过程首先将英文缩写转换为对应的数字表示,然后将数字域中"?"替换为"*",接着使用基础解析算法处理。最后,对daysOfWeek数组的第0位和第7位进行逻辑或操作,并将结果保存在第0位,同时清除第7位。这一处理方式导致了数字星期域与传统cron表达式之间的一天偏差。

       尽管如此,网络上关于Spring定时任务的教程和文章多聚焦于cron表达式的基础解释,较少提及此类问题的详细原因。然而,解决方法相对简单且有效:在cron表达式中使用英文缩写的星期表示,而非数字。这样做能够避免因数字转换导致的定时任务执行时间偏移。

       春代码设计人员选择这种处理方式可能与与Crontab中的cron表达式格式以及Linux计划任务的兼容性有关。Crontab采用0-7的数字表示星期,同时其格式在秒域的处理上与cron表达式有所不同。

       综上所述,对于在Spring中使用cron表达式配置定时任务的场景,推荐使用英文缩写来表示星期域。这样可以确保任务执行时间的准确性,并避免由于数字转换而导致的时间偏移。

SpringBoot动态定时任务的趣味php源码实现

       1. Spring 定时任务的简单实现

       在Spring Boot中使用定时任务,只需要@EnableScheduling开启定时任务支持,在需要调度的方法上添加@Scheduled注解。这样就能够在项目中开启定时调度功能了,支持通过cron、fixedRate、fixedDelay等灵活的控制执行周期和频率。

1.1 缺点

       周期一旦指定,想要更改必须要重启应用

1.2 需求

       热更新定时任务的执行周期,基于cron表达式并支持外部存储,如数据库,nacos等

       最小改造兼容现有的定时任务(仅需添加一个注解)

       动态增加定时任务

2.Spring 定时任务源码分析

       2.1 @EnableScheduling 引入了配置类 SchedulingConfiguration

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(SchedulingConfiguration.class)@Documentedpublic@interfaceEnableScheduling{ }

       2.2 SchedulingConfiguration只配置了一个bean,ScheduledAnnotationBeanPostProcessor从名字就知道该类实现BeanPostProcessor接口

@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicclassSchedulingConfiguration{ @Bean(name=TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicScheduledAnnotationBeanPostProcessorscheduledAnnotationProcessor(){ returnnewScheduledAnnotationBeanPostProcessor();}}

       2.3 ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization实现,可见具体处理@Scheduled实现定时任务的是processScheduled方法

@OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName){ if(beaninstanceofAopInfrastructureBean||beaninstanceofTaskScheduler||beaninstanceofScheduledExecutorService){ //IgnoreAOPinfrastructuresuchasscopedproxies.returnbean;}Class<?>targetClass=AopProxyUtils.ultimateTargetClass(bean);if(!this.nonAnnotatedClasses.contains(targetClass)&&AnnotationUtils.isCandidateClass(targetClass,Arrays.asList(Scheduled.class,Schedules.class))){ //获取bean的方法及@Scheduled映射关系Map<Method,Set<Scheduled>>annotatedMethods=MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>)method->{ Set<Scheduled>scheduledMethods=AnnotatedElementUtils.getMergedRepeatableAnnotations(method,Scheduled.class,Schedules.class);return(!scheduledMethods.isEmpty()?scheduledMethods:null);});if(annotatedMethods.isEmpty()){ this.nonAnnotatedClasses.add(targetClass);if(logger.isTraceEnabled()){ logger.trace("No@Scheduledannotationsfoundonbeanclass:"+targetClass);}}else{ //Non-emptysetofmethodsannotatedMethods.forEach((method,scheduledMethods)->//处理@Scheduled注解scheduledMethods.forEach(scheduled->processScheduled(scheduled,method,bean)));if(logger.isTraceEnabled()){ logger.trace(annotatedMethods.size()+"@Scheduledmethodsprocessedonbean'"+beanName+"':"+annotatedMethods);}}}returnbean;}

       2.4 以下仅贴出ScheduledAnnotationBeanPostProcessor.processScheduled处理cron表达式的关键实现,

privatefinalScheduledTaskRegistrarregistrar;publicScheduledAnnotationBeanPostProcessor(){ this.registrar=newScheduledTaskRegistrar();}protectedvoidprocessScheduled(Scheduledscheduled,Methodmethod,Objectbean){ try{ //将定时任务方法,转为RunnableRunnablerunnable=createRunnable(bean,method);booleanprocessedSchedule=false;Set<ScheduledTask>tasks=newLinkedHashSet<>(4);//Determineinitialdelay//处理scheduled.initialDelay()的值,略过...//CheckcronexpressionStringcron=scheduled.cron();if(StringUtils.hasText(cron)){ Stringzone=scheduled.zone();if(this.embeddedValueResolver!=null){ //${ }变量值表达式的转换cron=this.embeddedValueResolver.resolveStringValue(cron);zone=this.embeddedValueResolver.resolveStringValue(zone);}if(StringUtils.hasLength(cron)){ Assert.isTrue(initialDelay==-1,"'initialDelay'notsupportedforcrontriggers");processedSchedule=true;if(!Scheduled.CRON_DISABLED.equals(cron)){ TimeZonetimeZone;if(StringUtils.hasText(zone)){ timeZone=StringUtils.parseTimeZoneString(zone);}else{ timeZone=TimeZone.getDefault();}//创建cron触发器CronTrigger对象,并注册CronTasktasks.add(this.registrar.scheduleCronTask(newCronTask(runnable,newCronTrigger(cron,timeZone))));}}}//处理fixedDelay和fixedRate,及ScheduledTask保存用于销毁,略过...}//略过catchException...}

       以上通过this.registrar.scheduleCronTask实现cron定时任务注册或初始化

3.动态定时任务的实现

       实现思路: 重写ScheduledAnnotationBeanPostProcessor.processScheduled方法,修改处理cron的部分代码,使用this.registrar.scheduleTriggerTask注册或初始化定时任务

3.1 相关类图classDiagramDisposableBean<|--DynamicCronScheduleTaskManagerEnvironmentAware<|--EnvironmentDynamicCronHandlerAbstractDynamicCronHandler<|--EnvironmentDynamicCronHandlerTrigger<|--DynamicCronTriggerEnvironmentAware:+setEnvironment()DisposableBean:+destroy()voidTrigger:+nextExecutionTime(TriggerContexttriggerContext)DateclassDynamicCronScheduleTaskManager{ +Map<String,ScheduledTask>dynamicScheduledTaskMap-ScheduledTaskRegistrarregistrar+addTriggerTask(StringcronName,TriggerTasktask)ScheduledTask+contains(StringcronName)boolean+updateTriggerTask(StringcronName)void+removeTriggerTask(StringcronName)void}classAbstractDynamicCronHandler{ -DynamicCronScheduleTaskManagerdynamicCronScheduleTaskManager;+getCronExpression(StringcronName)String+updateTriggerTash(StringcronName)void}classEnvironmentDynamicCronHandler{ +Environmentenvironment+environmentChangeEvent(EnvironmentChangeEventevent)void}classDynamicCronTrigger{ -StringcronName-AbstractDynamicCronHandlerdynamicCronHandler-StringcronExpression-CronSequenceGeneratorsequenceGenerator}classScheduledDynamicCron{ +value()String+cronName()String+handler()Class<?extendsAbstractDynamicCronHandler>}3.2 DynamicCronScheduleTaskManagerimportorg.springframework.beans.factory.DisposableBean;importorg.springframework.scheduling.config.ScheduledTask;importorg.springframework.scheduling.config.ScheduledTaskRegistrar;importorg.springframework.scheduling.config.TriggerTask;importjava.util.HashMap;importjava.util.Map;/***@authorHuangJS*@date--:下午*/publicclassDynamicCronScheduleTaskManagerimplementsDisposableBean{ privateMap<String,ScheduledTask>dynamicScheduledTaskMap=newHashMap<>();ScheduledTaskRegistrarregistrar;//添加定时任务publicScheduledTaskaddTriggerTask(StringcronName,TriggerTasktask){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.get(cronName);if(scheduledTask!=null){ scheduledTask.cancel();}scheduledTask=this.registrar.scheduleTriggerTask(task);dynamicScheduledTaskMap.put(cronName,scheduledTask);returnscheduledTask;}publicbooleancontains(StringcronName){ returnthis.dynamicScheduledTaskMap.containsKey(cronName);}//更新定时任务的触发时机publicvoidupdateTriggerTask(StringcronName){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.get(cronName);if(scheduledTask==null){ thrownewIllegalStateException("InvalidcronName""+cronName+"",nofundScheduledTask");}scheduledTask.cancel();scheduledTask=this.registrar.scheduleTriggerTask((TriggerTask)scheduledTask.getTask());dynamicScheduledTaskMap.put(cronName,scheduledTask);}//移除定时任务publicvoidremoveTriggerTask(StringcronName){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.remove(cronName);if(scheduledTask!=null){ scheduledTask.cancel();}}@Overridepublicvoiddestroy()throwsException{ for(ScheduledTaskvalue:dynamicScheduledTaskMap.values()){ value.cancel();}this.dynamicScheduledTaskMap.clear();}}3.3 AbstractDynamicCronHandlerpublicabstractclassAbstractDynamicCronHandler{ @AutowiredprivateDynamicCronScheduleTaskManagerdynamicCronScheduleTaskManager;/***获取cron表达式*@return*/publicabstractStringgetCronExpression(StringcronName);/***更新cronName对应的定时任务的触发时机*@paramcronName*/publicvoidupdateTriggerTask(StringcronName){ dynamicCronScheduleTaskManager.updateTriggerTask(cronName);}}3.4 EnvironmentDynamicCronHandler

       基于Environment,在刷新配置时,自动刷新定时任务的触发时机,支持分布式多节点集群部署。

       如,cron表达式配置在nacos,更新nacos上的配置时由于监听了EnvironmentChangeEvent事件实现了定时任务的触发时机的更新

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.cloud.context.environment.EnvironmentChangeEvent;importorg.springframework.context.EnvironmentAware;importorg.springframework.context.event.EventListener;importorg.springframework.core.env.Environment;/***@authorHuangJS*@date--:上午*/publicclassEnvironmentDynamicCronHandlerextendsAbstractDynamicCronHandlerimplementsEnvironmentAware{ privatefinalLoggerlogger=LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);privateEnvironmentenvironment;@OverridepublicStringgetCronExpression(StringcronName){ try{ returnenvironment.getProperty(cronName);}catch(Exceptione){ logger.error(e.getMessage(),e);}returnnull;}@OverridepublicvoidsetEnvironment(Environmentenvironment){ this.environment=environment;}@EventListenerpublicvoidenvironmentChangeEvent(EnvironmentChangeEventevent){ for(Stringkey:event.getKeys()){ if(this.dynamicCronScheduleTaskManager.contains(key)){ this.dynamicCronScheduleTaskManager.updateTriggerTask(key);}}}}3.5 DynamicCronTriggerpublicclassDynamicCronTriggerimplementsTrigger{ privatefinalstaticLoggerLOGGER=LoggerFactory.getLogger(DynamicCronTrigger.class);privateStringcronName;privateAbstractDynamicCronHandlerdynamicCronHandler;privateStringcronExpression;privateCronSequenceGeneratorsequenceGenerator;publicDynamicCronTrigger(StringcronName,AbstractDynamicCronHandlerdynamicCronHandler){ this.cronName=cronName;this.dynamicCronHandler=dynamicCronHandler;}@OverridepublicDatenextExecutionTime(TriggerContexttriggerContext){ StringcronExpression=dynamicCronHandler.getCronExpression(cronName);if(cronExpression==null){ returnnull;}if(this.sequenceGenerator==null||!cronExpression.equals(this.cronExpression)){ try{ this.sequenceGenerator=newCronSequenceGenerator(cronExpression);this.cronExpression=cronExpression;}catch(Exceptione){ LOGGER.error(e.getMessage(),e);}}Datedate=triggerContext.lastCompletionTime();if(date!=null){ Datescheduled=triggerContext.lastScheduledExecutionTime();if(scheduled!=null&&date.before(scheduled)){ //Previoustaskapparentlyexecutedtooearly...//Let'ssimplyusethelastcalculatedexecutiontimethen,//inordertopreventaccidentalre-firesinthesamesecond.date=scheduled;}}else{ date=newDate();}returnthis.sequenceGenerator.next(date);}}3.6 注解类ScheduledDynamicCron@Target({ ElementType.METHOD,ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceScheduledDynamicCron{ /***动态cron名称*@return*/@AliasFor("cronName")Stringvalue()default"";/***动态cr

scheduledthreadpoolexecutor 初始化多少个

       é€šè¿‡Executors,可以创建3种类型的ThreadPoolExecutor。

       - FixedThreadPool

       - SingleThreadExecutor

       - CachedThreadPool

       1.FixedThreadPool

       FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现。

       public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads, 0L,

        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

       FixedThreadPool中多余的空闲线程会被立即终止。

       FixedThreadPool的execute()运行示意图如下所示。

       å¦‚果当前运行的线程数小于corePoolSize,则创建新线程来执行任务。

       å½“前运行的线程数等于corePoolSize,将任务加入LinkedBlockingQueue。

       çº¿ç¨‹æ‰§è¡Œå®Œ1中的任务后,会反复从阻塞队列中取任务执行。

带你学会区分ScheduledThreadPoolExecutor与Timer

       æ‘˜è¦ï¼šæœ¬æ–‡ç®€å•ä»‹ç»ä¸‹ScheduledThreadPoolExecutor类与Timer类的区别,ScheduledThreadPoolExecutor类相比于Timer类来说,究竟有哪些优势,以及二者分别实现任务调度的简单示例。

       JDK1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承ThreadPoolExecutor类重用线程池实现了任务的周期性调度功能。在JDK1.5之前,实现任务的周期性调度主要使用的是Timer类和TimerTask类。本文,就简单介绍下ScheduledThreadPoolExecutor类与Timer类的区别,ScheduledThreadPoolExecutor类相比于Timer类来说,究竟有哪些优势,以及二者分别实现任务调度的简单示例。

二者的区别线程角度

       Timer是单线程模式,如果某个TimerTask任务的执行时间比较久,会影响到其他任务的调度执行。

       ScheduledThreadPoolExecutor是多线程模式,并且重用线程池,某个ScheduledFutureTask任务执行的时间比较久,不会影响到其他任务的调度执行。

系统时间敏感度

       Timer调度是基于操作系统的绝对时间的,对操作系统的时间敏感,一旦操作系统的时间改变,则Timer的调度不再精确。

       ScheduledThreadPoolExecutor调度是基于相对时间的,不受操作系统时间改变的影响。

是否捕获异常

       Timer不会捕获TimerTask抛出的异常,加上Timer又是单线程的。一旦某个调度任务出现异常,则整个线程就会终止,其他需要调度的任务也不再执行。

       ScheduledThreadPoolExecutor基于线程池来实现调度功能,某个任务抛出异常后,其他任务仍能正常执行。

任务是否具备优先级

       Timer中执行的TimerTask任务整体上没有优先级的概念,只是按照系统的绝对时间来执行任务。

       ScheduledThreadPoolExecutor中执行的ScheduledFutureTask类实现了java.lang.Comparable接口和java.util.concurrent.Delayed接口,这也就说明了ScheduledFutureTask类中实现了两个非常重要的方法,一个是java.lang.Comparable接口的compareTo方法,一个是java.util.concurrent.Delayed接口的getDelay方法。在ScheduledFutureTask类中compareTo方法实现了任务的比较,距离下次执行的时间间隔短的任务会排在前面,也就是说,距离下次执行的时间间隔短的任务的优先级比较高。而getDelay方法则能够返回距离下次任务执行的时间间隔。

是否支持对任务排序

       Timer不支持对任务的排序。

       ScheduledThreadPoolExecutor类中定义了一个静态内部类DelayedWorkQueue,DelayedWorkQueue类本质上是一个有序队列,为需要调度的每个任务按照距离下次执行时间间隔的大小来排序

能否获取返回的结果

       Timer中执行的TimerTask类只是实现了java.lang.Runnable接口,无法从TimerTask中获取返回的结果。

       ScheduledThreadPoolExecutor中执行的ScheduledFutureTask类继承了FutureTask类,能够通过Future来获取返回的结果。

       é€šè¿‡ä»¥ä¸Šå¯¹ScheduledThreadPoolExecutor类和Timer类的分析对比,相信在JDK1.5之后,就没有使用Timer来实现定时任务调度的必要了。

二者简单的示例

       è¿™é‡Œï¼Œç»™å‡ºä½¿ç”¨Timer和ScheduledThreadPoolExecutor实现定时调度的简单示例,为了简便,我这里就直接使用匿名内部类的形式来提交任务。

Timer类简单示例

       æºä»£ç ç¤ºä¾‹å¦‚下所示。

packageio.binghe.concurrent.lab;importjava.util.Timer;importjava.util.TimerTask;/***@authorbinghe*@version1.0.0*@description测试Timer*/publicclassTimerTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ Timertimer=newTimer();timer.scheduleAtFixedRate(newTimerTask(){ @Overridepublicvoidrun(){ System.out.println("测试Timer类");}},,);Thread.sleep();timer.cancel();}}

       è¿è¡Œç»“果如下所示。

测试Timer类测试Timer类测试Timer类测试Timer类测试Timer类测试Timer类测试Timer类测试Timer类测试Timer类测试Timer类ScheduledThreadPoolExecutor类简单示例

       æºä»£ç ç¤ºä¾‹å¦‚下所示。

packageio.binghe.concurrent.lab;importjava.util.concurrent.*;/***@authorbinghe*@version1.0.0*@description测试ScheduledThreadPoolExecutor*/publicclassScheduledThreadPoolExecutorTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(3);scheduledExecutorService.scheduleAtFixedRate(newRunnable(){ @Overridepublicvoidrun(){ System.out.println("测试测试ScheduledThreadPoolExecutor");}},1,1,TimeUnit.SECONDS);//主线程休眠秒Thread.sleep();System.out.println("正在关闭线程池...");//关闭线程池scheduledExecutorService.shutdown();booleanisClosed;//等待线程池终止do{ isClosed=scheduledExecutorService.awaitTermination(1,TimeUnit.DAYS);System.out.println("正在等待线程池中的任务执行完成");}while(!isClosed);System.out.println("所有线程执行结束,线程池关闭");}}

       è¿è¡Œç»“果如下所示。

测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor测试测试ScheduledThreadPoolExecutor正在关闭线程池...测试测试ScheduledThreadPoolExecutor正在等待线程池中的任务执行完成所有线程执行结束,线程池关闭

       æ³¨æ„ï¼šå…³äºŽTimer和ScheduledThreadPoolExecutor还有其他的使用方法,这里,我就简单列出以上两个使用示例,更多的使用方法大家可以自行实现。

       æœ¬æ–‡åˆ†äº«è‡ªåŽä¸ºäº‘社区《【高并发】ScheduledThreadPoolExecutor与Timer的区别和简单示例》,作者:冰河。

可动态配置的Schedule设计

       1.背景

       定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。

       你可以使用Linux的Crontab启动应用程序进行处理,或者直接使用Spring的Schedule对任务进行调度,还可以使用分布式调度系统,花椒TV源码如果xxl-job等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:

       1.新建一组任务,周期性的执行指定SQL并将结果以邮件的方式发送给特定人群;2.比较方便的对任务进行管理,比如启动、停止,修改调度周期等;3.动态添加、移除任务,不需要频繁的修改、发布程序;

       停顿几分钟,简单思考一下,有哪几种实现思路呢?

       本篇文章将从以下几部分进行讨论:

       1.SpringSchedule配置和使用。首先我们将介绍Demo的骨架,并基于Spring-Boot完成Schedule的配置;2.数据库定时轮询方案。使用SpringSchedule定时轮询数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;3.基于TaskScheduler动态配置方案。基于数据库轮询或配置中心两种方案动态的对SpringTaskScheduler进行配置,以实现动态管理任务的目的;4.我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;

2.SpringSchedule

       SpringBoot上的Schedule的使用非常简单,无需增加新的依赖,只需简单配置即可。

       1.使用@EnableScheduling启用Schedule;2.在要调度的方法上增加@Scheduled;

       首先,我们需要在启动类上添加@EnableScheduling注解,该注解将启用SchedulingConfiguration配置类帮我们完成最基本的配置。

@SpringBootApplication@EnableSchedulingpublicclassConfigurableScheduleDemoApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(ConfigurableScheduleDemoApplication.class,args);}}

       启用Schedule配置之后,在需要被调度的方法上增加@Scheduled注解。

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}

       runTask任务延迟1s进行初始化,并以5s为间隔进行调度。

       Scheduled注解类的详细配置如下:

配置含义样例cronlinuxcrontab表达式@Scheduled(cron="*/5****MON-FRI")工作日,每5s调度一次fixedDelay固定间隔,上次运行结束,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=)运行结束后,5S后启动一次调度fixedDelayString与fixedDelay一致fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=)前后两个任务,间隔5秒fixedRateString与fixedRate一致initialDelay第一次执行,间隔时间@Scheduled(initialDelay=,fixedRate=)第一次执行,延时1秒,以后以5秒为周期进行调度initialDelayString与initialDelay一致

       环境搭建完成,让我们开始第一个方案。

3.数据库定时轮询

       使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。

3.1.串行执行方案

       整体思路非常简单,流程如下:

       主要分如下几步:

       1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.根据数据库的任务配置信息,依次遍历并执行任务;3.任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;4.等待下一次任务调度。

       核心代码如下:

@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载需要运行的任务://1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//依次遍历待运行任务,执行对于的任务for(TaskDefinitionV2task:shouldRunTasks){ //DoubleCheckif(task.shouldRun(now)){ //执行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}

       方案简单但非常有效,那该方案存在哪些问题呢?最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被delay。

       例如,依次加载了待执行任务task1、task2、task3。其中task1耗时5秒,task2耗时5秒,task3耗时1秒,由于三个任务串行执行,task2将延时5秒,task3延时秒;下一轮检查距上次启动相差秒。

       究其根本,核心问题是调度线程和运行线程是同一个线程,调度的运行和任务的运行相互影响。

       让我们看一个改进方案:并行执行方案。

3.2.并行执行方案

       整体执行流程如下:

       相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:

       1.步骤一不变,在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;3.任务在线程池中运行,结束后更新下一次的运行时间;4.调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;

       核心代码如下:

       Spring调度任务,每1秒运行一次:

@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载所有待运行的任务//1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//遍历待运行任务for(TaskDefinitionV2task:shouldRunTasks){ //1.根据TaskId获取任务对应的线程池//2.将任务提交至线程池中this.executorServiceForTask(task.getId()).submit(newTaskRunner(task.getId()));}}

       自定义线程池,每个线程池最多只有一个线程,空闲超过秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:

privateExecutorServiceexecutorServiceForTask(LongtaskId){ returnthis.executorServiceRegistry.computeIfAbsent(taskId,id->{ BasicThreadFactorythreadFactory=newBasicThreadFactory.Builder()//指定线程池名称.namingPattern("Async-Task-"+taskId+"-Thread-%d")//设置线程为后台线程.daemon(true).build();//线程池核心配置://1.每个线程池最多只有一个线程//2.线程空闲超过秒进行自动回收//3.直接使用交互器,线程空闲进行任务交互//4.使用指定的线程工厂,设置线性名称//5.线程池饱和,自动丢弃最老的任务returnnewThreadPoolExecutor(0,1,L,TimeUnit.SECONDS,newSynchronousQueue<>(),threadFactory,newThreadPoolExecutor.DiscardOldestPolicy());});}

       最后,在线程池中运行的Task如下:

privateclassTaskRunnerimplementsRunnable{ privatefinalDatenow=newDate();privatefinalLongtaskId;publicTaskRunner(LongtaskId){ this.taskId=taskId;}@Overridepublicvoidrun(){ //重新加载任务,保持最新的任务状态TaskDefinitionV2task=definitionV2Repository.findById(this.taskId).orElse(null);if(task!=null&&task.shouldRun(now)){ //运行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}4.TaskScheduler配置方案

       该方案的核心为:绕过@Schedule注解,直接对Spring底层核心类TaskScheduler进行配置。

       TaskScheduler接口是Spring对调度任务的一个抽象,更是@Schedule背后默默的支持者,首先我们看下这个接口定义。

publicinterfaceTaskScheduler{ ScheduledFutureschedule(Runnabletask,Triggertrigger);ScheduledFutureschedule(Runnabletask,InstantstartTime);ScheduledFutureschedule(Runnabletask,DatestartTime);ScheduledFuturescheduleAtFixedRate(Runnabletask,InstantstartTime,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,DatestartTime,longperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,longperiod);ScheduledFuturescheduleWithFixedDelay(Runnabletask,InstantstartTime,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,DatestartTime,longdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,longdelay);}

       满满的都是schedule接口,其他的比较简单就不过多叙述了,重点说下Trigger这个接口,首先看下这个接口的定义:

publicinterfaceTrigger{ DatenextExecutionTime(TriggerContexttriggerContext);}

       只有一个方法,获取下次执行的时间。在任务执行完成后,会调用Trigger的nextExecutionTime获取下一次运行时间,从而实现周期性调度。

       CronTrigger是Trigger的最常见实现,以linuxcrontab的方式配置调度任务,如:

scheduler.schedule(task,newCronTrigger("-**MON-FRI"));

       基础部分简单介绍到这,让我们看下数据库动态配置方案。

4.1数据库动态配置方案

       整体设计如下:

       仍旧是轮询数据库方式,详细流程如下:

       1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取所有任务;2.依次遍历任务,与内存中的TaskEntry(任务与状态)进行比对,动态的向TaskScheduler中添加或取消调度任务;3.由TaskScheduler负责实际的任务调度;

       核心代码如下:

@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndConfig(){ //加载所有的任务信息List<TaskDefinitionV3>tasks=repository.findAll();//遍历任务进行任务检查for(TaskDefinitionV3task:tasks){ //获取内存任务状态TaskEntrytaskEntry=this.taskEntry.computeIfAbsent(task.getId(),TaskEntry::new);if(task.isEnable()&&taskEntry.isStop()){ //任务为可用,运行状态为停止,则重新进行schedule注册ScheduledFuture<?>scheduledFuture=this.taskScheduler.scheduleWithFixedDelay(newTaskRunner(task),task.getDelay()*);taskEntry.setScheduledFuture(scheduledFuture);log.info("successtostartscheduletaskfor{ }",task);}elseif(task.isDisable()&&taskEntry.isRunning()){ //任务为禁用,运行状态为运行中,停止正在运行在任务taskEntry.stop();log.info("successtostopscheduletaskfor{ }",task);}}}

       核心辅助类:

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}0

       有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。

4.2配置中心通知方案

       整体设计如下:

       核心流程如下:

       1.应用启动时,从配置中心中获取调度的配置信息,并完成对TaskScheduler的配置;2.当配置发送变化时,配置中心会主动将配置推送到应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;3.任务的实际调度仍旧由TaskScheduler完成。

       由于手底下没有配置中心,暂时没有coding,思路很简单,有条件的同学可以自行完成。

5.分布式环境下应用

       以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现冗余部署和自动化容灾。

       以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。

       为了简单,我们使用Redis的分布式锁。

5.1.环境搭建

       Redisson是Redis的一个富客户端,提供了很多高级的数据结构。本次,我们将使用RLock对应用进行保护。

       首先,在pom中引入RedissonStarter。

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}1

       然后,在application.properties文件中增加Redis配置,具体如下:

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}.2引入分布式锁

       最后,就可以直接使用分布式锁对任务执行进行保护了,代码如下:

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}3

       备注:

       Redis是典型的AP应用,而分布式锁严格意义上来说是CP。所以基于Redis的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用CP实现,如Zookeeper或etcd等。

6.小结

       本文从Spring的Schedule出发,依次对数据库轮询方案、TaskScheduler配置方案进行详细讲解,以实现对调度任务的可配置化。最后,使用Redis分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。

       仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。

References

       [1]源码:/litao/books/tree/master/configurable-schedule