8.3.3 编写 Kafka 监听器
除了 send() 和 sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。
对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}
handle() 方法由 @KafkaListener 注解,表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的,只为 handle() 方法提供了一个 Order(payload)参数 。但是,如果需要来自消息的其他元数据,它也可以接受一个 ConsumerRecord 或 Message 对象。
例如,handle() 的以下实现接受一个 ConsumerRecord,这样就可以记录消息的分区和时间戳:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
类似地,可以使用 Message 而不是 ConsumerRecord,并达到同样的效果:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}
值得注意的是,消息有效负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获得。这意味着可以通过这些对象请求 Order,而不是直接将其作为 handle() 的参数。