RabbitMQ

2024-07-16 15:00

RabbitMQ简介

​ RabbitMQ 是部署最广泛的开源消息代理,他是一种消息队列(Message Queue),是一种跨进程的通信机制,用于上下游传递消息。

​ MQ作为消息中间件,最主要的作用系统之间的信息传递进行“解耦”,MQ是数据可靠性的重要保障。

​ RabbitMQ支持几乎所有的操作系统与编程语言。Rabbit提供了高并发、高可用的成熟方案,支持多种消息协议,易于部署与使用。

官网:https://www.rabbitmq.com/

RabbitMQ与其他MQ的对比


RabbitMQ

ACTIVEMQ

Kafka

RocketMQ

社区活跃度

非常活跃

非常活跃

活跃

不活跃

持久化

支持

支持

支持

支持

并发吞吐量

一般

极高

极高

数据可靠性

极高

一般

生态完整性

很好

很好

很好

一般

用户总量

多->一般

较多

应用场景

分布式、高可靠交易系统

传统业务系统

日志处理及大数据应用

互联网高并发、高可用应用

​ RabbitMQ使用Erlang开发 ,Erlang是一种通用的面向并发的编程语言, Erlang是一个结构化 ,动态类型编程语言,内建并行计算支持。使用Erlang来编写

分布式应用要简单的多 ,Erlang运行时环境是一个虚拟机 ,有点像Java虚拟机 ,这样代码一经编译 ,同样可以随处运行。

应用场景

​ 异构系统的数据传递

​ 高并发程序的流量控制

​ 基于P2P,P2PPP的程序

​ 分布式系统的事务一致性TCC

​ 高可靠性的交易系统

​ ......

安装配置

注意RabbitMQ版本和Erlang版本是对应关系,相应RabbitMQ版本对应的Erlangan安装版本OTP

image.png

详见:https://www.rabbitmq.com/docs/which-erlang

windows

#双击安装

otp_win64-xxx.exe

rabbitmq-server-xxx.exe

image.png

image.png

激活管理后台命令:

rabbitmq-plugins.bat enable rabbitmq_management

image.png

基本概念

**Producer:**生产者,消息的提供者

**Consumer:**消费者,消息的使用者

**Message:**消息,程序间的通信的数据

**Queue:**队列,消息存放的容器,消息先进先出

**Vhost:**虚拟主机,相当于MQ的“数据库”,用于存储队列

创建Vhost

image.png

image.png

测试连接示例代码

public class RabbitMQProducer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            //factory.setVirtualHost("/test");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    }
}

image.png

image.png

image.png

发送消息示例代码

生产者

public class RabbitMQProducer {

