引入spring-kafka依赖
1 2 3 4
   | <dependency>     <groupId>org.springframework.kafka</groupId>     <artifactId>spring-kafka</artifactId> </dependency>
   | 
 
添加配置文件中的配置项
在配置文件中添加对应的生产者、消费者、管理员配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
   |  spring.kafka.producer.bootstrap-servers=192.168.88.128:9092
  spring.kafka.producer.acks=1
 
 
 
 
  spring.kafka.producer.buffer-memory=5242880
  spring.kafka.producer.client-id=kafka-test
 
 
  spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  spring.kafka.producer.retries=3
 
 
  spring.kafka.producer.properties.linger.ms=10
  spring.kafka.consumer.bootstrap-servers=192.168.88.128:9092
  spring.kafka.consumer.properties.group.id=kafka-test
 
  spring.kafka.consumer.enable-auto-commit=true
  spring.kafka.consumer.auto-commit-interval=1000ms
 
 
 
  spring.kafka.consumer.auto-offset-reset=latest
  spring.kafka.consumer.properties.session.timeout.ms=120000
  spring.kafka.consumer.properties.request.timeout.ms=180000
  spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  spring.kafka.listener.missing-topics-fatal=false
  spring.kafka.admin.properties.bootstrap.servers=192.168.88.128:9092
 
  | 
 
配置生产者和消费者
在引入依赖后,KafkaAutoConfiguration会自动创建对应的生产者和消费者,但是默认泛型类型为<Object,Object>
,如果想自定义某种类型的key或value,可以手动创建
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | @Configuration public class KafkaProducerConfig {     @Bean     public DefaultKafkaProducerFactory<String, String> kafkaProducerFactory(KafkaProperties kafkaProperties,                                                                             ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {         DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(                 kafkaProperties.buildProducerProperties());         String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();         if (transactionIdPrefix != null) {             factory.setTransactionIdPrefix(transactionIdPrefix);         }         customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));         return factory;     }
      @Bean     public LoggingProducerListener<String, String> kafkaProducerListener() {         return new LoggingProducerListener<>();     }
      @Bean     public KafkaTemplate<String, String> kafkaTemplate(KafkaProperties kafkaProperties,                                                        ProducerFactory<String, String> kafkaProducerFactory,                                                        ProducerListener<String, String> kafkaProducerListener,                                                        ObjectProvider<RecordMessageConverter> messageConverter) {         PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();         KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);         messageConverter.ifUnique(kafkaTemplate::setMessageConverter);         map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);         map.from(kafkaProperties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);         map.from(kafkaProperties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);         return kafkaTemplate;     } }
   | 
 
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | @Configuration public class KafkaConsumerConfig {     @Bean     public DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory(KafkaProperties kafkaProperties,                                                                             ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {         DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(                 kafkaProperties.buildConsumerProperties());         customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));         return factory;     }
      @Bean     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();         factory.setConsumerFactory(consumerFactory);         return factory;     } }
   | 
 
主题手动创建:
1 2
   | NewTopic newTopic=new NewTopic(topic,partition,replicas); kafkaAdmin.createOrModifyTopics(newTopic);
   | 
 
发送消息
通过使用KafkaTemplate来发送消息,发送消息时可以指定partition、key、timestamp等信息:
1
   | kafkaTemplate.send(topic,0,System.currentTimeMillis(),"key","data");
   | 
 
发送事务消息
kafka支持事务发送消息,在一次事务中,发送的消息要么都成功,要么都失败,如果启动事务消息的话,需要将spring.kafka.producer.acks
赋值为all,在事务中发送消息出现异常时,所有消息都不会进行发送:
1 2 3 4 5 6 7 8 9 10 11
   | public void publish(String topic,Integer size){     if(size<=0){         size=10;     }     for(int i=0;i<size; i++){         kafkaTemplate.send(topic,"key",RandomStringUtils.randomAlphanumeric(10,20));         if(i==7){         throw new RuntimeException("error");         }     } }
  | 
 
在上面的例子中,如果i等于7,将会抛出异常,所有消息都不会发送。
消息订阅
通过KafkaListener注解进行订阅
@KafkaListener注解用于将 bean 方法指定为监听器容器的监听器。 该 bean 包装在 MessagingMessageListenerAdapter 中,该适配器配置有各种功能,例如用于转换数据的转换器以匹配方法参数。
而且可以使用SpEL或者属性占位符 配置注释上的大多数属性
@KafkaListener 注解
id(): 定义监听器容器的唯一标识符,如果未指定,则使用自动生成的 id。
 
