一个专注于大数据技术架构与应用分享的技术博客

在Kafka中使用Avro编码消息:Consumer篇

在Kafka中使用Avro编码消息需要在Producer和Consumer两端都进行相关的配置和使用。在前一篇文章中我们介绍了如何使用Avro编码消息进行生产,本篇文章中我们将介绍如何在Consumer端对Avro编码的消息进行读取和反序列化。

  1. 配置依赖项

首先需要添加以下依赖项:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>4.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>
  1. 配置Consumer

在Consumer端需要进行如下配置:

String schemaRegistryUrl = "http://localhost:8081";
String topic = "avro-topic";

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList(topic));
while (true) {
    ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, User> record : records) {
        System.out.println("Received message: " + record.value());
    }
}

在上面的代码中,我们将KafkaAvroDeserializer设置为value的反序列化器,并配置了Avro的注册中心地址。KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG设置为true,表示使用具体类型而不是通用记录类型进行反序列化。

ConsumerRecords中,我们可以通过record.value()方法获取到消息体并进行后续操作。

  1. 编写Avro类型

我们需要在Consumer端定义和Producer端相同的Avro类型,用于反序列化消息到具体类型。在本例中,使用以下User类型:

{
  "namespace": "com.example.kafka",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}
  1. 运行Consumer

在两端均配置完毕后,我们可以先运行Producer发送Avro编码消息,再运行Consumer读取和反序列化消息,代码如下:

String schemaRegistryUrl = "http://localhost:8081";
String topic = "avro-topic";

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList(topic));
while (true) {
    ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, User> record : records) {
        System.out.println("Received message: " + record.value());
    }
}

在运行Consumer后,将会输出我们发送到Kafka中的消息。

总结

在使用Avro编码消息时,我们需要在Producer和Consumer两端都进行相关的配置和使用。在Consumer端我们需要将KafkaAvroDeserializer设置为value的反序列化器,并配置相关的Avro注册中心地址,同时需要定义具体的Avro类型用于反序列化消息。最后在运行Consumer时,我们可以通过record.value()方法获取到消息体并进行后续操作。

赞(0)
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《在Kafka中使用Avro编码消息:Consumer篇》
文章链接:https://macsishu.com/using-avro-encoding-message-in-kafkconsumer
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。