RabbitMQ

1.基本概述

RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)并提供可靠的消息传递机制。通过RabbitMQ,应用程序可以进行异步通信,提高系统的可靠性和扩展性。

官方网站:RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ

优势有:

  • 解除耦合,拓展性强
  • 无需等待,性能好
  • 故障隔离
  • 缓存消息,流量削峰填谷

安装

docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management

访问http://localhost:15672/

账号密码默认为guest

 

 

rabbitMq流程:

名词解释:

  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

还有一个概念叫virtual-host,即虚拟主机,起到数据隔离的作用!比如不同项目使用不同虚拟主机,隔离数据。

 

我们先创建个用户

点击Admin

这里可以选择权限

选择admin(所以权限)

添加

 

成功如下,只不过还没绑定virtual-host

退出再重新使用刚刚创建的新用户

点击virtual-host

添加

如图:

再新建个队列dreams.queue

完成

 

2.快速入门

Spring AMQP 是 Spring 框架对 AMQP 协议的支持。提供了在 Spring 应用中使用消息队列的便捷方式,借助 Spring AMQP可以更容易地集成 RabbitMQ 等消息代理系统到应用中。

官网:Spring AMQP

新建个微服务项目,在项目下新建两个模块consumer和publisher。

使用rabbitmq那消费端和发送端都要导入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

使用rabbitmq要加入application.yml配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /dreams
    username: dreams
    password: 123456

这里先简单演示直接发送到队列接收

首先在发送方发送消息

package com.dreams;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class DreamsQueue {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void QueueDreams() {
        String queueName = "dreams.queue";
        String message = "hello,Dreams!";
        rabbitTemplate.convertAndSend(queueName,message);
    }

}

运行

点击Get message

成功发送

接收端代码:

package com.dreams.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class QueueListener {

    @RabbitListener(queues = "dreams.queue")
    public void ListenerQueue(String message){
        System.out.println("消费端接收到消息: " + message);
    }
}

运行

接收后就不存在消息了

 

默认情况下RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但是消费者处理信息速度不一致,可能出现消息堆积,而却存在空闲消费者。

改造发送方代码,改为for循环:

@Test
public void QueueDreams() {
    String queueName = "dreams.queue";
    for (int i = 0; i < 20; i++) {
        String message = "hello,Dreams!This message " + i;
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

消费方

@Component
public class QueueListener {

    @RabbitListener(queues = "dreams.queue")
    public void ListenerQueue1(String message) throws InterruptedException {
        System.out.println("消费端1接收到消息: " + message);
        Thread.sleep(200);
    }

    @RabbitListener(queues = "dreams.queue")
    public void ListenerQueue2(String message) throws InterruptedException {
        System.err.println("消费端2接收到消息: " + message);
        Thread.sleep(2000);
    }
}

消息的堆积如图:

解决方法就是配置每次只能获取一条消息

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

这样就不会有消息的堆积。

 

3.Fanout交换机

为了使多个消费者都能接收到同一条消息,就有了交换机。

Fanout 交换机是 RabbitMQ 中的一种常见的交换机类型,它将消息发送到绑定到该交换机上的所有队列中。Fanout 交换机并不关心消息的路由键,因此无论发送的消息的路由键是什么,它都会将消息广播到所有与之绑定的队列中。

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

比如订单服务启动了多个实例,每个实例处理一条消息。而在其他队列依旧存在同一条消息,由其他服务处理。

新建两个交换机

添加一个交换机

 

选择fanout类型

 

要绑定交换机和队列

绑定

如图:

 

代码使用很简单

发送端,只要指定交换机名字即可,convertAndSend第二个参数是路由,这里先不指定。

@Test
public void QueueDreams() {
    String exchangeName = "dreams.fanout";
    String message = "hello,Dreams!";
    rabbitTemplate.convertAndSend(exchangeName,null, message);
}

消费端没什么变化,同样只是指定队列名字

@RabbitListener(queues = "fanout.queue1")
public void ListenerQueue1(String message) throws InterruptedException {
    System.out.println("消费端1接收到消息: " + message);
}

@RabbitListener(queues = "fanout.queue2")
public void ListenerQueue2(String message) throws InterruptedException {
    System.out.println("消费端2接收到消息: " + message);
}

可以看到消费端接收到了同一条消息。

 

 

4.Direct交换机

Direct 交换机是 RabbitMQ 中最简单的一种交换机类型之一,它基于消息的路由键(Routing Key)将消息发送到与之匹配的队列中。具体来说,当生产者发送一条带有特定路由键的消息时,Direct 交换机会将该消息路由到绑定了相同路由键的队列中。

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

添加两个队列

添加交换机时,指定type为direct

在绑定时添加Routing Key,一个red,一个blue。

消费端没什么变化

@RabbitListener(queues = "direct.queue1")
public void ListenerQueue1(String message) throws InterruptedException {
    System.out.println("消费端1接收到消息: " + message);
}

@RabbitListener(queues = "direct.queue2")
public void ListenerQueue2(String message) throws InterruptedException {
    System.out.println("消费端2接收到消息: " + message);
}

发送方指定Routing Key

@Test
public void QueueDreams() {
    String exchangeName = "dreams.direct";
    String message1 = "hello,Dreams!this id red";
    String message2 = "hello,Dreams!this id blue";
    rabbitTemplate.convertAndSend(exchangeName,"red", message1);
    rabbitTemplate.convertAndSend(exchangeName,"blue", message2);
}

可以看到Exchange只将消息路由到BindingKey与消息RoutingKey一致的队列。

 

5.Topic交换机

Topic 交换机是 RabbitMQ 中一种灵活且强大的交换机类型,它可以根据消息的路由键和通配符模式将消息路由到一个或多个队列中。Topic 交换机支持通配符匹配,使得消息的路由更加灵活和动态。

Topic交换机与Direct交换机类似,区别在于routingKey可以是多个单词的列表,并且以.分割,可使用通配符。

  • #:代指0个或多个单词
  • *:代指一个单词

添加两个队列

添加交换机

指定Routing Key

绑定如下:

消费端:

@RabbitListener(queues = "topic.queue1")
public void ListenerQueue1(String message) throws InterruptedException {
    System.out.println("消费端1接收到消息: " + message);
}

@RabbitListener(queues = "topic.queue2")
public void ListenerQueue2(String message) throws InterruptedException {
    System.out.println("消费端2接收到消息: " + message);
}

发送端

@Test
public void QueueDreams() {
    String exchangeName = "dreams.topic";
    String message1 = "hello,Dreams!this id user.order.add";
    String message2 = "hello,Dreams!this id user.add";
    rabbitTemplate.convertAndSend(exchangeName,"user.order.add", message1);
    rabbitTemplate.convertAndSend(exchangeName,"user.add", message2);
}

运行

 

6.声明方式

方式一

package com.dreams.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        //方式1
        //return ExchangeBuilder.fanoutExchange("dreams.code.fanout").build();

        //方式2
        return new FanoutExchange("dreams.code.fanout");
    }

    // 声明第1个队列
    @Bean
    public Queue fanoutQueue1() {
        //方式1
        //return QueueBuilder.durable("fanout.code.queue").build();

        //方式2
        return new Queue("fanout.code.queue");
    }

    //绑定队列1和交换机方式1
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /*
    //绑定队列1和交换机方式2
    @Bean public Binding bindingQueue2() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    **/

}

成功

成功

成功

其余交换机同理

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    // 声明DirectExchange交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("dreams.code.direct");
    }

    // 声明第1个队列
    @Bean
    public Queue directQueue1() {
        return new Queue("direct.code.queue");
    }

    //绑定队列1和交换机方式1
    @Bean
    public Binding bindingQueue1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
}

 

方式二

