0%

SpringBoot使用Kafka

引入spring-kafka依赖

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

添加配置文件中的配置项

在配置文件中添加对应的生产者、消费者、管理员配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
###########【生产者配置】###########
spring.kafka.producer.bootstrap-servers=192.168.88.128:9092
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
#控制单个批次发送的消息大小 16KB
#spring.kafka.producer.batch-size=16384
# 生产端缓冲区大小
#33554432B=32M
#5242880=5M
spring.kafka.producer.buffer-memory=5242880
#客户端id 用于标识一个唯一的生产者,用于日志记录,后续监控
spring.kafka.producer.client-id=kafka-test
#消息压缩类型,可以减少网络传输和存储成本 可选(none、gzip、snappy和lz4)
#spring.kafka.producer.compression-type=snappy
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 重试次数
spring.kafka.producer.retries=3
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=10
###########【消费者配置】###########
spring.kafka.consumer.bootstrap-servers=192.168.88.128:9092
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=kafka-test
#spring.kafka.client-id=kafka-test
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto-commit-interval=1000ms
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
#kafka管理配置
spring.kafka.admin.properties.bootstrap.servers=192.168.88.128:9092

配置生产者和消费者

在引入依赖后,KafkaAutoConfiguration会自动创建对应的生产者和消费者,但是默认泛型类型为<Object,Object>
,如果想自定义某种类型的key或value,可以手动创建
生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration
public class KafkaProducerConfig {
@Bean
public DefaultKafkaProducerFactory<String, String> kafkaProducerFactory(KafkaProperties kafkaProperties,
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(
kafkaProperties.buildProducerProperties());
String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}

@Bean
public LoggingProducerListener<String, String> kafkaProducerListener() {
return new LoggingProducerListener<>();
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(KafkaProperties kafkaProperties,
ProducerFactory<String, String> kafkaProducerFactory,
ProducerListener<String, String> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(kafkaProperties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(kafkaProperties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
return kafkaTemplate;
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory(KafkaProperties kafkaProperties,
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties());
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
}

主题手动创建:

1
2
NewTopic newTopic=new NewTopic(topic,partition,replicas);
kafkaAdmin.createOrModifyTopics(newTopic);

发送消息

通过使用KafkaTemplate来发送消息,发送消息时可以指定partition、key、timestamp等信息:

1
kafkaTemplate.send(topic,0,System.currentTimeMillis(),"key","data");

发送事务消息

kafka支持事务发送消息,在一次事务中,发送的消息要么都成功,要么都失败,如果启动事务消息的话,需要将spring.kafka.producer.acks
赋值为all,在事务中发送消息出现异常时,所有消息都不会进行发送:

1
2
3
4
5
6
7
8
9
10
11
public void publish(String topic,Integer size){
if(size<=0){
size=10;
}
for(int i=0;i<size; i++){
kafkaTemplate.send(topic,"key",RandomStringUtils.randomAlphanumeric(10,20));
if(i==7){
throw new RuntimeException("error");
}
}
}

在上面的例子中,如果i等于7,将会抛出异常,所有消息都不会发送。

消息订阅

通过KafkaListener注解进行订阅

@KafkaListener注解用于将 bean 方法指定为监听器容器的监听器。 该 bean 包装在 MessagingMessageListenerAdapter 中,该适配器配置有各种功能,例如用于转换数据的转换器以匹配方法参数。
而且可以使用SpEL或者属性占位符 配置注释上的大多数属性

  • @KafkaListener 注解

    • id(): 定义监听器容器的唯一标识符,如果未指定,则使用自动生成的 id。

    • containerFactory(): 指定用于创建监听器容器的KafkaListenerContainerFactory 的 bean 名称。

    • topics(): 监听的主题列表,可以是主题名称、属性占位符键或表达式。

    • topicPattern(): 监听的主题模式,用于匹配多个主题。

    • topicPartitions(): 手动指定要监听的主题分区。

    • containerGroup(): 将监听器容器添加到具有此值作为名称的 bean 中,以支持在集合中启动/停止一组容器。

    • errorHandler(): 当监听器方法抛出异常时,指定用于处理异常的 KafkaListenerErrorHandler 的 bean 名称。

    • groupId(): 仅为此监听器覆盖的消费者工厂配置中的 group.id 属性。

    • idIsGroup(): 如果为 false,则使用消费者工厂中的 group.id 属性。

    • clientIdPrefix(): 覆盖消费者工厂配置中的 client.id 属性。

    • beanRef(): 用于在注解内引用当前 bean 的伪 bean 名称。

    • concurrency(): 覆盖容器工厂的 concurrency 设置。

    • autoStartup(): 覆盖容器工厂的 autoStartup 设置。

    • properties(): 覆盖消费者工厂配置的其他 Kafka 消费者属性。

    • splitIterables(): 当返回类型为 Iterable 时,设置为 false 以将结果作为单个回复记录返回。

    • contentTypeConverter(): 设置用于将消息内容转换为所需类型的 SmartMessageConverter 的 bean 名称。

    • batch(): 覆盖容器工厂的 batchListener 设置。

    • filter(): 覆盖容器工厂的 recordFilterStrategy 设置。

    • info(): 将静态信息添加为带有键 KafkaHeaders.LISTENER_INFO 的标头,用于在拦截器、过滤器等中引用。

  • @TopicPartition 注解

    • topic(): 要监听的主题。

    • partitions(): 要监听的主题的分区列表。与partitionOffsets配置冲突,同一个分区不可在两者同时配置

    • partitionOffsets(): 主题分区的初始偏移量信息数组。

  • @PartitionOffset 注解

    • partition(): 要监听的主题的分区。

    • initialOffset(): 分区的初始偏移量。

    • relativeToCurrent(): 初始偏移量是否相对于当前位置。

1
2
3
4
@KafkaListener(topics = "write-test")
public void consume1(String data){
System.out.println(data);
}

手动分配分区

可以在注解中手动指定所监控的主题的分区,分区的初始offset

1
2
3
4
5
6
7
8
@KafkaListener(id = "thing2", topicPartitions =
{@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
//...
}

也可以为主题所分配的所有分区制定初始offset

1
2
3
4
5
6
7
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}

手动ACK

修改配置:将消费者的enable.auto.commit设置为false,监听器工厂的ACK_MODE设置为MANUAL_IMMEDIATE,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public DefaultKafkaConsumerFactory<String, String> kafkaManualAckConsumerFactory(KafkaProperties kafkaProperties,
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers){
Map<String, Object> configs=kafkaProperties.buildConsumerProperties();
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
configs.put(ConsumerConfig.GROUP_ID_CONFIG,"kafka-ack-test");
configs.put(ConsumerConfig.CLIENT_ID_CONFIG,"kafka-ack-test1");
DefaultKafkaConsumerFactory<String, String> factory=new DefaultKafkaConsumerFactory<>(configs);
customizers.orderedStream().forEach((customizer)->customizer.customize(factory));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(ConsumerFactory<String, String> kafkaManualAckConsumerFactory){
ConcurrentKafkaListenerContainerFactory<String, String> factory=new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaManualAckConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}

配置了手动ACK后,使用该监听器工厂监听消息时必须指定Acknowledgment参数,在方法中应该手动进行ack或者nack,否则消息可能重复处理。

1
2
3
4
5
6
7
8
9
10
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "write-test", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "140")})
}, containerFactory = "kafkaManualAckListenerContainerFactory")
public void consumeWriteTest(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
System.out.println("partition:" + consumerRecord.partition() + " offset:" + consumerRecord.offset() + " value:" + consumerRecord.value());
if ((RandomUtils.nextInt(0, 2) & 1) == 1) {
ack.acknowledge();
} else
ack.nack(Duration.ofMillis(1));
}