    /**
     * 程序入口主方法。
     * 主要功能是读取rabbitmq配置文件,建立连接,并发送消息到指定队列。
     * @param args 命令行参数
     * @throws IOException 如果读取配置文件或通信过程中发生IO错误
     * @throws TimeoutException 如果建立连接超时
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 初始化ConnectionFactory,用于创建RabbitMQ连接
        //connectionFactory用于创建连接池
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 获取类加载器和配置文件路径
        //获取配置文件
        ClassLoader classLoader = RabbitMQProducer.class.getClassLoader();
        String resourceName = "config/rabbitmq.properties";
        // 创建Properties对象用于读取配置文件
        Properties properties = new Properties();
        // 尝试从配置文件中加载属性
        try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
            // 如果输入流为空,则抛出异常,表示配置文件未找到
            if (inputStream == null) {
                throw new IllegalStateException("Resource not found: " + resourceName);
            }
            // 加载配置文件到Properties对象中
            properties.load(inputStream);
            // 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
            // Print the properties
            for (String key : properties.stringPropertyNames()) {
                System.out.println(key + " = " + properties.getProperty(key));
            }
        } catch (IOException ex) {
            // 打印IO异常堆栈跟踪,用于错误日志记录
            ex.printStackTrace();
        }
        // 从Properties中读取RabbitMQ连接和队列配置
        // 连接工厂的URL和虚拟机地址
        String url = properties.getProperty("url");
        String virtualHost = properties.getProperty("vhost");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        String queue= properties.getProperty("queue");
        // 配置ConnectionFactory,设置连接参数
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(user);
        connectionFactory.setPassword(password);
        // 声明变量用于存储连接和通道对象
        Connection connection = null;
        Channel channel = null;
        // 尝试创建连接和通道,声明队列,并发送消息
        // 创建连接
        try {
            // 创建连接
            connection = connectionFactory.newConnection();
            // 创建通道
            channel=connection.createChannel();
            // 声明队列
            /**
             * 声明一个队列。
             *
             * 在RabbitMQ中,队列是消息的消费者。此方法用于在服务器上声明一个队列。队列声明不会自动创建队列,
             * 如果队列已存在,服务器将返回确认。队列的属性可以通过参数进行配置。
             * @param queue 队列名称。如果为null,则服务器将生成一个随机名称。
             * @param durable 是否将队列声明为持久化。如果为true,则队列在服务器重启后仍然存在。
             * @param exclusive 是否将队列声明为独占队列。如果为true,则该队列只能被当前连接的消费者使用。
             * @param autoDelete 是否自动删除队列。如果为true,并且当最后一个消费者取消订阅后,队列将自动删除。
             * @param arguments 用于提供额外的队列属性。例如,可以使用此参数来设置队列的生存时间(TTL)。
             * @return DeclareOk对象,包含服务器返回的队列名称和已声明的队列数量。
             * @throws IOException 如果与RabbitMQ服务器的通信发生错误。
             */
            channel.queueDeclare(queue, true, false, false, null);
            // 循环发送多条消息
            for (int i=1;i<10;i++){
                // 发布消息到队列
                channel.basicPublish("", queue, null, ("hello:"+i).getBytes());
            }
        } catch (IOException e) {
            // 打印IO异常堆栈跟踪,用于错误日志记录
            e.printStackTrace();
        } catch (TimeoutException e) {
            // 打印超时异常堆栈跟踪,用于错误日志记录
            e.printStackTrace();
        }finally{
            // 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
            if(channel!=null) {
                channel.close();
            }
            if(connection!=null) {
                connection.close();
            }
        }

    }
}

image.png

消费者

public class RabbitMQComsuer {


