1.RocketMQ消费进度管理浅析
2.Xxl-Job中的定时定概念和使用详解
3.一文带你搞懂xxl-job(分布式任务调度平台)
RocketMQ消费进度管理浅析
幂等性的取与舍
分布式平台上幂等性相关语义的保证,是广播广播我们构造安全、可信赖系统的源码源码永恒追求。作为异步、设置解耦通常实现方案下的定时定最优选,我时常思考RocketMQ设计者经历怎样的广播广播社区首页源码断舍离?
众所周知消息队列关于消息消费这一概念的落地实现,大体上分为三种情形:
Atmostonce
Atleastonce
Exactlyonce
翻译一下就是源码源码:
至多消费一次
至少消费一次
保证消费一次
很显然如果至多消费一次,势必造成消息丢失;至少消费一次就对我们的设置业务系统提出更高的要求,保证消费一次看似美好时则需要MQ系统背负沉重代价。定时定RocketMQ丝毫不犹豫的广播广播选择Atleastonce。将幂等的源码源码保证大胆的交给开发者,不仅仅体现作者对MQ性能与功能两者矛盾的设置无奈,同时也体现了对广大开发人员的定时定信任。
消费现状概述上述论调虽然客观真实但不免有些悲观主义的广播广播意味,按照上文的源码源码理解我们业务体统需要倚仗ta,但我们又要时刻防备ta,因为一个不小心可能就会出纰漏,这还真是一个让人又爱又怕的存在。
读到这里,笔者似乎把ta描绘成了一个顽皮的孩子,但其实有些言重了,因为以我阅读源码的理解,业务系统没有异常,MQ所在的物理运行环境又比较健康的情况下,其实比较难以出现多次重复消费。
RocketMQ的幂等往往是由业务系统的异常逻辑,或者网络,或者不确定的运行环境破坏的。绝大多数情形下确定无疑ta依然是一个GoodBoy。
按照我们对消息系统的朴素理解,消息的csdn qt 算法源码消费过程满足以下几个规律:
虽然不会严格的按照投递顺序进行消费,但大体上保持先进先出这个趋势
消息应该被精确的记录当前消费状态
总有一个角色负责统计、持久化消费偏移量带着经验主义我们看看作者都为平稳消费与进度管理做出了哪些努力。
注:RocketMQ的顺序消费模型是可以严格保证顺序的。
OffsetStore消息被消费后也就失去了在ProcessQueue中停留的资格,ProcessQueue会删除该消息,并返回当前的最小偏移量放置到消息进度表中。很容易想象,如果这个消费进度不加以持久化,那么每次启动都要重头消费,显然无法接受,可是如何持久化,又持久化到何处呢?
RocketMQ支持两种订阅模式:
集群消费模式:默认的消费模式,所有消息只需要被同组任一消费者消费一次即可,大家共享订阅Topic下的消费偏移量。
广播消费模式:各个消费者的消费行为是完全独立的,订阅Topic下所有的消息都需要被该组下所有消费者消费。针对两种消费模型的特性,容易发现二者并不好一概而论,理想的实现是划分为两个策略,一个集中到Broker管理,一个分散出去由消费者管理。OffsetStore接口负责相关事宜,源码应证了我们猜想。先来看看OffsetStore接口定义:
publicinterfaceOffsetStore{ /***从消息进度存储文件加载消息进度到内存*/voidload()throwsMQClientException;/***Getoffsetfromlocalstorage*@returnThefetchedoffset*/longreadOffset(MessageQueuemq,ReadOffsetTypetype);/***Removeoffset*/voidremoveOffset(MessageQueuemq);Map<MessageQueue,Long>cloneOffsetTable(Stringtopic);/***更新内存中的消息进度*Updatetheoffset,storeitinmemory*/voidupdateOffset(MessageQueuemq,longoffset,booleanincreaseOnly);/***保留所有偏移量,可能在本地存储或远程服务器*Persistalloffsets,maybeinlocalstorageorremotenameserver*/voidpersistAll(Set<MessageQueue>mqs);/***保留指定消息队列偏移量,可能在本地存储或远程服务器*Persisttheoffset,maybeinlocalstorageorremotenameserver*/voidpersist(MessageQueuemq);/***更新存储在Broker端的消息消费进度,使用集群模式*/voidupdateConsumeOffsetToBroker(MessageQueuemq,longoffset,booleanisOneway)throwsRemotingException,MQBrokerException,InterruptedException,MQClientException;}较之源码,方法排列被我调换了顺序,需要着重关注的我放到了后面。
注:如果没有RocketMQ源码阅读经历ProcessQueue显得有些突兀,泄漏源码会怎样你可以将ta理解为消息在Consumer端的载体、物理队列某一个截取片段。作者如此定义ta:Queueconsumptionsnapshot
LocalFileOffsetStore广播模式下消息进度保留在Consumer端,文件遵守约定放置在可配置的固定目录下,文件路径如下:
publicclassLocalFileOffsetStoreimplementsOffsetStore{ /***存储文件夹路径可定制*/publicfinalstaticStringLOCAL_OFFSET_STORE_DIR=System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home")+File.separator+".rocketmq_offsets");/***构造方法拼接出了文件的完整路径*/publicLocalFileOffsetStore(MQClientInstancemQClientFactory,StringgroupName){ this.mQClientFactory=mQClientFactory;this.groupName=groupName;this.storePath=LOCAL_OFFSET_STORE_DIR+File.separator+this.mQClientFactory.getClientId()+File.separator+this.groupName+File.separator+"offsets.json";}}默认在用户路径下一层创建一个".RocketMQ_offsets"文件夹,注意这里有一个细节,文件夹以"."开头,在Linux系统中属于隐藏文件,需要加-a参数才能被显示。为了便于理解,下图展示了一个文件夹路径和一个Offset持久化文件的路径。
广播模式下Consumer#start()之后会调用OffsetStore.load()来加载消费进度,其原理就是根据约定拼接处文件全路径之后读取相应文件,然后序列化为OffsetSerializeWrapper对象:
publicclassOffsetSerializeWrapperextendsRemotingSerializable{ /*详细记录每个队列当前消费进度*/privateConcurrentMap<MessageQueue,AtomicLong>offsetTable=newConcurrentHashMap<>();}假设我们有个发送短信的服务订阅"SMS_prod"Topic,那么形成的Json如下所示:注意offsetTable属性也是一个Json,而且key是MessageQueue对象,valule是一个数字表示偏移量。
{ "offsetTable":{ { "topic":"SMS_prod","brokerName":"broker0""queueId":0}:,{ "topic":"SMS_prod","brokerName":"broker0""queueId":1}:,}}既然可以在指定文件load关键信息,自然就有相关机制负责写入。还记得上文提到的persistAll方法吗?
publicvoidpersistAll(Set<MessageQueue>mqs){ /*构造OffsetSerializeWrapper对象*/OffsetSerializeWrapperoffsetSerializeWrapper=newOffsetSerializeWrapper();for(Map.Entry<MessageQueue,AtomicLong>entry:offsetTable.entrySet()){ if(mqs.contains(entry.getKey())){ AtomicLongoffset=entry.getValue();offsetSerializeWrapper.getOffsetTable().put(entry.getKey(),offset);}}/*将offsetSerializeWrapper对象序列化*/StringjsonString=offsetSerializeWrapper.toJson(true);/*将序列化好的offsetSerializeWrapper写入文件*/MixAll.string2File(jsonString,this.storePath);}对offsets.json的相关操作都被封装在MixAll工具类中:
MixAll.file2String:将文件读取出来
MixAll.string2File:将序列化好的对象写入文件
RemoteBrokerOffsetStore因为偏移量维护在Broker端,所以该实现的load方法仅仅是一个声明。构造方法不需要计算文件路径也尤为简单,二者的offsetTable属性是一致的。我们着重来看看集群消费模式下如何保存消息消费进度。
publicvoidpersistAll(Set<MessageQueue>mqs){ HashSet<MessageQueue>unusedMQ=newHashSet<>();for(Map.Entry<MessageQueue,AtomicLong>entry:offsetTable.entrySet()){ MessageQueuemq=entry.getKey();AtomicLongoffset=entry.getValue();if(offset!=null){ if(mqs.contains(mq)){ this.updateConsumeOffsetToBroker(mq,offset.get());}else{ unusedMQ.add(mq);}}}if(!unusedMQ.isEmpty()){ for(MessageQueuemq:unusedMQ){ this.offsetTable.remove(mq);}}}不用深入研究,我们应该能发现至少两处不同:
粒度不同:广播模式是直接一下子把整个offsetTable持久化,而集群模式细化到了entry级别。
调用方式不同:广播模式是直接JVM内部调用写入文件即可,而集群模式需要RPC调用参与。这里有必要强调一下二者产生的exe怎样查看源码offset.json文件也是有区别的,下文我会分析,同时也带大家了解该RPC过程。
RPC调用栈:RemoteBrokerOffsetStore#persistAll()->RemoteBrokerOffsetStore#updateConsumeOffsetToBroker()组装好RPC请求头UpdateConsumerOffsetRequestHeader对象->MQClientAPIImpl#updateConsumerOffsetOneway()组装好RPC请求对象RemotingCommand->NettyRemotingClient#invokeSync()发起RPC调用更新偏移量的RPC调用类型是RequestCode.UPDATE_CONSUMER_OFFSET顺着这个枚举来看看Broker端的相关处理:ConsumerManageProcessor.updateConsumerOffset()->ConsumerOffsetManager.commitOffset()追踪源码发现,其实每次Consumer进行RPC调用上报自己的消费进度,Broker接收之后并没有立即进行持久化,而是直接更新到内存中。
privatevoidcommitOffset(StringclientHost,Stringkey,intqueueId,longoffset){ Stringkey=topic+TOPIC_GROUP_SEPARATOR+group;ConcurrentMap<Integer,Long>map=offsetTable.get(key);if(Objects.isNull(map)){ map=newConcurrentHashMap<>();map.put(queueId,offset);this.offsetTable.put(key,map);}else{ LongstoreOffset=map.put(queueId,offset);}}TOPIC_GROUP_SEPARATOR为定义的常量:"@",之前我们提到过二者json有些许区别,offsetTable的key变成了一个拼接出来的字符串,该字符串左侧是TopicName,右侧是ConsumeGroupName中间用@符号连接。方便理解,我把这个json也展示出来:/***注意一下这个key:%RETRY%ConsumeGroup*笔者后期会有专门文章分析*/{ "offsetTable":{ "Topic@ConsumeGroup":{ 0:,1:,2:,3:},"%RETRY%ConsumeGroup":{ 0:0}}}持久化两种文件持久化机制没有什么大的区别定时任务触发,或者消费端正常关闭执行shotdown()之前手动触发。
广播模式定时任务定义在MQClientInstance中,MQClientInstance对象在被实例化之后调用start()时启动该定时任务。定时任务的时间间隔支持配置默认是ms,延时ms之后开始执行。
publicvoidstart()throwsMQClientException{ this.scheduledExecutorService.scheduleAtFixedRate(()->{ try{ MQClientInstance.this.persistAllConsumerOffset();}catch(Exceptione){ log.error("ScheduledTaskpersistAllConsumerOffsetexception",e);}},*,this.clientConfig.getPersistConsumerOffsetInterval(),TimeUnit.MILLISECONDS);}集群模式定时任务定义BrokerController中,BrokerController对象在被实例化之后会有一系列初始化动作,initialize()会启动该定时任务。定时任务的时间间隔支持配置默认是ms,延时ms之后开始执行。
publicclassLocalFileOffsetStoreimplementsOffsetStore{ /***存储文件夹路径可定制*/publicfinalstaticStringLOCAL_OFFSET_STORE_DIR=System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home")+File.separator+".rocketmq_offsets");/***构造方法拼接出了文件的完整路径*/publicLocalFileOffsetStore(MQClientInstancemQClientFactory,StringgroupName){ this.mQClientFactory=mQClientFactory;this.groupName=groupName;this.storePath=LOCAL_OFFSET_STORE_DIR+File.separator+this.mQClientFactory.getClientId()+File.separator+this.groupName+File.separator+"offsets.json";}}0重复消费原理分析了那么久,我想要传达的观点就是正常使用的前提下重复消费的原因一定跟offset上报,持久化有关系。
集群消费过程中Consumer意外宕机,offset没有上报导致重复消费
集群消费过程中Broker意外宕机,offset没有将最新的偏移量持久化导致重复消费
广播消费过程Consumer意外宕机,offset没有持久化到本地文件导致重复消费
offset.json文件意外损坏或删除,进度丢失导致重复消费
offset.json文件被篡改,直播类app源码进度不准确导致重复消费
还有一种是因为开发者返回了错误的ACK标示,导致Rocket误判以为消费失败,触发重试逻辑导致的重复消费。
Xxl-Job中的概念和使用详解
一、调度中心
调度中心是独立的Web服务,专门用于触发定时任务执行。它提供管理界面,方便用户配置和控制定时任务的执行逻辑。调度中心依赖数据库存储数据,并支持集群模式,但集群内各实例间无直接通信,数据共享通过数据库实现。
二、执行器
执行器是执行具体任务的实体,与服务实例一一对应。每个执行器有自己的命名,通常推荐以服务名命名,以方便识别。
三、任务
任务就是定时执行的逻辑,一个执行器可以包含多个任务。调度中心负责管理任务的触发逻辑,执行器则负责实际执行任务。
创建调度中心与执行器:
1. 下载调度中心源码,调整数据库连接信息,执行指定的SQL脚本文件。
2. 启动调度中心,可以打包成jar或直接运行,访问指定URL即可访问控制台。
3. 添加执行器与任务:设置执行器名字,指定任务名称和选择任务执行模式。
实现步骤:
1. 引入依赖,配置XxlJobSpringExecutor,并在服务中使用@XxlJob注解定义任务。
2. 任务执行:通过反射或动态修改代码实现任务逻辑,配置执行器与任务。
核心原理:
执行器启动时执行初始化操作,包括JobHandler初始化,创建Http服务器和注册到调度中心。JobHandler封装定时任务,负责执行任务。调度中心会计算任务触发时机,通过查询数据库获取任务信息,并按照预读时间决定执行哪些任务。
任务触发流程:
1. 调度中心启动后,开启调度线程,查询并调度任务执行。
2. 调度线程将任务提交到线程池执行。
3. 执行器根据路由策略选择执行器实例,执行任务并返回结果给调度中心。
优化与路由策略:
1. 使用线程池异步执行任务触发,避免阻塞调度效率。
2. 实现快慢线程池,优化任务触发时间较长的任务处理。
3. 路由策略多样,包括分片广播、一致性Hash、LRU等,确保任务均衡分配。
执行与结果回调:
执行器创建单独线程执行任务,并将结果异步回调给调度中心。至此,任务执行过程完成。
总结:
通过调度中心和执行器协同工作,实现灵活的定时任务管理。核心原理包括初始化、任务调度、路由选择以及执行结果回调,通过优化策略保证任务高效执行。Xxl-Job提供丰富功能和灵活配置,适用于各类定时任务场景。
一文带你搞懂xxl-job(分布式任务调度平台)
本篇文章主要记录项目中遇到的 xxl-job 的实战,希望能通过这篇文章告诉读者们什么是 xxl-job 以及怎么使用 xxl-job 并分享一个实战案例。
xxl-job 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。设计思想是将调度行为抽象形成 调度中心 平台,平台本身不承担业务逻辑,而是负责发起 调度请求 后,由 执行器 接收调度请求并执行 任务,这里的 任务 抽象为 分散的 JobHandler。通过这种方式即可实现 调度 与 任务 相互解耦,从而提高系统整体的稳定性和拓展性。
任务调度指的是系统在约定的指定时间自动去执行指定的任务的过程。在开发项目时大家可能遇到过类似的场景问题,如系统需要定时在每天0点进行数据备份、活动开始前几小时预热执行一些前置业务、定时对 MQ 消息表的发送装填等。这些场景问题都可以通过任务调度 来解决。
单体系统 中有许多实现 任务调度 的方式,如多线程方式、Timer 类、Spring Tasks 等等。这里比较常用的是 Spring Tasks(通过 @EnableScheduling + @Scheduled 的注解可以自定义定时任务,有兴趣的可以去了解一下)。
在分布式系统下,每个服务都可以搭建为集群,这样的好处是可以将任务切片分给每一个服务从而实现并行执行,提高任务调度的处理效率。那么为什么分布式系统 不能使用 单体系统 的任务调度实现方式呢?在集群服务下,如果还是使用每台机器按照单体系统的任务调度实现方式实现的话,会出现下面这四个问题:怎么做到对任务的控制(如何避免任务重复执行)、如果某台机器宕机了,会不会存在任务丢失、如果要增加服务实例,怎么做到弹性扩容、如何做到对任务调度的执行情况统一监测。通过上面的问题可以了解到分布式系统下需要一个满足高可用、容错管理、负载均衡等功能的任务调度平台来实现任务调度。
xxl-job 分布式任务调度系统是一个开源软件,可以在 github 或 gitee 上查看和下载 xxl-job 的源码。在 docker 下安装 xxl-job、创建映射容器的文件目录、在/mydata/xxl-job 的目录下创建 application.properties 文件、导入 tables_xxl-job.sql 文件到指定的数据库、配置参数如数据库位置、访问口令等。
在 Spring Boot 项目中,导入 xxl-job 的 maven 依赖,配置application.yml 文件指定调度中心地址、访问口令、执行器名称和端口等属性,编写配置类配置自定义任务和执行器,完成 SpringBoot 集成 xxl-job 实现分布式任务调度的全过程。
实战案例:当前项目需要对上传到分布式文件系统 minio 中的视频文件进行统一格式的视频转码操作。利用 xxl-job 的方式以任务调度的方式定时处理视频转码操作,以任务调度的方式,可以使得视频转码操作不会阻塞主线程,避免影响主要业务的吞吐量;以集群服务分片接收任务的方式,可以将任务均分给每个机器使得任务调度可以并行执行,提高总任务处理时间以及降低单台机器 CPU 的开销。
xxl-job 执行流程图:在集群部署时,配置路由策略中选择分片广播的方式,可以使一次任务调度会广播触发集群中所有的执行器执行一次任务,并且可以向系统传递分片参数。利用这一特性可以根据当前执行器的分片序号和分片总数来获取对应的任务记录。通过 Bean 模式(基于方法)获取分片序号和分片总数,编写 sql 获取任务记录,实现对集群服务均分任务的操作。
确保任务不会被重复消费:通过幂等性实现,依靠任务的状态(未处理1;处理中2;处理失败3;处理成功4)通过比较和设置的方式只有在状态为未处理或处理失败时才能设置为处理中,避免多个执行器同时处理该任务。设置调度过期策略和阻塞处理策略保证真正的幂等性。
编写完成所有任务:分片视频转码处理,通过分片广播拿到的参数以取模的方式获取当前执行器所属的任务记录集合,遍历集合并发执行任务,使用乐观锁抢占当前任务,执行任务过程包含分布式文件系统下载、视频转码、上传转码后的视频、更新任务状态(处理成功),使用 JUC 工具类 CountDownLatch 实现所有任务执行完后才退出方法,中间使用 xxl-job 的日志记录错误信息和执行结果。
清理任务表中转码成功的任务的记录并将其插入任务历史表,视频补偿机制处理任务超时情况下的任务,做出补偿,处理失败次数大于3次的任务,做出补偿。测试并查看日志,准备好的任务表记录,启动三台媒资服务器,并开启任务,可以单独查看每个任务的日志,通过日志中的执行日志查看具体日志信息,可以看到直接为了测试改错的路径导致下载视频出错,查看数据库表的变化,核心的视频转码任务执行成功,并且逻辑正确,能够起到分布式任务调度的作用。
2024-11-29 06:442049人浏览
2024-11-29 06:11790人浏览
2024-11-29 05:58809人浏览
2024-11-29 05:30417人浏览
2024-11-29 04:45149人浏览
2024-11-29 04:34667人浏览
1.微隔离沙盒/微隔离沙箱2.CTF入门学习籽料非常详细)零基础入门到精通,收藏这一篇就够了!文末自取)3.常见的 Webshell 查杀工具4.SDS是什么微隔离沙盒/微隔离沙箱 微隔离沙盒/微
1.��ͷ����ѡ�ɹ�ʽԴ����ͷ����ѡ�ɹ�ʽԴ�� A1:=BARSLAST(REF(CROSS(KDJ.K#MONTH,KDJ.D#MONTH),1)); 月底背离:REF(C
1.2024中国燕窝十大品牌你都知道哪些呢?2.燕窝品牌排行榜前十名,全国燕窝品牌十大排名_燕窝哪个牌子好有什么品牌推荐3.什么品牌的燕窝最好?4.燕之屋燕窝靠谱吗?5.想知道正典燕窝这个牌子到底好不