1.Spring-kafka批量消费优化
2.聊聊在springboot项目中如何配置多个kafka消费者
3.springboot2中如何整合kafka组件?
4.Spring Kafka:Retry Topic、DLT 的使用与原理
5.轻松上手 Spring Boot & Kafka 实战!
Spring-kafka批量消费优化
Spring Kafka批量消费优化通过使用ConcurrentKafkaListenerContainerFactory实现。该类继承自AbstractKafkaListenerContainerFactory,后者提供大部分通用属性,如BatchListener、angularjs tutorial 源码ContainerProperties等。
ConcurrentKafkaListenerContainerFactory特有的属性为concurrency,其默认值为1。若使用默认值,批量消费时会通过迭代器拉取消费,首先拉取完一个分区,再拉取其他分区。
Spring Kafka的concurrency属性用于多线程消费分区。例如,若设置concurrency为2,且主题分区为2,将出现两个线程并行消费每个分区的场景。
设置规则应遵循concurrency不超过分区数除以应用实例数,最多为等于,短网址转换源码若设置过多则为闲置,无效。通过合理调整concurrency值,可以实现更高效的批量消费优化。
聊聊在springboot项目中如何配置多个kafka消费者
在SpringBoot项目中配置多个Kafka消费者,以满足不同消费者消费特定Kafka消息的需求,通常通过Kafka的API配置实现。但Spring-Kafka简化开发时,原生配置项并未提供支持多Kafka配置的功能。本文将介绍如何通过改造Spring-Kafka,使其支持多Kafka配置。
首先,通过`@ConfigurationProperties`指定KafkaProperties前缀,允许配置多个,如:
配置消费者工厂,将消费者工厂绑定对应的KafkaProperties。
接着,配置消费者监听器工厂,并绑定指定消费者工厂及配置。同城微论坛源码
以下为完整配置示例,确保使用`@Primary`指定,避免因存在多个KafkaProperties而导致自动装配错误。
同项目使用多个Kafka消费者示例:
在项目pom中引入Spring-Kafka GAV。
在yml配置文件中配置如下内容:
配置生产者,绑定`@Primary`配置的KafkaProperties。
配置消费者监听,并绑定containerFactory,以消费指定的Kafka消息。
完成测试,发送消息并观察控制台输出,验证重复消费情况。
总结,核心在于通过注入多个KafkaProperties实现多配置。改造后的配置中,生产者配置也需同步,以避免使用默认配置信息。本文示例中使用的`@LybGeekKafkaListener`注解与`@KafkaListener`功能类似,实为同一代码示例的后台系统模板源码复用。
springboot2中如何整合kafka组件?
在Spring Boot 2中整合Kafka组件,通过Spring for Apache Kafka简化集成过程,主要步骤如下:
首先,Maven或Gradle项目中添加Spring for Apache Kafka依赖,Maven示例为:
然后,在application.properties或application.yml配置文件中设置Kafka连接信息,例如:
配置内容包括Kafka服务器地址、端口、消费者组ID、自动位移重置方式及默认主题。
创建Kafka生产者时,使用KafkaTemplate类,示例代码如下:
通过构造函数注入KafkaTemplate,并在sendMessage方法中发送消息至默认主题。
创建Kafka消费者时,使用@KafkaListener注解,示例如下:
设置监听主题(如“my-topic”)及消费者组ID(如“my-group”),当消息到达时,会调用receiveMessage方法并打印接收到的CQ授权系统源码消息。
总结,以上为Spring Boot 2中Kafka组件基本整合步骤,基于具体需求调整和定制。深入了解Spring for Apache Kafka功能和选项以实现更深入的集成。
Spring Kafka:Retry Topic、DLT 的使用与原理
Spring Kafka 在核心功能之外,扩展了Retry Topic和DLT(死信队列)的支持。这个增强在spring-kafka 2.7.及更高版本中可用,早期版本则不支持。
默认情况下,当消费逻辑遇到异常,Spring Kafka会进行快速重试,最多次,每次无间隔。如果重试后依旧失败,它会尝试commit记录。重试的机制基于SeekUtils#doSeeks,可以通过自定义SeekToCurrentErrorHandler来调整,例如设置重试间隔和失败后将消息发送到DLT。
定制SeekToCurrentErrorHandler后,异常后的处理会间隔秒重试3次,如果所有尝试都失败,消息会被转移到死信队列。这样的设计避免了单个消息重试占用消费线程,而是通过专用的retry线程处理。
开启Retry Topic和DLT的使用可以通过注解和全局配置实现。@RetryableTopic注解可以应用在`@KafkaListener`方法上,设置默认重试3次,间隔1秒,如果重试后依然失败,消息将转到死信队列。用户还可以自定义死信处理逻辑。
配置方面,可以调整重试次数、延迟时间和死信策略,支持Spring EL表达式。`fixedDelayTopicStrategy`的选择很重要,但具体策略可以根据需求调整。
源码解析显示,Spring Kafka通过暂停和恢复分区实现延迟重试。消息在Retry Topic中带有延迟时间,监听器在消费前检查并暂停分区,确保在期望的时间重新开始消费。这种设计有助于控制消息的延迟时间。
关于Retry Topic策略,FixedDelayStrategy有MULTIPLE_TOPICS和SINGLE_TOPIC两种。前者会创建多个主题以实现指数级增长的重试时间,而后者保持固定延迟,但可能在分区分配上产生不一致。如何配置多个retry线程,可以根据需要调整KafkaListener的并发设置或自定义ContainerFactory。
对于更深入的学习和实践,可以参考GitHub上的Spring Kafka示例:github.com/TavenYin/tav...
轻松上手 Spring Boot & Kafka 实战!
Kafka集群安装、配置和启动
Kafka需要依赖zookeeper,并且自身集成了zookeeper,zookeeper至少需要3个节点保证集群高可用,下面是在单机linux下创建kafka3个节点伪集群模式。
1、下载包
下载地址: kafka.apache.org/downlo...
2、解压包
tar -zxvf kafka_2.-1.0.0.tgz mv kafka_2.-1.0.0 kafka1 mv kafka_2.-1.0.0 kafka2 mv kafka_2.-1.0.0 kafka3
3、创建ZK集群
修改ZK配置文件:kafka1-3/config/zookeeper.properties分别修改对应的参数。
/usr/local/kafka/zookeeper1-3目录下分别创建myid文件,内容对应1~3
启动ZK,分别进行Kafka1-3目录:
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动报文件失败,需要手动创建文件目录并赋予对应的权限。
4、创建Kafka集群
配置文件:kafka1-3/config/server.properties分别修改对应的参数。
启动Kafka,分别进行Kafka1-3目录:
bin/kafka-server-start.sh config/server.properties &
启动报文件失败,需要手动创建文件目录并赋予对应的权限。
5、集群测试
在kafka1上面发送消息:
bin/kafka-console-producer.sh --broker-list localhost: --topic test
在kafka2、kafka3消费消息:
bin/kafka-console-consumer.sh --zookeeper localhost: --from-beginning --topic my-replicated-topic
Spring Boot 集成 Kafka 实战
1、添加spring-kafka依赖
2、添加Spring Boot的自动配置
自动配置类:
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
配置属性类:
org.springframework.boot.autoconfigure.kafka.KafkaProperties
3、发送消息
4、接收消息
在任何bean里面,添加@KafkaListener,支持消息接收。
5、参考资料
Spring Boot & Kafka官方文档:
docs.spring.io/spring-b...
Spring for Apache Kafka官方文档:
记得关注我,分享更主流的Java技术~
更多 Spring Boot 干货:
Spring Boot 宣布移除 run 命令,真让我猝不及防!
Spring Boot 定时任务开启后,怎么符合条件自动停止?
Spring Boot 保护敏感配置的 4 种方法,让你的系统不再裸奔!!
Spring Boot 集成 Flyway,数据库也能做版本控制,太牛逼了!
个官方 Spring Boot Starters 出炉!别再重复造轮子了……
Spring Boot Redis 实现分布式锁,真香!!
Spring Boot 之配置导入,强大到不行!
年轻人的第一个自定义 Spring Boot Starter!
Spring Boot 面试,一个问题就干趴下了!(下)
Spring Boot 最核心的 个注解,都是干货!
好了,最后栈长再送你一份Spring Boot 学习笔记,包括底层实现原理及代码实战,非常齐全,助你快速打通 Spring Boot 的各个环节。
链接: pan.baidu.com/s/wLzA6... 提取码: ztsj
最后,别忘了点在看、转发哦,需要你的鼓励~