package com.dreams.config;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectAuto {
   // 声明DirectExchange交换机
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.code.queue",declare = "true"),
            exchange = @Exchange(name = "dreams.code.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void ListenerQueue1(String message) throws InterruptedException {
        System.out.println("消费端1接收到消息: " + message);
    }
}

这种方式更简单。

 

7.消息转换器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化。

方式端和消费端都要添加

添加依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

加入Bean,注意不要导错包。

package com.dreams.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

 

8.可靠性

发送方可靠性

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

在发送方配置:

spring:
  rabbitmq:
    connection-timeout: 1s #设置MQ连续超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后初始等待时间
        multiplier: 1 # 失败后下次等待时长倍数,下次等待时长等于 initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

 

注意:

SpringAMQP提供的重试机制是阻塞式的重试,在这之下的代码会一直等待而得不到执行,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,可以考虑使用异步线程来执行发送消息的代码。

 

RabbitMQ了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功。
  • 其它情况都会返回NACK,告知投递失败。

同样在发送方配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return 机制

publisher-confirm-type有三种模式可选:

  • none: 关闭confirm机制
  • simple:同步阻塞等待MQ的回执消息
  • correlated: MQ异步回调方式返回回执消息

 

代码接收:

package com.dreams.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback(((message, replyCode, replyTest, exchange, routingKey) -> {
            log.info("消息发送失败,应答码:{},原因{},交换机{},路由键{},消息{}",replyCode,replyTest,exchange,routingKey,message.toString());
        }));
    }
}

以及单个确认

@Test
public void QueueDreams() {
    String exchangeName = "dreams.fanout.non";
    String message = "hello,Dreams!";

    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

    correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            //spring内部错误,基本不会发生
            log.error("message ack fail",ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // Futurn接收到回执的处理逻辑,参数的result就是回执内容
            if (result.isAck()){
                log.debug("发送消息成功,收到 ack !");
            }else {
                log.error("发送消息失败,收到nack , reason :{}",result.getReason());
                
            }
        }
    });
    rabbitTemplate.convertAndSend(exchangeName,null, message,correlationData);
}

 

消费方可靠性

消费者确认机制为了确认消费者是否成功处理消息, RabbitMQ提供了消费者确认机制(Consumer Acknowledgement) 。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。

回执有三种可选值:

  • ack:成功处理消息, RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,

即参数acknowledge-mode有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式(推荐)。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:如果是业务异常,会自动返回nack,如果是消息处理或校验异常,自动返回reject。

配置

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: none

失败重试机制:当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:

配置如下:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: auto
        retry:
          enabled: true # 开启超时重试机制
          initial-interval: 1000ms # 失败后初始等待时间
          multiplier: 1 # 失败后下次等待时长倍数,下次等待时长等于 initial-interval * multiplier
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态:false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

使用方式就是添加一个Bean,第二个参数是errorExchange,第三个参数是errorRoutingKey

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}

不过这要求开启重试机制,所有在该配置类上添加该值,@ConditionalOnProperty 是 Spring Boot 中常用的一个条件注解,用于根据配置文件中的属性值来控制 Bean 的加载行为。当指定的属性存在并且取值符合条件时,被注解的 Bean 才会被创建;

@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true")

开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

 

MQ可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

持久化,java默认创建为持久化,

但是客户端需要手动配置

队列

交换机

消息

还有另一种效率更高的另一种惰性队列。

Lazy Queue:

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
  • 在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

 

要设置一个队列为有性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:

@RabbitListener(queuesToDeclare = @Queue(name = "fanout.code.queue",
    declare = "true",
    arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void ListenerQueue(String message) throws InterruptedException {
    System.out.println("消费端1接收到消息: " + message);
}

@Bean
public Queue fanoutQueue1() {
    return QueueBuilder.durable("fanout.code.queue")
        .lazy()
        .build();
}

 

 

参考:

RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ

黑马程序员相关课程

尚硅谷相关课程

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