Kafka 消费者 API 是 Kafka 提供给开发者的一组接口和工具,用于构建应用程序以消费 Kafka 主题中的消息。消费者 API 具有以下主要作用:
-
消息订阅: 消费者 API 允许应用程序订阅一个或多个 Kafka 主题。通过指定主题名称,消费者可以从相应的主题中拉取消息。
consumer.subscribe(Arrays.asList("topic1", "topic2"));
-
消息拉取: 消费者 API 允许消费者通过轮询(polling)的方式从 Kafka 服务器拉取消息。消费者可以根据自己的需求调整拉取的频率。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-
消息处理: 消费者 API 允许开发者实现自定义的消息处理逻辑。拉取到的消息将被存储在
ConsumerRecords
对象中,开发者可以遍历这些消息并执行相应的业务逻辑。for (ConsumerRecord<String, String> record : records) { // 处理消息逻辑 System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value()); }
-
偏移提交: 消费者 API 提供了手动提交和自动提交偏移的方式。偏移(offset)表示消费者在分区中的读取位置。通过提交偏移,消费者可以记录自己读取到的消息位置,以便在下次拉取消息时从正确的位置开始。
-
手动提交偏移:
consumer.commitSync();
-
自动提交偏移:
properties.put("enable.auto.commit", "true");
-
手动提交偏移:
-
多线程处理: 消费者 API 允许创建多个消费者实例,每个实例运行在独立的线程中,以提高消息处理的并发性。这对于处理大量消息或实现并行处理逻辑很有用。
// 创建多个消费者实例 Consumer<String, String> consumer1 = new KafkaConsumer<>(properties); Consumer<String, String> consumer2 = new KafkaConsumer<>(properties); // 在独立的线程中运行每个消费者实例 new Thread(() -> { while (true) { ConsumerRecords<String, String> records = consumer1.poll(Duration.ofMillis(100)); // 处理消息逻辑 } }).start(); new Thread(() -> { while (true) { ConsumerRecords<String, String> records = consumer2.poll(Duration.ofMillis(100)); // 处理消息逻辑 } }).start();
总体而言,Kafka 消费者 API 提供了灵活且功能丰富的工具,使得开发者能够轻松地构建消费 Kafka 主题的应用程序,处理实时流数据。
Was this helpful?
0 / 0