    /**
     * 程序入口主方法。
     * 主要功能是读取rabbitmq配置文件,建立连接,并接受消息到指定队列。
     *
     * @param args 命令行参数
     * @throws IOException 如果读取配置文件或通信过程中发生IO错误
     * @throws TimeoutException 如果建立连接超时
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 初始化ConnectionFactory,用于创建RabbitMQ连接
        //connectionFactory用于创建连接池
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 获取类加载器和配置文件路径
        //获取配置文件
        ClassLoader classLoader = RabbitMQComsuer.class.getClassLoader();
        String resourceName = "config/rabbitmq.properties";
        // 创建Properties对象用于读取配置文件
        Properties properties = new Properties();
        // 尝试从配置文件中加载属性
        try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
            // 如果输入流为空,则抛出异常,表示配置文件未找到
            if (inputStream == null) {
                throw new IllegalStateException("Resource not found: " + resourceName);
            }
            // 加载配置文件到Properties对象中
            properties.load(inputStream);
            // 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
            // Print the properties
            for (String key : properties.stringPropertyNames()) {
                System.out.println(key + " = " + properties.getProperty(key));
            }
        } catch (IOException ex) {
            // 打印IO异常堆栈跟踪,用于错误日志记录
            ex.printStackTrace();
        }
        // 从Properties中读取RabbitMQ连接和队列配置
        // 连接工厂的URL和虚拟机地址
        String url = properties.getProperty("url");
        String virtualHost = properties.getProperty("vhost");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        String queue= properties.getProperty("queue");
        // 配置ConnectionFactory,设置连接参数
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(user);
        connectionFactory.setPassword(password);
        // 声明变量用于存储连接和通道对象
        Connection connection = null;
        Channel channel = null;
        // 尝试创建连接和通道,声明队列,并发送消息
        // 创建连接
        try {
            // 创建连接
            connection = connectionFactory.newConnection();
            // 创建通道
            channel=connection.createChannel();
            // 声明队列
            /**
             * 声明一个队列。
             *
             * 在RabbitMQ中,队列是消息的消费者。此方法用于在服务器上声明一个队列。队列声明不会自动创建队列,
             * 如果队列已存在,服务器将返回确认。队列的属性可以通过参数进行配置。
             * @param queue 队列名称。如果为null,则服务器将生成一个随机名称。
             * @param durable 是否将队列声明为持久化。如果为true,则队列在服务器重启后仍然存在。
             * @param exclusive 是否将队列声明为独占队列。如果为true,则该队列只能被当前连接的消费者使用。
             * @param autoDelete 是否自动删除队列。如果为true,并且当最后一个消费者取消订阅后,队列将自动删除。
             * @param arguments 用于提供额外的队列属性。例如,可以使用此参数来设置队列的生存时间(TTL)。
             * @return DeclareOk对象,包含服务器返回的队列名称和已声明的队列数量。
             * @throws IOException 如果与RabbitMQ服务器的通信发生错误。
             */
            channel.queueDeclare(queue, true, false, false, null);
            /**
             * 开始消费队列中的消息。
             * 使用自动确认模式(acknowledgeMode),即消费者在处理完消息后自动确认消息已被接收。
             * 这种方式下,如果消费者处理消息过程中发生异常,消息不会丢失,因为它还没有被确认。
             * @param queue 要消费的队列名称。
             * @param autoAck 是否使用自动确认模式。
             * @param consumer 实现了Consumer接口的匿名类,用于处理接收到的消息。
             */
            channel.basicConsume(queue,true, new Consumer() {
                /**
                 * 处理消费确认的回调方法。
                 * 当消费者处理完消息并成功确认时,RabbitMQ会调用此方法。
                 *
                 * @param consumerTag 消费者的标签,用于标识消费者。
                 */
                @Override
                public void handleConsumeOk(String consumerTag) {
                    // 处理消费确认的逻辑代码(如果有的话)
                }

                /**
                 * 处理消费者取消订阅确认的回调方法。
                 * 当消费者成功取消订阅时,RabbitMQ会调用此方法。
                 *
                 * @param consumerTag 消费者的标签,用于标识消费者。
                 */
                @Override
                public void handleCancelOk(String consumerTag) {
                    // 处理取消订阅确认的逻辑代码(如果有的话)
                }

                /**
                 * 处理消费者取消订阅的回调方法。
                 * 当消费者取消订阅操作失败时,RabbitMQ会调用此方法。
                 *
                 * @param consumerTag 消费者的标签,用于标识消费者。
                 * @throws IOException 如果处理取消订阅时发生IO错误。
                 */
                @Override
                public void handleCancel(String consumerTag) throws IOException {
                    // 处理取消订阅失败的逻辑代码(如果有的话)
                }

                /**
                 * 处理关闭信号的回调方法。
                 * 当RabbitMQ连接关闭或出现异常时,会调用此方法。
                 *
                 * @param consumerTag 消费者的标签,用于标识消费者。
                 * @param sig         关闭连接的异常信息。
                 */
                @Override
                public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                    // 处理关闭信号的逻辑代码(如果有的话)
                }

                /**
                 * 处理消息恢复确认的回调方法。
                 * 当消费者重启后,RabbitMQ会调用此方法来确认消费者已经恢复。
                 *
                 * @param consumerTag 消费者的标签,用于标识消费者。
                 */
                @Override
                public void handleRecoverOk(String consumerTag) {
                    // 处理消息恢复确认的逻辑代码(如果有的话)
                }

                /**
                 * 处理消息投递的回调方法。
                 * 当有新消息到达时,RabbitMQ会调用此方法将消息传递给消费者。
                 *
                 * @param consumerTag 消费者的标签,用于标识消费者。
                 * @param envelope    消息的信封,包含消息的基本信息,如消息ID、发布者等。
                 * @param properties  消息的属性,包含消息的额外元数据。
                 * @param body        消息的主体内容。
                 * @throws IOException 如果处理消息时发生IO错误。
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 处理消息的逻辑代码(解码消息体、处理业务逻辑等)
                    System.out.println(new String(body));
                }
            });
        } catch (IOException e) {
            // 打印IO异常堆栈跟踪,用于错误日志记录
            e.printStackTrace();
        } catch (TimeoutException e) {
            // 打印超时异常堆栈跟踪,用于错误日志记录
            e.printStackTrace();
        }finally{
            // 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
//            if(channel!=null) {
//                channel.close();
//            }
//            if(connection!=null) {
//                connection.close();
//            }
        }

    }
}
相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号