rabbitmq如何保证消息的可靠传输(详解如何利用RabbitMQ生产一个简单的消息)

最近业务中有有这样一个场景,就是用户在商城下单之后,如果30分钟没有付款,那么就需要将这个订单处理掉,要么直接删除,要么直接标识为失效状态,为什么要这么做?

  • 1、库存,用户在下单之后,会锁定一个库存,如果用户一直不支付,那么就会占用库存,影响别的用户购买,
  • 2、随着业务的发展,用户量的增加,我们的订单数据会越来越多,那么我们要及时的清理无效的订单,提升系统的性能;
曾经的纯洁无瑕

首先说下,我曾经那些纯洁无瑕的想法,第一次看到这种需求的时候,如果要清理失效的订单,那我直接写一个定时任务,5分钟或者10分钟跑一次,删除过期的订单,增加库存。

定时任务确实可以解决上面的问题,但是存在一个现实的问题,那就是数据库压力,甚至影响整个系统的性能,如果你是几百、几千个订单还好,要是十几万、甚至上百万,那么此方法肯定是行不通的。

进阶版本

既然发现了上面的方法不行,那么就重新想办法,有什么办法可以不用查询数据库,就可以知道哪些订单快要过期,再这样的思考下,我用Redis做了一个消息队列,当生成订单的时候,生成一个延迟10分钟消息,10分钟结束的时候,就会从Redis的队列中将这个订单取出来,然后这样我就可以将这个订单删除,增加库存。

Redis做消息队列存在的问题

之前用一个全新的Redis做消息队列,存储的数据也不多,感觉还好,当随着业务增加,Redis存储超过90%的时候,大量的消息没有被消费,就是消息丢失很严重。那么这样肯定是不行的。

删除订单,增加库存这是不能有太多误差的事情,所以Redis消息队列已经不能满足我的需求,那么就需要可靠性高的消息队列,也就是我们这次要介绍的RabbitMQ。

RabbitMQ安装与面板介绍

这里我就不跟大家介绍如何安装RabbitMQ了,网上其实有很多这种教程,所以大家自行搜索吧。重点要跟大家说下,RabbitMQ的面板,我们的消息队列,以及消息都是可以在面板上看到的。我是用的MQ的版本是3.8,各个版本之间的面板多多少少可能有点不太一样。

rabbitmq如何保证消息的可靠传输(详解如何利用RabbitMQ生产一个简单的消息)(1)

第一次接触的话,我们不要想着全部我一次性都看懂,都知道是干嘛的,我觉得没必要,先熟悉最基础的,我在上面圈了两个地方Queue、Admin和Add a new queue 这个三个是最基本的,我们学习必须要用的东西。

  • Queue:这个就是我们声明的消息队列;
  • Admin:用户管理,RabbitMQ默认有一个用户是guest,但是RabbitMQ神奇的就是每个库都必须创建一个用户角色;
  • Add a new queue:这个就是创建一个新的队列,但是我们一般不这么直接创建,而是在代码中创建;

rabbitmq如何保证消息的可靠传输(详解如何利用RabbitMQ生产一个简单的消息)(2)

再来补充下Admin吧,首先要告诉大家一个基本的东西,就是如果想要声明一个队列,那么你必须要有一个库(比喻手法),队列存在于库中,可以想象下mysql,是不是得先有库,再有表。MQ也是这样的。

右边的Virtual Hosts就是创建库,Name就是库名,写的时候前面必须加/

如果细心的朋友可以看到第一张图中两个队列前面就是库名,标识队列存在与xiaoshuo库中。

一个简单的消息队列

rabbitmq如何保证消息的可靠传输(详解如何利用RabbitMQ生产一个简单的消息)(3)

当生产者生产出消息之后,发送到队列中,消费者监听到队列中有消息进行消费,那么我们本篇就先实现一个简单的消息队列。

代码

我们不使用SpringBoot框架,我们就从基本的写,人生原生的API,这样以后才懂得什么是怎么回事。

1、引入需要的pom文件

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version></dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version></dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version></dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version></dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version></dependency>

2、连接RabbitMQ

/** * @description: TODO MQ连接工厂 * @author: bingfeng * @create: 2020-05-07 08:55 */public class MQConnectUtil { public static Connection getConnection() throws IOException, TimeoutException { // 定义连接工厂 Connectionfactory factory = new ConnectionFactory(); // 设置连接地址 factory.setHost("127.0.0.1"); // 设置端口 factory.setPort(5672); // 选择 vhost factory.setVirtualHost("/xiaoshuo"); // 设置用户名 factory.setUsername("bingfeng"); // 密码 factory.setPassword("123"); return factory.newConnection(); }}

3、发送消息

/** * @description: TODO 模拟发送消息 * @author: bingfeng * @create: 2020-05-07 09:01 */public class Producer { /** * 队列名 */ public static final String QUEUE_NAME = "simple_queue_test"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectUtil.getConnection(); // 从连接中获取一个通道 channel channel = connection.createChannel(); // 创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String msg = "你好,冰峰!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("发送消息:" msg); channel.close(); connection.close(); }}

4、消费消息

/** * @description: TODO 消费MQ * @author: bingfeng * @create: 2020-05-07 09:11 */public class Consumer { public static final String QUEUE_NAME = "simple_queue_test"; public static void main(String[] args) throws Exception { // 获取连接 Connection connection = MQConnectUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消费消息:" msg); } }; // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); }}

当我们发送一个消息之后,我们可以直接在面板看到队列中的消息数

rabbitmq如何保证消息的可靠传输(详解如何利用RabbitMQ生产一个简单的消息)(4)

点进来之后,拉到下面,Messages就是我们想要查看的消息数量,就是你想看几条消息填几就行了,填完之后点击下面的Get Messages,我们最近的消息就会显示在下面。

rabbitmq如何保证消息的可靠传输(详解如何利用RabbitMQ生产一个简单的消息)(5)

当我们消费了这个消息之后,队列中就没有这个消息了。

最后的话

今天就写到这,我打算把RabbitMQ从入门到项目中的实战用法全部一级一级分享给大家,一是防止自己以后忘记可以回来翻翻看,二是分享给大家有兴趣的朋友可以一起学习,后面我也会把我之前说的,订单过期删除的业务场景也会写一遍。

熟悉RabbitMQ的朋友,可能看下来解决写的东西很简单,但是毕竟也有很多人实际工作中并没有用过MQ,自己可能也没有了解过,对于没有了解过的朋友来说,我觉得入个门还是挺不错的。

有什么问题,欢迎大家下方solo。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页