在Kafka中使用Avro编码消息需要在Producer和Consumer两端都进行相关的配置和使用。在前一篇文章中我们介绍了如何使用Avro编码消息进行生产,本篇文章中我们将介绍如何在Consumer端对Avro编码的消息进行读取和反序列化。
- 配置依赖项
首先需要添加以下依赖项:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
- 配置Consumer
在Consumer端需要进行如下配置:
String schemaRegistryUrl = "http://localhost:8081";
String topic = "avro-topic";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
System.out.println("Received message: " + record.value());
}
}
在上面的代码中,我们将KafkaAvroDeserializer
设置为value的反序列化器,并配置了Avro的注册中心地址。KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
设置为true,表示使用具体类型而不是通用记录类型进行反序列化。
在ConsumerRecords
中,我们可以通过record.value()
方法获取到消息体并进行后续操作。
- 编写Avro类型
我们需要在Consumer端定义和Producer端相同的Avro类型,用于反序列化消息到具体类型。在本例中,使用以下User类型:
{
"namespace": "com.example.kafka",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}
- 运行Consumer
在两端均配置完毕后,我们可以先运行Producer发送Avro编码消息,再运行Consumer读取和反序列化消息,代码如下:
String schemaRegistryUrl = "http://localhost:8081";
String topic = "avro-topic";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
System.out.println("Received message: " + record.value());
}
}
在运行Consumer后,将会输出我们发送到Kafka中的消息。
总结
在使用Avro编码消息时,我们需要在Producer和Consumer两端都进行相关的配置和使用。在Consumer端我们需要将KafkaAvroDeserializer
设置为value的反序列化器,并配置相关的Avro注册中心地址,同时需要定义具体的Avro类型用于反序列化消息。最后在运行Consumer时,我们可以通过record.value()
方法获取到消息体并进行后续操作。