可动态配置的人群人群Schedule设计
1.背景
定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的统计统计注册用户数量、渠道来源进行统计,源码源码keycloak源码分析并以邮件报表的人群人群方式发送给相关人员。相信这样的统计统计需求,每个开发伙伴都处理过。源码源码
你可以使用Linux的人群人群Crontab启动应用程序进行处理,或者直接使用Spring的统计统计Schedule对任务进行调度,还可以使用分布式调度系统,源码源码如果xxl-job等。人群人群相信你已经轻车熟路、统计统计习以为常。源码源码直到有一天你接到了一个新需求:
1.新建一组任务,周期性的执行指定SQL并将结果以邮件的方式发送给特定人群;2.比较方便的对任务进行管理,比如启动、停止,修改调度周期等;3.动态添加、移除任务,不需要频繁的修改、发布程序;
停顿几分钟,简单思考一下,有哪几种实现思路呢?
本篇文章将从以下几部分进行讨论:
1.SpringSchedule配置和使用。首先我们将介绍Demo的骨架,并基于Spring-Boot完成Schedule的配置;2.数据库定时轮询方案。使用SpringSchedule定时轮询数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;3.基于TaskScheduler动态配置方案。基于数据库轮询或配置中心两种方案动态的对SpringTaskScheduler进行配置,以实现动态管理任务的目的;4.我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;
2.SpringScheduleSpringBoot上的host源码Schedule的使用非常简单,无需增加新的依赖,只需简单配置即可。
1.使用@EnableScheduling启用Schedule;2.在要调度的方法上增加@Scheduled;
首先,我们需要在启动类上添加@EnableScheduling注解,该注解将启用SchedulingConfiguration配置类帮我们完成最基本的配置。
@SpringBootApplication@EnableSchedulingpublicclassConfigurableScheduleDemoApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(ConfigurableScheduleDemoApplication.class,args);}}启用Schedule配置之后,在需要被调度的方法上增加@Scheduled注解。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}runTask任务延迟1s进行初始化,并以5s为间隔进行调度。
Scheduled注解类的详细配置如下:
配置含义样例cronlinuxcrontab表达式@Scheduled(cron="*/5****MON-FRI")工作日,每5s调度一次fixedDelay固定间隔,上次运行结束,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=)运行结束后,5S后启动一次调度fixedDelayString与fixedDelay一致fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=)前后两个任务,间隔5秒fixedRateString与fixedRate一致initialDelay第一次执行,间隔时间@Scheduled(initialDelay=,fixedRate=)第一次执行,延时1秒,以后以5秒为周期进行调度initialDelayString与initialDelay一致环境搭建完成,让我们开始第一个方案。
3.数据库定时轮询使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。
3.1.串行执行方案整体思路非常简单,流程如下:
主要分如下几步:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.根据数据库的任务配置信息,依次遍历并执行任务;3.任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;4.等待下一次任务调度。
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载需要运行的任务://1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//依次遍历待运行任务,执行对于的vener源码任务for(TaskDefinitionV2task:shouldRunTasks){ //DoubleCheckif(task.shouldRun(now)){ //执行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}方案简单但非常有效,那该方案存在哪些问题呢?最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被delay。
例如,依次加载了待执行任务task1、task2、task3。其中task1耗时5秒,task2耗时5秒,task3耗时1秒,由于三个任务串行执行,task2将延时5秒,task3延时秒;下一轮检查距上次启动相差秒。
究其根本,核心问题是调度线程和运行线程是同一个线程,调度的运行和任务的运行相互影响。
让我们看一个改进方案:并行执行方案。
3.2.并行执行方案整体执行流程如下:
相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:
1.步骤一不变,在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;3.任务在线程池中运行,结束后更新下一次的运行时间;4.调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;
核心代码如下:
Spring调度任务,每1秒运行一次:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载所有待运行的任务//1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//遍历待运行任务for(TaskDefinitionV2task:shouldRunTasks){ //1.根据TaskId获取任务对应的线程池//2.将任务提交至线程池中this.executorServiceForTask(task.getId()).submit(newTaskRunner(task.getId()));}}自定义线程池,每个线程池最多只有一个线程,getlogin源码空闲超过秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:
privateExecutorServiceexecutorServiceForTask(LongtaskId){ returnthis.executorServiceRegistry.computeIfAbsent(taskId,id->{ BasicThreadFactorythreadFactory=newBasicThreadFactory.Builder()//指定线程池名称.namingPattern("Async-Task-"+taskId+"-Thread-%d")//设置线程为后台线程.daemon(true).build();//线程池核心配置://1.每个线程池最多只有一个线程//2.线程空闲超过秒进行自动回收//3.直接使用交互器,线程空闲进行任务交互//4.使用指定的线程工厂,设置线性名称//5.线程池饱和,自动丢弃最老的任务returnnewThreadPoolExecutor(0,1,L,TimeUnit.SECONDS,newSynchronousQueue<>(),threadFactory,newThreadPoolExecutor.DiscardOldestPolicy());});}最后,在线程池中运行的Task如下:
privateclassTaskRunnerimplementsRunnable{ privatefinalDatenow=newDate();privatefinalLongtaskId;publicTaskRunner(LongtaskId){ this.taskId=taskId;}@Overridepublicvoidrun(){ //重新加载任务,保持最新的任务状态TaskDefinitionV2task=definitionV2Repository.findById(this.taskId).orElse(null);if(task!=null&&task.shouldRun(now)){ //运行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}4.TaskScheduler配置方案该方案的核心为:绕过@Schedule注解,直接对Spring底层核心类TaskScheduler进行配置。
TaskScheduler接口是Spring对调度任务的一个抽象,更是@Schedule背后默默的支持者,首先我们看下这个接口定义。
publicinterfaceTaskScheduler{ ScheduledFutureschedule(Runnabletask,Triggertrigger);ScheduledFutureschedule(Runnabletask,InstantstartTime);ScheduledFutureschedule(Runnabletask,DatestartTime);ScheduledFuturescheduleAtFixedRate(Runnabletask,InstantstartTime,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,DatestartTime,longperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,longperiod);ScheduledFuturescheduleWithFixedDelay(Runnabletask,InstantstartTime,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,DatestartTime,longdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,longdelay);}满满的都是schedule接口,其他的比较简单就不过多叙述了,重点说下Trigger这个接口,首先看下这个接口的定义:
publicinterfaceTrigger{ DatenextExecutionTime(TriggerContexttriggerContext);}只有一个方法,获取下次执行的时间。在任务执行完成后,会调用Trigger的nextExecutionTime获取下一次运行时间,从而实现周期性调度。
CronTrigger是Trigger的最常见实现,以linuxcrontab的方式配置调度任务,如:
scheduler.schedule(task,newCronTrigger("-**MON-FRI"));基础部分简单介绍到这,让我们看下数据库动态配置方案。
4.1数据库动态配置方案整体设计如下:
仍旧是轮询数据库方式,详细流程如下:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取所有任务;2.依次遍历任务,与内存中的TaskEntry(任务与状态)进行比对,动态的向TaskScheduler中添加或取消调度任务;3.由TaskScheduler负责实际的任务调度;
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndConfig(){ //加载所有的任务信息List<TaskDefinitionV3>tasks=repository.findAll();//遍历任务进行任务检查for(TaskDefinitionV3task:tasks){ //获取内存任务状态TaskEntrytaskEntry=this.taskEntry.computeIfAbsent(task.getId(),TaskEntry::new);if(task.isEnable()&&taskEntry.isStop()){ //任务为可用,运行状态为停止,则重新进行schedule注册ScheduledFuture<?>scheduledFuture=this.taskScheduler.scheduleWithFixedDelay(newTaskRunner(task),task.getDelay()*);taskEntry.setScheduledFuture(scheduledFuture);log.info("successtostartscheduletaskfor{ }",task);}elseif(task.isDisable()&&taskEntry.isRunning()){ //任务为禁用,运行状态为运行中,停止正在运行在任务taskEntry.stop();log.info("successtostopscheduletaskfor{ }",task);}}}核心辅助类:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}0有没有发现,以上方案都有一个共同的kka源码缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。
4.2配置中心通知方案整体设计如下:
核心流程如下:
1.应用启动时,从配置中心中获取调度的配置信息,并完成对TaskScheduler的配置;2.当配置发送变化时,配置中心会主动将配置推送到应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;3.任务的实际调度仍旧由TaskScheduler完成。
由于手底下没有配置中心,暂时没有coding,思路很简单,有条件的同学可以自行完成。
5.分布式环境下应用以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现冗余部署和自动化容灾。
以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。
为了简单,我们使用Redis的分布式锁。
5.1.环境搭建Redisson是Redis的一个富客户端,提供了很多高级的数据结构。本次,我们将使用RLock对应用进行保护。
首先,在pom中引入RedissonStarter。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}1然后,在application.properties文件中增加Redis配置,具体如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}.2引入分布式锁最后,就可以直接使用分布式锁对任务执行进行保护了,代码如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}3备注:
Redis是典型的AP应用,而分布式锁严格意义上来说是CP。所以基于Redis的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用CP实现,如Zookeeper或etcd等。
6.小结本文从Spring的Schedule出发,依次对数据库轮询方案、TaskScheduler配置方案进行详细讲解,以实现对调度任务的可配置化。最后,使用Redis分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。
References[1]源码:/litao/books/tree/master/configurable-schedule
SPM 软件介绍
SPM数据挖掘预测分析软件,由美国Salford Systems公司开发,其核心是先进的机器学习算法,旨在提供预测分析工具。软件主要模块包括广义路径追踪(GPS)算法、智能变量分组、自动化变量缺失值填补、逻辑回归算法、最小二乘线性回归模型等。
GPS算法通过机器学习方法建立大量候选线性模型,并自动选择最优模型,显著提升模型效果。智能变量分组高度自动化地对变量进行智能分组,减少手动工作,提升建模效率和模型性能。自动化缺失值填补模块利用算法快速处理缺失值,包含均值、中位数、众数填补方式及利用预测模型进行个性化填补。
软件提供经典逻辑回归算法,结合TreeNet,快速建立高精度模型;最小二乘线性回归模型支持单一变量线性相关性测试,结合TreeNet快速开发精确回归模型。
SPM8优势显著,高精度,TreeNet是唯一由GBM发明人源代码开发而成,经过不断迭代优化,无需深入了解GBM内部算法,即可获得高精度模型。高纬度特征快速筛选能力,TreeNet是目前最快的GBM算法,适合高纬度快速变量筛选;GPS是最快速的正则化回归算法,支持广谱正则化路径搜索策略,作为快速衍生特征筛选工具。
用户界面友好,提供强大自动化建模功能,简化操作,无需专业背景即可轻松使用。SPM的自动化建模技术包括自动化模型优化和机器学习模型置信度检验。热点追踪功能适用于信用风险和反欺诈场景,利用CART调整PRIOR设置快速识别关注人群特征。聚类和异常点分析采用监督学习算法,给出解释性强的规则形式聚类;利用CART中的AUTOMATE UNSUPERVISED找到样本异常点,应用于反欺诈等场景。快速逻辑回归模型开发通过TreeNet变量筛选和Spline变形或Data Binning快速变量分组,提供高效模型。
SPM为客户提供价值,解决大数据人力资源问题,通过高度自动化、智能化使用方式降低建模人员门槛,无需编程、理论基础和经验,较短时间内建立专家级模型。高效分析技术解放建模人员于繁琐低效手动工作,显著减少数据预处理时间,提高分析效率。自动化模型优化和机器学习模型置信度检验提升建模效率。通过GPS和Data Binning快速逻辑回归模型开发,节约人力成本,将更多精力用于商业问题理解、数据源获取、新特征构建和策略设计等创意性工作。识别高风险客户,预测即将流失客户,实现更加精准的客户关系维护。SPM建立的机器学习模型通常性能优于经典统计技术建立的模型5%到%,作为模型性能对比的基准。
北京天演融智软件有限公司作为SPM软件在中国的授权经销商,提供优质的软件销售和培训服务。
健康体检管理系统(PEIS)源码,自动生成体检报告,查询、统计和分析功能
强化健康管理,PEIS体检管理系统引领未来医疗新风尚!健康体检管理系统(PEIS)以创新科技,致力于打造高效、精准的体检服务,它的源码不仅包含了自动生成体检报告的强大功能,还整合了一系列实用查询、统计与分析工具,让健康管理变得更加智能和便捷。
PEIS系统的核心亮点在于与HIS系统的无缝对接,实现了临床信息系统在体检流程中的深度应用。无论是个人还是团体体检,无论是儿童入学、老年人保健还是职业病筛查,系统都能提供定制化的体检方案,支持多元化人群需求。通过LIS和PACS接口,系统能实时获取检验结果,生成专业且个性化的体检报告,无论是PDF还是其他格式,都一应俱全,确保体检者随时随地查看或打印。 系统设计独具匠心,支持个性化套餐选择,且灵活处理费用折扣,为体检者带来实惠。专业模板知识库为医生提供决策支持,体检者可以通过微信、自助设备轻松获取报告,体验无感服务。自动排队管理功能,智能调度体检流程,让等待时间大大减少,而多样化的支付方式,如微信、支付宝、医保卡等,让缴费过程更为顺畅。PEIS的主要功能模块全面而强大:
体检管理:预约、登记、照片采集、档案维护、费用结算,一应俱全,确保体检流程的顺利进行。
接口管理:与HIS、LIS、PACS深度集成,确保数据无缝对接,提高工作效率。
体检报告:个人报告生成,数据导出,以及多样化的统计分析报表,提供全方位的体检结果解析。
查询统计:精细的科室查询,医生工作量分析,财务结算,为健康管理提供科学依据。
基础维护:报告管理、项目维护、科室维护等基础功能,确保系统稳定运行。
PEIS不仅仅是一个体检管理系统,它更是医疗健康管理的革新者,旨在提升体检体验,助力医疗机构实现数字化转型,为每个人提供更加精准和个性化的健康保障。
2025-01-01 14:03
2025-01-01 13:18
2025-01-01 13:02
2025-01-01 12:51
2025-01-01 11:55