引入spring-kafka依赖
1 2 3 4
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
添加配置文件中的配置项
在配置文件中添加对应的生产者、消费者、管理员配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| spring.kafka.producer.bootstrap-servers=192.168.88.128:9092
spring.kafka.producer.acks=1
spring.kafka.producer.buffer-memory=5242880
spring.kafka.producer.client-id=kafka-test
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries=3
spring.kafka.producer.properties.linger.ms=10
spring.kafka.consumer.bootstrap-servers=192.168.88.128:9092
spring.kafka.consumer.properties.group.id=kafka-test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000ms
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.admin.properties.bootstrap.servers=192.168.88.128:9092
|
配置生产者和消费者
在引入依赖后,KafkaAutoConfiguration会自动创建对应的生产者和消费者,但是默认泛型类型为<Object,Object>
,如果想自定义某种类型的key或value,可以手动创建
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration public class KafkaProducerConfig { @Bean public DefaultKafkaProducerFactory<String, String> kafkaProducerFactory(KafkaProperties kafkaProperties, ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) { DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>( kafkaProperties.buildProducerProperties()); String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)); return factory; }
@Bean public LoggingProducerListener<String, String> kafkaProducerListener() { return new LoggingProducerListener<>(); }
@Bean public KafkaTemplate<String, String> kafkaTemplate(KafkaProperties kafkaProperties, ProducerFactory<String, String> kafkaProducerFactory, ProducerListener<String, String> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener); map.from(kafkaProperties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic); map.from(kafkaProperties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix); return kafkaTemplate; } }
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class KafkaConsumerConfig { @Bean public DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory(KafkaProperties kafkaProperties, ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) { DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>( kafkaProperties.buildConsumerProperties()); customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)); return factory; }
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); return factory; } }
|
主题手动创建:
1 2
| NewTopic newTopic=new NewTopic(topic,partition,replicas); kafkaAdmin.createOrModifyTopics(newTopic);
|
发送消息
通过使用KafkaTemplate来发送消息,发送消息时可以指定partition、key、timestamp等信息:
1
| kafkaTemplate.send(topic,0,System.currentTimeMillis(),"key","data");
|
发送事务消息
kafka支持事务发送消息,在一次事务中,发送的消息要么都成功,要么都失败,如果启动事务消息的话,需要将spring.kafka.producer.acks
赋值为all
,在事务中发送消息出现异常时,所有消息都不会进行发送:
1 2 3 4 5 6 7 8 9 10 11
| public void publish(String topic,Integer size){ if(size<=0){ size=10; } for(int i=0;i<size; i++){ kafkaTemplate.send(topic,"key",RandomStringUtils.randomAlphanumeric(10,20)); if(i==7){ throw new RuntimeException("error"); } } }
|
在上面的例子中,如果i等于7,将会抛出异常,所有消息都不会发送。
消息订阅
通过KafkaListener注解进行订阅
@KafkaListener
注解用于将 bean 方法指定为监听器容器的监听器。 该 bean 包装在 MessagingMessageListenerAdapter
中,该适配器配置有各种功能,例如用于转换数据的转换器以匹配方法参数。
而且可以使用SpEL或者属性占位符 配置注释上的大多数属性
@KafkaListener
注解
id()
: 定义监听器容器的唯一标识符,如果未指定,则使用自动生成的 id。
containerFactory()
: 指定用于创建监听器容器的KafkaListenerContainerFactory
的 bean 名称。
topics()
: 监听的主题列表,可以是主题名称、属性占位符键或表达式。
topicPattern()
: 监听的主题模式,用于匹配多个主题。
topicPartitions()
: 手动指定要监听的主题分区。
containerGroup()
: 将监听器容器添加到具有此值作为名称的 bean 中,以支持在集合中启动/停止一组容器。
errorHandler()
: 当监听器方法抛出异常时,指定用于处理异常的 KafkaListenerErrorHandler 的 bean 名称。
groupId()
: 仅为此监听器覆盖的消费者工厂配置中的 group.id 属性。
idIsGroup()
: 如果为 false,则使用消费者工厂中的 group.id 属性。
clientIdPrefix()
: 覆盖消费者工厂配置中的 client.id 属性。
beanRef()
: 用于在注解内引用当前 bean 的伪 bean 名称。
concurrency()
: 覆盖容器工厂的 concurrency 设置。
autoStartup()
: 覆盖容器工厂的 autoStartup 设置。
properties()
: 覆盖消费者工厂配置的其他 Kafka 消费者属性。
splitIterables()
: 当返回类型为 Iterable 时,设置为 false 以将结果作为单个回复记录返回。
contentTypeConverter()
: 设置用于将消息内容转换为所需类型的 SmartMessageConverter 的 bean 名称。
batch()
: 覆盖容器工厂的 batchListener 设置。
filter()
: 覆盖容器工厂的 recordFilterStrategy 设置。
info()
: 将静态信息添加为带有键 KafkaHeaders.LISTENER_INFO 的标头,用于在拦截器、过滤器等中引用。
@TopicPartition
注解
@PartitionOffset
注解
1 2 3 4
| @KafkaListener(topics = "write-test") public void consume1(String data){ System.out.println(data); }
|
手动分配分区
可以在注解中手动指定所监控的主题的分区,分区的初始offset
1 2 3 4 5 6 7 8
| @KafkaListener(id = "thing2", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { }
|
也可以为主题所分配的所有分区制定初始offset
1 2 3 4 5 6 7
| @KafkaListener(id = "thing3", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }, partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
|
手动ACK
修改配置:将消费者的enable.auto.commit
设置为false,监听器工厂的ACK_MODE设置为MANUAL_IMMEDIATE,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Bean public DefaultKafkaConsumerFactory<String, String> kafkaManualAckConsumerFactory(KafkaProperties kafkaProperties, ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers){ Map<String, Object> configs=kafkaProperties.buildConsumerProperties(); configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); configs.put(ConsumerConfig.GROUP_ID_CONFIG,"kafka-ack-test"); configs.put(ConsumerConfig.CLIENT_ID_CONFIG,"kafka-ack-test1"); DefaultKafkaConsumerFactory<String, String> factory=new DefaultKafkaConsumerFactory<>(configs); customizers.orderedStream().forEach((customizer)->customizer.customize(factory)); return factory; } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(ConsumerFactory<String, String> kafkaManualAckConsumerFactory){ ConcurrentKafkaListenerContainerFactory<String, String> factory=new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaManualAckConsumerFactory); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; }
|
配置了手动ACK后,使用该监听器工厂监听消息时必须指定Acknowledgment
参数,在方法中应该手动进行ack或者nack,否则消息可能重复处理。
1 2 3 4 5 6 7 8 9 10
| @KafkaListener(topicPartitions = { @TopicPartition(topic = "write-test", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "140")}) }, containerFactory = "kafkaManualAckListenerContainerFactory") public void consumeWriteTest(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) { System.out.println("partition:" + consumerRecord.partition() + " offset:" + consumerRecord.offset() + " value:" + consumerRecord.value()); if ((RandomUtils.nextInt(0, 2) & 1) == 1) { ack.acknowledge(); } else ack.nack(Duration.ofMillis(1)); }
|