containerFactory(): 指定用于创建监听器容器的KafkaListenerContainerFactory 的 bean 名称。
 
topics(): 监听的主题列表,可以是主题名称、属性占位符键或表达式。
 
topicPattern(): 监听的主题模式,用于匹配多个主题。
 
topicPartitions(): 手动指定要监听的主题分区。
 
containerGroup(): 将监听器容器添加到具有此值作为名称的 bean 中,以支持在集合中启动/停止一组容器。
 
errorHandler(): 当监听器方法抛出异常时,指定用于处理异常的 KafkaListenerErrorHandler 的 bean 名称。
 
groupId(): 仅为此监听器覆盖的消费者工厂配置中的 group.id 属性。
 
idIsGroup(): 如果为 false,则使用消费者工厂中的 group.id 属性。
 
clientIdPrefix(): 覆盖消费者工厂配置中的 client.id 属性。
 
beanRef(): 用于在注解内引用当前 bean 的伪 bean 名称。
 
concurrency(): 覆盖容器工厂的 concurrency 设置。
 
autoStartup(): 覆盖容器工厂的 autoStartup 设置。
 
properties(): 覆盖消费者工厂配置的其他 Kafka 消费者属性。
 
splitIterables(): 当返回类型为 Iterable 时,设置为 false 以将结果作为单个回复记录返回。
 
contentTypeConverter(): 设置用于将消息内容转换为所需类型的 SmartMessageConverter 的 bean 名称。
 
batch(): 覆盖容器工厂的 batchListener 设置。
 
filter(): 覆盖容器工厂的 recordFilterStrategy 设置。
 
info(): 将静态信息添加为带有键 KafkaHeaders.LISTENER_INFO 的标头,用于在拦截器、过滤器等中引用。
 
 
@TopicPartition 注解
 
@PartitionOffset 注解
 
1 2 3 4
   | @KafkaListener(topics = "write-test") public void  consume1(String data){     System.out.println(data); }
   | 
 
手动分配分区
可以在注解中手动指定所监控的主题的分区,分区的初始offset
1 2 3 4 5 6 7 8
   | @KafkaListener(id = "thing2", topicPartitions =         {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),                 @TopicPartition(topic = "topic2", partitions = "0",                         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))         }) public void listen(ConsumerRecord<?, ?> record) {      }
   | 
 
也可以为主题所分配的所有分区制定初始offset
1 2 3 4 5 6 7
   | @KafkaListener(id = "thing3", topicPartitions =         { @TopicPartition(topic = "topic1", partitions = { "0", "1" },              partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))         }) public void listen(ConsumerRecord<?, ?> record) {     ... }
   | 
 
手动ACK
修改配置:将消费者的enable.auto.commit设置为false,监听器工厂的ACK_MODE设置为MANUAL_IMMEDIATE,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | @Bean public DefaultKafkaConsumerFactory<String, String> kafkaManualAckConsumerFactory(KafkaProperties kafkaProperties,                                                                                 ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers){     Map<String, Object> configs=kafkaProperties.buildConsumerProperties();     configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);     configs.put(ConsumerConfig.GROUP_ID_CONFIG,"kafka-ack-test");     configs.put(ConsumerConfig.CLIENT_ID_CONFIG,"kafka-ack-test1");     DefaultKafkaConsumerFactory<String, String> factory=new DefaultKafkaConsumerFactory<>(configs);     customizers.orderedStream().forEach((customizer)->customizer.customize(factory));     return factory; } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(ConsumerFactory<String, String> kafkaManualAckConsumerFactory){     ConcurrentKafkaListenerContainerFactory<String, String> factory=new ConcurrentKafkaListenerContainerFactory<>();     factory.setConsumerFactory(kafkaManualAckConsumerFactory);     factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);     return factory; }
   | 
 
配置了手动ACK后,使用该监听器工厂监听消息时必须指定Acknowledgment参数,在方法中应该手动进行ack或者nack,否则消息可能重复处理。
1 2 3 4 5 6 7 8 9 10
   | @KafkaListener(topicPartitions = {          @TopicPartition(topic = "write-test", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "140")})  }, containerFactory = "kafkaManualAckListenerContainerFactory")  public void consumeWriteTest(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {      System.out.println("partition:" + consumerRecord.partition() + " offset:" + consumerRecord.offset() + " value:" + consumerRecord.value());      if ((RandomUtils.nextInt(0, 2) & 1) == 1) {          ack.acknowledge();      } else          ack.nack(Duration.ofMillis(1));  }
  |