1.RocketMQ消费进度管理浅析
2.Java并发源码concurrent包
3.关于java中DecimalFormat的源码问题。
4.面试官说:你来设计一个短链接生成系统吧
RocketMQ消费进度管理浅析
幂等性的源码取与舍
分布式平台上幂等性相关语义的保证,是源码我们构造安全、可信赖系统的源码永恒追求。作为异步、源码解耦通常实现方案下的源码健康打卡源码最优选,我时常思考RocketMQ设计者经历怎样的源码断舍离?
众所周知消息队列关于消息消费这一概念的落地实现,大体上分为三种情形:
Atmostonce
Atleastonce
Exactlyonce
翻译一下就是源码:
至多消费一次
至少消费一次
保证消费一次
很显然如果至多消费一次,势必造成消息丢失;至少消费一次就对我们的源码业务系统提出更高的要求,保证消费一次看似美好时则需要MQ系统背负沉重代价。源码RocketMQ丝毫不犹豫的源码选择Atleastonce。将幂等的源码保证大胆的交给开发者,不仅仅体现作者对MQ性能与功能两者矛盾的源码无奈,同时也体现了对广大开发人员的源码信任。
消费现状概述上述论调虽然客观真实但不免有些悲观主义的源码意味,按照上文的理解我们业务体统需要倚仗ta,但我们又要时刻防备ta,因为一个不小心可能就会出纰漏,这还真是一个让人又爱又怕的存在。
读到这里,笔者似乎把ta描绘成了一个顽皮的孩子,但其实有些言重了,因为以我阅读源码的理解,业务系统没有异常,MQ所在的物理运行环境又比较健康的情况下,其实比较难以出现多次重复消费。
RocketMQ的幂等往往是由业务系统的异常逻辑,或者网络,或者不确定的运行环境破坏的。绝大多数情形下确定无疑ta依然是一个GoodBoy。
按照我们对消息系统的朴素理解,消息的ffactivex源码消费过程满足以下几个规律:
虽然不会严格的按照投递顺序进行消费,但大体上保持先进先出这个趋势
消息应该被精确的记录当前消费状态
总有一个角色负责统计、持久化消费偏移量带着经验主义我们看看作者都为平稳消费与进度管理做出了哪些努力。
注:RocketMQ的顺序消费模型是可以严格保证顺序的。
OffsetStore消息被消费后也就失去了在ProcessQueue中停留的资格,ProcessQueue会删除该消息,并返回当前的最小偏移量放置到消息进度表中。很容易想象,如果这个消费进度不加以持久化,那么每次启动都要重头消费,显然无法接受,可是如何持久化,又持久化到何处呢?
RocketMQ支持两种订阅模式:
集群消费模式:默认的消费模式,所有消息只需要被同组任一消费者消费一次即可,大家共享订阅Topic下的消费偏移量。
广播消费模式:各个消费者的消费行为是完全独立的,订阅Topic下所有的消息都需要被该组下所有消费者消费。针对两种消费模型的特性,容易发现二者并不好一概而论,理想的实现是划分为两个策略,一个集中到Broker管理,一个分散出去由消费者管理。OffsetStore接口负责相关事宜,源码应证了我们猜想。先来看看OffsetStore接口定义:
publicinterfaceOffsetStore{ /***从消息进度存储文件加载消息进度到内存*/voidload()throwsMQClientException;/***Getoffsetfromlocalstorage*@returnThefetchedoffset*/longreadOffset(MessageQueuemq,ReadOffsetTypetype);/***Removeoffset*/voidremoveOffset(MessageQueuemq);Map<MessageQueue,Long>cloneOffsetTable(Stringtopic);/***更新内存中的消息进度*Updatetheoffset,storeitinmemory*/voidupdateOffset(MessageQueuemq,longoffset,booleanincreaseOnly);/***保留所有偏移量,可能在本地存储或远程服务器*Persistalloffsets,maybeinlocalstorageorremotenameserver*/voidpersistAll(Set<MessageQueue>mqs);/***保留指定消息队列偏移量,可能在本地存储或远程服务器*Persisttheoffset,maybeinlocalstorageorremotenameserver*/voidpersist(MessageQueuemq);/***更新存储在Broker端的消息消费进度,使用集群模式*/voidupdateConsumeOffsetToBroker(MessageQueuemq,longoffset,booleanisOneway)throwsRemotingException,MQBrokerException,InterruptedException,MQClientException;}较之源码,方法排列被我调换了顺序,需要着重关注的我放到了后面。
注:如果没有RocketMQ源码阅读经历ProcessQueue显得有些突兀,你可以将ta理解为消息在Consumer端的hownet 源码载体、物理队列某一个截取片段。作者如此定义ta:Queueconsumptionsnapshot
LocalFileOffsetStore广播模式下消息进度保留在Consumer端,文件遵守约定放置在可配置的固定目录下,文件路径如下:
publicclassLocalFileOffsetStoreimplementsOffsetStore{ /***存储文件夹路径可定制*/publicfinalstaticStringLOCAL_OFFSET_STORE_DIR=System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home")+File.separator+".rocketmq_offsets");/***构造方法拼接出了文件的完整路径*/publicLocalFileOffsetStore(MQClientInstancemQClientFactory,StringgroupName){ this.mQClientFactory=mQClientFactory;this.groupName=groupName;this.storePath=LOCAL_OFFSET_STORE_DIR+File.separator+this.mQClientFactory.getClientId()+File.separator+this.groupName+File.separator+"offsets.json";}}默认在用户路径下一层创建一个".RocketMQ_offsets"文件夹,注意这里有一个细节,文件夹以"."开头,在Linux系统中属于隐藏文件,需要加-a参数才能被显示。为了便于理解,下图展示了一个文件夹路径和一个Offset持久化文件的路径。
广播模式下Consumer#start()之后会调用OffsetStore.load()来加载消费进度,其原理就是根据约定拼接处文件全路径之后读取相应文件,然后序列化为OffsetSerializeWrapper对象:
publicclassOffsetSerializeWrapperextendsRemotingSerializable{ /*详细记录每个队列当前消费进度*/privateConcurrentMap<MessageQueue,AtomicLong>offsetTable=newConcurrentHashMap<>();}假设我们有个发送短信的服务订阅"SMS_prod"Topic,那么形成的Json如下所示:注意offsetTable属性也是一个Json,而且key是MessageQueue对象,valule是一个数字表示偏移量。
{ "offsetTable":{ { "topic":"SMS_prod","brokerName":"broker0""queueId":0}:,{ "topic":"SMS_prod","brokerName":"broker0""queueId":1}:,}}既然可以在指定文件load关键信息,自然就有相关机制负责写入。还记得上文提到的persistAll方法吗?
publicvoidpersistAll(Set<MessageQueue>mqs){ /*构造OffsetSerializeWrapper对象*/OffsetSerializeWrapperoffsetSerializeWrapper=newOffsetSerializeWrapper();for(Map.Entry<MessageQueue,AtomicLong>entry:offsetTable.entrySet()){ if(mqs.contains(entry.getKey())){ AtomicLongoffset=entry.getValue();offsetSerializeWrapper.getOffsetTable().put(entry.getKey(),offset);}}/*将offsetSerializeWrapper对象序列化*/StringjsonString=offsetSerializeWrapper.toJson(true);/*将序列化好的offsetSerializeWrapper写入文件*/MixAll.string2File(jsonString,this.storePath);}对offsets.json的相关操作都被封装在MixAll工具类中:
MixAll.file2String:将文件读取出来
MixAll.string2File:将序列化好的对象写入文件
RemoteBrokerOffsetStore因为偏移量维护在Broker端,所以该实现的load方法仅仅是一个声明。构造方法不需要计算文件路径也尤为简单,二者的offsetTable属性是一致的。我们着重来看看集群消费模式下如何保存消息消费进度。
publicvoidpersistAll(Set<MessageQueue>mqs){ HashSet<MessageQueue>unusedMQ=newHashSet<>();for(Map.Entry<MessageQueue,AtomicLong>entry:offsetTable.entrySet()){ MessageQueuemq=entry.getKey();AtomicLongoffset=entry.getValue();if(offset!=null){ if(mqs.contains(mq)){ this.updateConsumeOffsetToBroker(mq,offset.get());}else{ unusedMQ.add(mq);}}}if(!unusedMQ.isEmpty()){ for(MessageQueuemq:unusedMQ){ this.offsetTable.remove(mq);}}}不用深入研究,我们应该能发现至少两处不同:
粒度不同:广播模式是直接一下子把整个offsetTable持久化,而集群模式细化到了entry级别。
调用方式不同:广播模式是直接JVM内部调用写入文件即可,而集群模式需要RPC调用参与。这里有必要强调一下二者产生的offset.json文件也是有区别的,下文我会分析,同时也带大家了解该RPC过程。源码1780
RPC调用栈:RemoteBrokerOffsetStore#persistAll()->RemoteBrokerOffsetStore#updateConsumeOffsetToBroker()组装好RPC请求头UpdateConsumerOffsetRequestHeader对象->MQClientAPIImpl#updateConsumerOffsetOneway()组装好RPC请求对象RemotingCommand->NettyRemotingClient#invokeSync()发起RPC调用更新偏移量的RPC调用类型是RequestCode.UPDATE_CONSUMER_OFFSET顺着这个枚举来看看Broker端的相关处理:ConsumerManageProcessor.updateConsumerOffset()->ConsumerOffsetManager.commitOffset()追踪源码发现,其实每次Consumer进行RPC调用上报自己的消费进度,Broker接收之后并没有立即进行持久化,而是直接更新到内存中。
privatevoidcommitOffset(StringclientHost,Stringkey,intqueueId,longoffset){ Stringkey=topic+TOPIC_GROUP_SEPARATOR+group;ConcurrentMap<Integer,Long>map=offsetTable.get(key);if(Objects.isNull(map)){ map=newConcurrentHashMap<>();map.put(queueId,offset);this.offsetTable.put(key,map);}else{ LongstoreOffset=map.put(queueId,offset);}}TOPIC_GROUP_SEPARATOR为定义的常量:"@",之前我们提到过二者json有些许区别,offsetTable的key变成了一个拼接出来的字符串,该字符串左侧是TopicName,右侧是ConsumeGroupName中间用@符号连接。方便理解,我把这个json也展示出来:/***注意一下这个key:%RETRY%ConsumeGroup*笔者后期会有专门文章分析*/{ "offsetTable":{ "Topic@ConsumeGroup":{ 0:,1:,2:,3:},"%RETRY%ConsumeGroup":{ 0:0}}}持久化两种文件持久化机制没有什么大的区别定时任务触发,或者消费端正常关闭执行shotdown()之前手动触发。
广播模式定时任务定义在MQClientInstance中,MQClientInstance对象在被实例化之后调用start()时启动该定时任务。定时任务的时间间隔支持配置默认是ms,延时ms之后开始执行。
publicvoidstart()throwsMQClientException{ this.scheduledExecutorService.scheduleAtFixedRate(()->{ try{ MQClientInstance.this.persistAllConsumerOffset();}catch(Exceptione){ log.error("ScheduledTaskpersistAllConsumerOffsetexception",e);}},*,this.clientConfig.getPersistConsumerOffsetInterval(),TimeUnit.MILLISECONDS);}集群模式定时任务定义BrokerController中,BrokerController对象在被实例化之后会有一系列初始化动作,initialize()会启动该定时任务。定时任务的时间间隔支持配置默认是ms,延时ms之后开始执行。
publicclassLocalFileOffsetStoreimplementsOffsetStore{ /***存储文件夹路径可定制*/publicfinalstaticStringLOCAL_OFFSET_STORE_DIR=System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home")+File.separator+".rocketmq_offsets");/***构造方法拼接出了文件的完整路径*/publicLocalFileOffsetStore(MQClientInstancemQClientFactory,StringgroupName){ this.mQClientFactory=mQClientFactory;this.groupName=groupName;this.storePath=LOCAL_OFFSET_STORE_DIR+File.separator+this.mQClientFactory.getClientId()+File.separator+this.groupName+File.separator+"offsets.json";}}0重复消费原理分析了那么久,我想要传达的观点就是正常使用的前提下重复消费的原因一定跟offset上报,持久化有关系。
集群消费过程中Consumer意外宕机,offset没有上报导致重复消费
集群消费过程中Broker意外宕机,offset没有将最新的偏移量持久化导致重复消费
广播消费过程Consumer意外宕机,offset没有持久化到本地文件导致重复消费
offset.json文件意外损坏或删除,进度丢失导致重复消费
offset.json文件被篡改,进度不准确导致重复消费
还有一种是因为开发者返回了错误的ACK标示,导致Rocket误判以为消费失败,触发重试逻辑导致的supervivi 源码重复消费。
Java并发源码concurrent包
深入JAVA杨京京:Java并发源码concurrent包
在JDK1.5之前,Java并发设计复杂且对程序员负担重,需考虑性能、死锁、公平性等。JDK1.5后,引入了java.util.concurrent工具包简化并发,提供多种并发模型,减轻开发负担。
Java并发工具包java.util.concurrent源自JSR-,包含用于并发程序的通用功能。该包由Doug Lea开发,旨在提供线程安全的容器、同步类、原子对象等工具,减少并发编程的复杂性。
并发容器如阻塞队列、非阻塞队列和转移队列等,实现线程安全功能,不使用同步关键字,为并发操作提供便利。
同步类如Lock等,提供线程之间的同步机制,确保数据一致性。原子对象类如AtomicInteger、AtomicLong等,提供高效的原子操作,避免同步锁,实现线程安全。
原子操作类在多线程环境中实现数据同步和互斥,确保数据一致性。实际应用场景包括线程安全的数据结构和算法实现。
java.util.concurrent.atomic包中的原子操作类,使用硬件支持的原子操作实现数据的原子性,提高并发程序的效率和性能。
值得一提的是,Java并发工具包还包含了Fork-Join框架,通过分解和合并任务,实现高效并行处理,减少等待其他线程完成时间,并利用工作偷取技术优化线程执行效率。
Java线程池如ThreadLocalRandom类,提供高性能随机数生成,通过种子内部生成和不共享随机对象减少资源争用和消耗,提高并发程序的性能。
关于java中DecimalFormat的问题。
把newSalary转为double型,然后再format就好了,看源码就会知道,String类型是不被允许的public final StringBuffer format(Object number,StringBuffer toAppendTo,
FieldPosition pos) {
if (number instanceof Long || number instanceof Integer ||
number instanceof Short || number instanceof Byte ||
number instanceof AtomicInteger ||
number instanceof AtomicLong ||
(number instanceof BigInteger &&
((BigInteger)number).bitLength () < )) {
return format(((Number)number).longValue(), toAppendTo, pos);
} else if (number instanceof BigDecimal) {
return format((BigDecimal)number, toAppendTo, pos);
} else if (number instanceof BigInteger) {
return format((BigInteger)number, toAppendTo, pos);
} else if (number instanceof Number) {
return format(((Number)number).doubleValue(), toAppendTo, pos);
} else {
throw new IllegalArgumentException("Cannot format given Object as a Number");
}
}
面试官说:你来设计一个短链接生成系统吧
引言
相信大家在生活中,特别是最近的双十一活动期间,会收到很多短信,而那些短信都有两个特征,第一个是几乎都是垃圾短信,这个特点此处可以忽略不计,第二个特点是链接很短,比如下面这个:
我们知道,短信有些是有字数限制的,直接放一个带满各种参数的链接,不合适,另外一点是,不想暴露参数。好处无非以下:
太长的链接容易被限制长度
短链接看着简洁,长链接看着容易懵
安全,不想暴露参数
可以统一链接转换,当然也可以实现统计点击次数等操作
那背后的原理是什么呢?怎么实现的?让你实现这样的系统,你会怎么设计呢?来自于某鹅场面试官
短链接的原理短链接展示的逻辑这里最重要的知识点是重定向,先复习一下/tzHLFw与/gmccapp/webpage/payPhonemoney/index.html?channel=之间的装换是怎么样的呢?前面路径不变,变化的是后面,也就是tzHLFw与gmccapp/webpage/payPhonemoney/index.html?channel=之间的转换。
实际也很简单,就是数据库里面的一条数据,一个id对应长链接(相当于全局的发号器,全局唯一的ID):
idurl1/gmccapp/webpage/payPhonemoney/index.html?channel=这里用到的,也就是我们之前说过的分布式全局唯一ID,如果我们直接用id作为参数,貌似也可以:/1,访问这个链接时,去数据库查询获得真正的url,再重定向。
单机的唯一ID很简单,用原子类AtomicLong就可以,但是分布式的就不行了,简单点可以用 redis,或者数据库自增,或者可以考虑Zookeeper之类的。
id 转换策略但是直接用递增的数字,有两个坏处:
数字很大的时候,还是很长
递增的数字,不安全,规律性太强了
明显我们平时看到的链接也不是数字的,一般都是大小写字母加上数字。为了缩短链接的长度,我们必须把id转换掉,比如我们的短链接由a-z,A-Z,0-9组成,相当于进制的数字,将id转换成为进制的数字:
publicclassShortUrl{ privatestaticfinalStringBASE="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";publicstaticStringtoBase(longnum){ StringBuilderresult=newStringBuilder();do{ inti=(int)(num%);result.append(BASE.charAt(i));num/=;}while(num>0);returnresult.reverse().toString();}publicstaticlongtoBase(Stringstr){ longresult=0;for(inti=0;i<str.length();i++){ result=result*+BASE.indexOf(str.charAt(i));}returnresult;}publicstaticvoidmain(String[]args){ //tzHLFwSystem.out.println(toBase("tzHLFw"));System.out.println(toBase(L));}}id转 位的key 或者key装换成为id都已经实现了,不过计算还是比较耗时的,不如加个字段存起来,于是数据库变成了:
idkeyurltzHLFw/gmccapp/webpage/payPhonemoney/index.html?channel=但是这样还是很容易被猜出这个id和key的对应关系,要是被遍历访问,那还是很不安全的,如果担心,可以随机将短链接的字符顺序打乱,或者在适当的位置加上一些随机生成的字符,比如第1,4,5位是随机字符,其他位置不变,只要我们计算的时候,将它对应的关系存到数据库,我们就可以通过连接的key找到对应的url。(值得注意的是,key必须是全局唯一的,如果冲突,必须重新生成)
一般短链接都有过期时间,那么我们也必须在数据库里面加上对应的字段,访问的时候,先判断是否过期,过期则不给予重定向。
性能考虑如果有很多短链接暴露出去了,数据库里面数据很多,这个时候可以考虑使用缓存优化,生成的时候顺便把缓存写入,然后读取的时候,走缓存即可,因为一般短链接和长链接的关系不会修改,即使修改,也是很低频的事情。
如果系统的id用完了怎么办?这种概率很小,如果真的发生,可以重用旧的已经失效的id号。
如果被人疯狂请求一些不存在的短链接怎么办?其实这就是缓存穿透,缓存穿透是指,缓存和数据库都没有的数据,被大量请求,比如订单号不可能为-1,但是用户请求了大量订单号为-1的数据,由于数据不存在,缓存就也不会存在该数据,所有的请求都会直接穿透到数据库。如果被恶意用户利用,疯狂请求不存在的数据,就会导致数据库压力过大,甚至垮掉。
针对这种情况,一般可以用布隆过滤器过滤掉不存在的数据请求,但是我们这里id本来就是递增且有序的,其实我们范围大致都是已知的,更加容易判断,超出的肯定不存在,或者请求到的时候,缓存里面放一个空对象也是没有问题的。
作者简介: 秦怀,公众号秦怀杂货店作者,技术之路不在一时,山高水长,纵使缓慢,驰而不息。个人写作方向:Java源码解析,JDBC,Mybatis,Spring,redis,分布式,剑指Offer,LeetCode等,认真写好每一篇文章,不喜欢标题党,不喜欢花里胡哨,大多写系列文章,不能保证我写的都完全正确,但是我保证所写的均经过实践或者查找资料。遗漏或者错误之处,还望指正。