Kafka是一个分布式的流处理平台,其Java API是对 Kafka Producer、Kafka Consumer 和 Kafka Streams的封装,使得 Java 开发者可以轻松地使用到 Kafka。在 Java 中,分别使用 KafkaProducer 和 KafkaConsumer 类来实现 Kafka 生产者和消费者的功能。下面我们就来一起了解一下Java API 如何调用 Kafka 各种协议。
1. 发送消息:使用 Producer 协议
在 Kafka 中,Producer 是指生成消息客户端。为了向 Kafka 主题发送消息,Java 开发者可以使用 KafkaProducer 类。KafkaProducer 类中提供了多个 send() 方法用于发送不同类型的消息。例如,我们可以使用 send() 方法将 key-value 对的消息发送到指定的主题中:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");
producer.send(record);
producer.close();
在上面的代码中,我们设置了 Kafka 服务地址为 localhost:9092,同时设置了消息序列化和反序列化方式。随后,我们创建了一个 KafkaProducer 对象,并使用 ProducerRecord 类创建了一个消息对象。最后,我们调用 send() 方法发送了一条消息。
2. 接收消息:使用 Consumer 协议
在 Kafka 中,Consumer 是指消费消息的客户端。为了从 Kafka 中接收消息,Java 开发者可以使用 KafkaConsumer 类。在使用KafkaConsumer 类时,需要首先创建一个 Consumer 实例,指定读取数据的主题并指定需要消费的消息分区,代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
在上述代码中,我们同样配置了 Kafka 服务地址、消息序列化及反序列化方式。随后,我们创建了一个 KafkaConsumer 对象,使用 subscribe() 方法指定了需要消费的主题。在无限循环中,我们调用 poll() 方法来从 Kafka 中读取消息,如果读取到消息,我们就可以从 ConsumerRecord 对象中获取消息的偏移量、键和值。
3. 处理消息:使用 Streams 协议
在 Kafka Streams 中,Java 开发者可以通过构建流处理应用程序来处理收到的信息,并将处理结果输出到 Kafka 主题上。使用 Kafka Streams API 需要引入 Kafka Streams 的依赖包,创建一个 Kafka Streams 应用程序是从创建一个 Kafka StreamsBuilder 对象开始的。流处理应用程序通常包括以下步骤:
1)声明流处理拓扑结构及处理逻辑
2)实例化流处理应用程序
3)启动流处理应用程序
下面是一个实现基本流处理应用程序的示例代码:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("my-input-topic");
KStream<String, String> transformed = source.mapValues(value -> value.toUpperCase());
transformed.to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
在上述代码中,我们先定义了 Kafka 的服务地址、应用程序 ID 和序列化方式。使用 StreamsBuilder 类创建了处理拓扑结构,并使用 mapValues() 方法对 value 进行转换处理。最后,我们使用 to() 方法将结果输出到一个新的 Kafka 主题上。使用 KafkaStreams 类创建了流处理应用程序实例,并使用 start() 方法启动该程序。
综上所述,使用 Kafka 的 Java API 可以轻松地在 Java 程序中访问 Kafka,并使用其各种协议进行消息的发送、接收和处理。只需要按照上述步骤编写代码,即可轻松地使用 Kafka 实现高效的大规模数据传输和数据处理。