全部开发者教程

RabbitMQ 入门教程

RabbitMQ 简介
RabbitMQ 简介

消息容器介绍

1. 前言

Hello,大家好。本小节会为同学们介绍 RabbitMQ 在 Spring 生态中的消息容器,消息容器是 RabbitMQ 在 Spring 生态中的第三个核心元素,消息容器指的不是一种特定的容纳消息的容器,而是一系列提供容纳消息、监听消息等其他对消息指标进行监控的工具,接下来就让我们来看看到底什么是消息容器吧。

本节主要内容:

  • 消息容器基础概念概述;

  • 常用消息容器基础配置概述。

2. 消息容器基础概念概述

基础概念:

消息容器,即容纳消息的容器。通过对上述部分小节的学习,我们已经知道,在 原生 RabbitMQ 知识体系中,并没有单独地一个消息容器的概念,只有消息队列的概念。

但是,在 Spring-AMQP 中,消息容器的概念和原生 RabbitMQ 知识体系中的消息队列的概念完全不一样。Spring-AMQP 中的消息容器指的是:可以提供对消息队列、消息、消费者、消息签收模式进行控制,以及消息的全方位监听的一系列工具的统称。

我们知道,在 RabbitMQ 中,充当核心角色的就是我们应用程序中的数据,也就是消息,那么,对消息以及消息队列进行全方位监控的工具,在 Spring-AMQP 中就被称为消息容器。

消息容器的主要作用就是对经过 RabbitTemplate 消息模板发送到 RabbitMQ Server 中的消息进行监听,当然,要确保 RabbitTemplate 和消息容器所使用的是同一个 RabbitAdmin 构造的连接,这样才能对消息进行监控。

在介绍完消息容器的基础概念之后,下面让我们来看一下如何对消息容器进行简单的配置吧。

3. 常用消息容器基础配置概述

还是像上节小节一样,要想在 Spring 中使用消息容器,需要将 Spring-AMQP 和 AMQP-Stater 的依赖先引入进来,方便起见,同学们可以直接拷贝下放代码:

3.1 引入消息容器

以 Maven 引入方式为例,引入代码如下所示:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.6.5</version>
</dependency>

在将这两个依赖进行引入之后,我们就可以对消息容器进行配置了。

3.2 常用消息容器基础配置

在 Spring-AMQP 中,消息容器共有这五种:AbstractMessageListenerContainer、DirectMessageListenerContainer、DirectReplyToMessageListenerContainer、SimpleMessageListenerContainer,以及 MessageListenerContainer 接口。

在这五种消息容器中,常用的消息容器只有一种,就是 SimpleMessageListenerContainer ,由于 SimpleMessageListenerContainer 消息容器配置起来简单方便,所支持的配置的属性比较全面,所以该消息容器是所有消息容器中使用频率最高的,也是实际工作中使用最多的一种消息容器。

本节以 SimpleMessageListenerContainer 消息容器为例,来介绍一下 SimpleMessageListenerContainer 消息容器该如何使用吧。

Tips: SimpleMessageListenerContainer 消息容器是众多消息容器中最基本的消息容器,我们把 SimpleMessageListenerContainer 消息容器的基本使用了解之后,其他的消息容器就无师自通了,只不过其他的消息容器所提供的配置属性或多或少罢了。

初始化 SimpleMessageListenerContainer 消息容器

像 RabbitAdmin 和 RabbitTemplate 一样,要想使用消息容器,需要先对消息容器进行初始化,这个初始化过程非常简单,初始化 SimpleMessageListenerContainer 消息容器的代码如下所示:

代码实现:

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  return simpleMessageListenerContainer;
}

代码解释:

第 1 行,我们使用 Spring 的 Bean 注解将我们声明的 simpleMessageListenerContainer 方法注入到 Spring 容器中,这样 Spring 容器就可以监听到我们注入的配置。

第 2 行,我们使用 Spring-AMQP 中的 SimpleMessageListenerContainer 类,来声明了一个名为 simpleMessageListenerContainer 的方法,用来对 simpleMessageListenerContainer 消息容器进行初始化。

第 3 行,我们实例化了一个 simpleMessageListenerContainer 实例,该实例是 Spring-AMQP 中对 simpleMessageListenerContainer 消息容器进行初始化的实例,要想使用 SimpleMessageListenerContainer ,就必须要初始化该实例。

