RabbitMQ简介
RabbitMQ 是部署最广泛的开源消息代理,他是一种消息队列(Message Queue),是一种跨进程的通信机制,用于上下游传递消息。
MQ作为消息中间件,最主要的作用系统之间的信息传递进行“解耦”,MQ是数据可靠性的重要保障。
RabbitMQ支持几乎所有的操作系统与编程语言。Rabbit提供了高并发、高可用的成熟方案,支持多种消息协议,易于部署与使用。
官网:https://www.rabbitmq.com/
RabbitMQ与其他MQ的对比
RabbitMQ | ACTIVEMQ | Kafka | RocketMQ | |
---|---|---|---|---|
社区活跃度 | 非常活跃 | 非常活跃 | 活跃 | 不活跃 |
持久化 | 支持 | 支持 | 支持 | 支持 |
并发吞吐量 | 高 | 一般 | 极高 | 极高 |
数据可靠性 | 极高 | 一般 | 高 | 高 |
生态完整性 | 很好 | 很好 | 很好 | 一般 |
用户总量 | 多 | 多->一般 | 较多 | 少 |
应用场景 | 分布式、高可靠交易系统 | 传统业务系统 | 日志处理及大数据应用 | 互联网高并发、高可用应用 |
RabbitMQ使用Erlang开发 ,Erlang是一种通用的面向并发的编程语言, Erlang是一个结构化 ,动态类型编程语言,内建并行计算支持。使用Erlang来编写
分布式应用要简单的多 ,Erlang运行时环境是一个虚拟机 ,有点像Java虚拟机 ,这样代码一经编译 ,同样可以随处运行。
应用场景
异构系统的数据传递
高并发程序的流量控制
基于P2P,P2PPP的程序
分布式系统的事务一致性TCC
高可靠性的交易系统
......
安装配置
注意RabbitMQ版本和Erlang版本是对应关系,相应RabbitMQ版本对应的Erlangan安装版本OTP
详见:https://www.rabbitmq.com/docs/which-erlang
windows
#双击安装
otp_win64-xxx.exe
rabbitmq-server-xxx.exe
激活管理后台命令:
rabbitmq-plugins.bat enable rabbitmq_management
基本概念
**Producer:**生产者,消息的提供者
**Consumer:**消费者,消息的使用者
**Message:**消息,程序间的通信的数据
**Queue:**队列,消息存放的容器,消息先进先出
**Vhost:**虚拟主机,相当于MQ的“数据库”,用于存储队列
创建Vhost
测试连接示例代码
public class RabbitMQProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//factory.setVirtualHost("/test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
发送消息示例代码
生产者
public class RabbitMQProducer {
/**
* 程序入口主方法。
* 主要功能是读取rabbitmq配置文件,建立连接,并发送消息到指定队列。
* @param args 命令行参数
* @throws IOException 如果读取配置文件或通信过程中发生IO错误
* @throws TimeoutException 如果建立连接超时
*/
public static void main(String[] args) throws IOException, TimeoutException {
// 初始化ConnectionFactory,用于创建RabbitMQ连接
//connectionFactory用于创建连接池
ConnectionFactory connectionFactory = new ConnectionFactory();
// 获取类加载器和配置文件路径
//获取配置文件
ClassLoader classLoader = RabbitMQProducer.class.getClassLoader();
String resourceName = "config/rabbitmq.properties";
// 创建Properties对象用于读取配置文件
Properties properties = new Properties();
// 尝试从配置文件中加载属性
try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
// 如果输入流为空,则抛出异常,表示配置文件未找到
if (inputStream == null) {
throw new IllegalStateException("Resource not found: " + resourceName);
}
// 加载配置文件到Properties对象中
properties.load(inputStream);
// 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
// Print the properties
for (String key : properties.stringPropertyNames()) {
System.out.println(key + " = " + properties.getProperty(key));
}
} catch (IOException ex) {
// 打印IO异常堆栈跟踪,用于错误日志记录
ex.printStackTrace();
}
// 从Properties中读取RabbitMQ连接和队列配置
// 连接工厂的URL和虚拟机地址
String url = properties.getProperty("url");
String virtualHost = properties.getProperty("vhost");
String user = properties.getProperty("user");
String password = properties.getProperty("password");
String queue= properties.getProperty("queue");
// 配置ConnectionFactory,设置连接参数
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
// 声明变量用于存储连接和通道对象
Connection connection = null;
Channel channel = null;
// 尝试创建连接和通道,声明队列,并发送消息
// 创建连接
try {
// 创建连接
connection = connectionFactory.newConnection();
// 创建通道
channel=connection.createChannel();
// 声明队列
/**
* 声明一个队列。
*
* 在RabbitMQ中,队列是消息的消费者。此方法用于在服务器上声明一个队列。队列声明不会自动创建队列,
* 如果队列已存在,服务器将返回确认。队列的属性可以通过参数进行配置。
* @param queue 队列名称。如果为null,则服务器将生成一个随机名称。
* @param durable 是否将队列声明为持久化。如果为true,则队列在服务器重启后仍然存在。
* @param exclusive 是否将队列声明为独占队列。如果为true,则该队列只能被当前连接的消费者使用。
* @param autoDelete 是否自动删除队列。如果为true,并且当最后一个消费者取消订阅后,队列将自动删除。
* @param arguments 用于提供额外的队列属性。例如,可以使用此参数来设置队列的生存时间(TTL)。
* @return DeclareOk对象,包含服务器返回的队列名称和已声明的队列数量。
* @throws IOException 如果与RabbitMQ服务器的通信发生错误。
*/
channel.queueDeclare(queue, true, false, false, null);
// 循环发送多条消息
for (int i=1;i<10;i++){
// 发布消息到队列
channel.basicPublish("", queue, null, ("hello:"+i).getBytes());
}
} catch (IOException e) {
// 打印IO异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
} catch (TimeoutException e) {
// 打印超时异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
}finally{
// 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
if(channel!=null) {
channel.close();
}
if(connection!=null) {
connection.close();
}
}
}
}
消费者
public class RabbitMQComsuer {
/**
* 程序入口主方法。
* 主要功能是读取rabbitmq配置文件,建立连接,并接受消息到指定队列。
*
* @param args 命令行参数
* @throws IOException 如果读取配置文件或通信过程中发生IO错误
* @throws TimeoutException 如果建立连接超时
*/
public static void main(String[] args) throws IOException, TimeoutException {
// 初始化ConnectionFactory,用于创建RabbitMQ连接
//connectionFactory用于创建连接池
ConnectionFactory connectionFactory = new ConnectionFactory();
// 获取类加载器和配置文件路径
//获取配置文件
ClassLoader classLoader = RabbitMQComsuer.class.getClassLoader();
String resourceName = "config/rabbitmq.properties";
// 创建Properties对象用于读取配置文件
Properties properties = new Properties();
// 尝试从配置文件中加载属性
try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
// 如果输入流为空,则抛出异常,表示配置文件未找到
if (inputStream == null) {
throw new IllegalStateException("Resource not found: " + resourceName);
}
// 加载配置文件到Properties对象中
properties.load(inputStream);
// 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
// Print the properties
for (String key : properties.stringPropertyNames()) {
System.out.println(key + " = " + properties.getProperty(key));
}
} catch (IOException ex) {
// 打印IO异常堆栈跟踪,用于错误日志记录
ex.printStackTrace();
}
// 从Properties中读取RabbitMQ连接和队列配置
// 连接工厂的URL和虚拟机地址
String url = properties.getProperty("url");
String virtualHost = properties.getProperty("vhost");
String user = properties.getProperty("user");
String password = properties.getProperty("password");
String queue= properties.getProperty("queue");
// 配置ConnectionFactory,设置连接参数
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
// 声明变量用于存储连接和通道对象
Connection connection = null;
Channel channel = null;
// 尝试创建连接和通道,声明队列,并发送消息
// 创建连接
try {
// 创建连接
connection = connectionFactory.newConnection();
// 创建通道
channel=connection.createChannel();
// 声明队列
/**
* 声明一个队列。
*
* 在RabbitMQ中,队列是消息的消费者。此方法用于在服务器上声明一个队列。队列声明不会自动创建队列,
* 如果队列已存在,服务器将返回确认。队列的属性可以通过参数进行配置。
* @param queue 队列名称。如果为null,则服务器将生成一个随机名称。
* @param durable 是否将队列声明为持久化。如果为true,则队列在服务器重启后仍然存在。
* @param exclusive 是否将队列声明为独占队列。如果为true,则该队列只能被当前连接的消费者使用。
* @param autoDelete 是否自动删除队列。如果为true,并且当最后一个消费者取消订阅后,队列将自动删除。
* @param arguments 用于提供额外的队列属性。例如,可以使用此参数来设置队列的生存时间(TTL)。
* @return DeclareOk对象,包含服务器返回的队列名称和已声明的队列数量。
* @throws IOException 如果与RabbitMQ服务器的通信发生错误。
*/
channel.queueDeclare(queue, true, false, false, null);
/**
* 开始消费队列中的消息。
* 使用自动确认模式(acknowledgeMode),即消费者在处理完消息后自动确认消息已被接收。
* 这种方式下,如果消费者处理消息过程中发生异常,消息不会丢失,因为它还没有被确认。
* @param queue 要消费的队列名称。
* @param autoAck 是否使用自动确认模式。
* @param consumer 实现了Consumer接口的匿名类,用于处理接收到的消息。
*/
channel.basicConsume(queue,true, new Consumer() {
/**
* 处理消费确认的回调方法。
* 当消费者处理完消息并成功确认时,RabbitMQ会调用此方法。
*
* @param consumerTag 消费者的标签,用于标识消费者。
*/
@Override
public void handleConsumeOk(String consumerTag) {
// 处理消费确认的逻辑代码(如果有的话)
}
/**
* 处理消费者取消订阅确认的回调方法。
* 当消费者成功取消订阅时,RabbitMQ会调用此方法。
*
* @param consumerTag 消费者的标签,用于标识消费者。
*/
@Override
public void handleCancelOk(String consumerTag) {
// 处理取消订阅确认的逻辑代码(如果有的话)
}
/**
* 处理消费者取消订阅的回调方法。
* 当消费者取消订阅操作失败时,RabbitMQ会调用此方法。
*
* @param consumerTag 消费者的标签,用于标识消费者。
* @throws IOException 如果处理取消订阅时发生IO错误。
*/
@Override
public void handleCancel(String consumerTag) throws IOException {
// 处理取消订阅失败的逻辑代码(如果有的话)
}
/**
* 处理关闭信号的回调方法。
* 当RabbitMQ连接关闭或出现异常时,会调用此方法。
*
* @param consumerTag 消费者的标签,用于标识消费者。
* @param sig 关闭连接的异常信息。
*/
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// 处理关闭信号的逻辑代码(如果有的话)
}
/**
* 处理消息恢复确认的回调方法。
* 当消费者重启后,RabbitMQ会调用此方法来确认消费者已经恢复。
*
* @param consumerTag 消费者的标签,用于标识消费者。
*/
@Override
public void handleRecoverOk(String consumerTag) {
// 处理消息恢复确认的逻辑代码(如果有的话)
}
/**
* 处理消息投递的回调方法。
* 当有新消息到达时,RabbitMQ会调用此方法将消息传递给消费者。
*
* @param consumerTag 消费者的标签,用于标识消费者。
* @param envelope 消息的信封,包含消息的基本信息,如消息ID、发布者等。
* @param properties 消息的属性,包含消息的额外元数据。
* @param body 消息的主体内容。
* @throws IOException 如果处理消息时发生IO错误。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 处理消息的逻辑代码(解码消息体、处理业务逻辑等)
System.out.println(new String(body));
}
});
} catch (IOException e) {
// 打印IO异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
} catch (TimeoutException e) {
// 打印超时异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
}finally{
// 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
// if(channel!=null) {
// channel.close();
// }
// if(connection!=null) {
// connection.close();
// }
}
}
}