第 4 行,我们将初始化好的 simpleMessageListenerContainer 实例进行返回。

SimpleMessageListenerContainer 基本使用

配置 SimpleMessageListenerContainer 消息容器,和之前的配置 RabbitAdmin 以及 RabbitTemplate 不同,我们只需要直接在上述初始化方法中去填充我们需要的配置属性即可,这里以基本常用属性为例,代码如下所示:

simpleMessageListenerContainer.setQueues(new Queue("test_001"), new Queue("test_002"), new Queue("test_003"));
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setMaxConcurrentConsumers(3);
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
  @Override
  public String createConsumerTag(String queue) {
  return queue + "_" + "created consumer tag";
  }
});
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
  String msg = new String(message.getBody());
  // do someting...
  }
  simpleMessageListenerContainer.setAutoStartup(true)
});

代码解释:

第 1-2 行,我们使用 simpleMessageListenerContainer 的 setQueues 方法,来设置我们需要进行监听的队列,这里新建了三个队列,名称分别为:test_001 ,test_002 ,test_003。

第 3-4 行,我们分别使用了 simpleMessageListenerContainer 的 setConcurrentConsumers 方法和 setMaxConcurrentConsumers 方法,来分别设置当前用于消费消息的消费者个数,以及最大允许用于消费消息的消费者个数,这里分别被设置成了 1 和 3 ,表示:当前用于消费消息的消费者个数为 1 个,最大允许进行消费消息的消费者个数为 3 个。

第 5 行,我们使用 simpleMessageListenerContainer 的 setDefaultRequeueRejected 方法,来设置是否开启重回队列, 关于什么是重回队列,在前面小节中都有详细介绍,这里不再赘述。当 setDefaultRequeueRejected 方法的值为 true 时,表示开启重回队列,当为 false 时,表示关闭重回队列,这里表示不适使用重回队列,即该方法的值为 false。

第 6 行,我们使用 simpleMessageListenerContainer 的 setAcknowledgeMode 方法,来设置当前消息的接收模式, 即对应原生 RabbitMQ 中的消息签收模式,是自动签收还是手动签收,这里,AcknowledgeMode.AUTO 表示自动签收消息。

第 7-10 行,我们使用 simpleMessageListenerContainer 的 setConsumerTagStrategy 方法,来为消费者设置一个标签,设置标签需要通过 new ConsumerTagStrategy 匿名内部类的方式来实现,通过重写其中的 createConsumerTag 方法来设置消费者的 Tag 标签。

第 13-17 行,我们使用 simpleMessageListenerContainer 的 setMessageListener 方法,来添加一种监听器, 添加监听器的方式也是通过匿名内部类实现;new ChannelAwareMessageListener 监听器监听的是,当消息经过 channel 时的情况,实现该监听器需要重写其中的 onMessage 方法,我们可以将业务逻辑填充在 onMessage 方法中。

setMessageListener 方法可以添加任意一种 Spring-AMQP 中支持的监听器,不单单只有 ChannelAwareMessageListener 监听器这一种,其他的监听器就交给同学们自行探索吧。

第 19 行,我们使用 simpleMessageListenerContainer 的 setAutoStartup 方法,来设置 simpleMessageListenerContainer 消息容器的启动模式,这里被设置为了 true ,表示,当 Spring 容器启动时,simpleMessageListenerContainer 消息容器也随之启动。

Tips: 1. setQueues 方法中的参数只有一个,就是消息队列,其参数类型为 Queue… ,该参数理论上可以支持无限个队列被添加进去,从而对这些队列进行监听;
2. AcknowledgeMode 的消息签收类型,不只有 AUTO 这一种,还有 NONE 和 MANUAL ,分别表示不设置和手动监听。
3. simpleMessageListenerContainer 消息容器是支持热加载的一款消息容器,即当我们的应用程序处于运行状态时,我们手动改变了某一配置属性的值,这种改动是立即生效的,不需要我们重启我们的应用程序,这点同学们注意。

4. 小结

本小节详细为同学们介绍了 Spring 生态中的消息容器这一概念,并通过结合常用的 SimpleMessageListenerContainer 消息容器,来介绍了消息容器的基本配置方法和常用配置属性,希望同学们可以对 Spring-AMQP 中的常用消息容器 SimpleMessageListenerContainer 有一个基础的认知,这样,我们以后在使用其他消息容器时,才会做到融会贯通。