发布(Publish)/订阅(Subscribe)模式
1.发布/订阅模式中,生产者不再直接与队列绑定,而是将数据发送至“交换机Exchange”
2.交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。
3.发布/订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同,交换机的类型被称为fanout。
应用场景
发布订阅模式因为所有消费者获得相同的消息,所以特别适合“数据提供商与应用商”。
例如:今日头条提供“新闻”送入交换机,网易、新浪、百度、搜狐等门户接入通过队列绑定到该交换机,自动获取今日头条推送的新闻数据。
示例
1.创建交换机
创建队列
示例代码:**
发布者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
public class PublishMain {
public static void main(String[] args) throws IOException, TimeoutException {
// 初始化ConnectionFactory,用于创建RabbitMQ连接
//connectionFactory用于创建连接池
ConnectionFactory connectionFactory = new ConnectionFactory();
// 获取类加载器和配置文件路径
//获取配置文件
ClassLoader classLoader = RabbitMQProducer.class.getClassLoader();
String resourceName = "config/rabbitmq.properties";
// 创建Properties对象用于读取配置文件
Properties properties = new Properties();
// 尝试从配置文件中加载属性
try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
// 如果输入流为空,则抛出异常,表示配置文件未找到
if (inputStream == null) {
throw new IllegalStateException("Resource not found: " + resourceName);
}
// 加载配置文件到Properties对象中
properties.load(inputStream);
// 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
// Print the properties
for (String key : properties.stringPropertyNames()) {
System.out.println(key + " = " + properties.getProperty(key));
}
} catch (IOException ex) {
// 打印IO异常堆栈跟踪,用于错误日志记录
ex.printStackTrace();
}
// 从Properties中读取RabbitMQ连接和队列配置
// 连接工厂的URL和虚拟机地址
String url = properties.getProperty("url");
String virtualHost = properties.getProperty("vhost");
String user = properties.getProperty("user");
String password = properties.getProperty("password");
String queue = properties.getProperty("queue");
// 配置ConnectionFactory,设置连接参数
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
// 声明变量用于存储连接和通道对象
Connection connection = null;
Channel channel = null;
// 尝试创建连接和通道,声明队列,并发送消息
// 创建连接
try {
// 创建连接
connection = connectionFactory.newConnection();
// 创建通道
channel = connection.createChannel();
for(int i=0;i<10;i++)
// 向指定的交换机发送一条消息
channel.basicPublish("news","",null,("新闻信息"+i).getBytes());
} catch (IOException e) {
// 打印IO异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
} catch (TimeoutException e) {
// 打印超时异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
} finally {
// 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
订阅者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
public class SubscribeMain {
public static void main(String[] args) throws IOException, TimeoutException {
// 初始化ConnectionFactory,用于创建RabbitMQ连接
//connectionFactory用于创建连接池
ConnectionFactory connectionFactory = new ConnectionFactory();
// 获取类加载器和配置文件路径
//获取配置文件
ClassLoader classLoader = RabbitMQProducer.class.getClassLoader();
String resourceName = "config/rabbitmq.properties";
// 创建Properties对象用于读取配置文件
Properties properties = new Properties();
// 尝试从配置文件中加载属性
try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
// 如果输入流为空,则抛出异常,表示配置文件未找到
if (inputStream == null) {
throw new IllegalStateException("Resource not found: " + resourceName);
}
// 加载配置文件到Properties对象中
properties.load(inputStream);
// 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
// Print the properties
for (String key : properties.stringPropertyNames()) {
System.out.println(key + " = " + properties.getProperty(key));
}
} catch (IOException ex) {
// 打印IO异常堆栈跟踪,用于错误日志记录
ex.printStackTrace();
}
// 从Properties中读取RabbitMQ连接和队列配置
// 连接工厂的URL和虚拟机地址
String url = properties.getProperty("url");
String virtualHost = properties.getProperty("vhost");
String user = properties.getProperty("user");
String password = properties.getProperty("password");
String queue= properties.getProperty("queue");
// 配置ConnectionFactory,设置连接参数
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
// 声明变量用于存储连接和通道对象
Connection connection = null;
Channel channel = null;
// 尝试创建连接和通道,声明队列,并发送消息
// 创建连接
try {
// 创建连接
connection = connectionFactory.newConnection();
// 创建通道
channel=connection.createChannel();
/**
* 将队列绑定到交换机。
* <p>
* 此操作将指定的队列绑定到一个交换机上,确保队列能够接收到来自该交换机的消息。
* 绑定的关键在于路由键,它决定了消息是否应该被路由到这个队列。
* 在此示例中,使用的路由键为空字符串,这可能表示所有消息都将被路由到这个队列,
* 或者特定的消息类型通过空字符串作为标识。
* </p>
* @param queueName 队列的名称。在此处为空字符串,可能表示使用默认队列。
* @param exchangeName 交换机的名称。此处为空字符串,可能表示使用默认交换机。
* @param routingKey 路由键,用于决定消息是否应该被路由到这个队列。此处为空字符串,可能表示匹配所有消息。
*/
channel.queueBind("baidu","news","");
/* 设置消费者处理消息的速率 */
channel.basicQos(1);
Channel finalChannel = channel;
channel.basicConsume("baidu", false, new Consumer() {
/**
* 处理消费确认的回调方法。
* 当RabbitMQ接收到消费者对消息的确认时,会调用此方法。
* @param consumerTag 消费者的标签,用于标识消费者。
*/
@Override
public void handleConsumeOk(String consumerTag) {
// 处理消费确认的逻辑代码(如果有的话)
}
/**
* 处理消费者取消订阅确认的回调方法。
* 当消费者成功取消订阅时,RabbitMQ会调用此方法。
* @param consumerTag 消费者的标签,用于标识消费者。
*/
@Override
public void handleCancelOk(String consumerTag) {
// 处理取消订阅确认的逻辑代码(如果有的话)
}
/**
* 处理消费者取消订阅的回调方法。
* 当消费者取消订阅时,如果出现异常,RabbitMQ会调用此方法。
* @param consumerTag 消费者的标签,用于标识消费者。
* @throws IOException 如果取消订阅操作引发IO异常。
*/
@Override
public void handleCancel(String consumerTag) throws IOException {
// 处理取消订阅的异常逻辑代码(如果有的话)
}
/**
* 处理关闭信号的回调方法。
* 当RabbitMQ连接关闭或出现异常时,会调用此方法。
* @param consumerTag 消费者的标签,用于标识消费者。
* @param sig 关闭连接的异常信息。
*/
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// 处理关闭信号的逻辑代码(如果有的话)
}
/**
* 处理消息恢复确认的回调方法。
* 当消费者重启后,RabbitMQ会调用此方法来确认消费者已经恢复。
* @param consumerTag 消费者的标签,用于标识消费者。
*/
@Override
public void handleRecoverOk(String consumerTag) {
// 处理消息恢复确认的逻辑代码(如果有的话)
}
/**
* 处理消息投递的回调方法。
* 当有新消息到达时,RabbitMQ会调用此方法将消息传递给消费者。
* @param consumerTag 消费者的标签,用于标识消费者。
* @param envelope 消息的信封,包含消息的ID、发布者等信息。
* @param properties 消息的属性,包含消息的标题、优先级等信息。
* @param body 消息的主体。
* @throws IOException 如果处理消息时发生IO异常。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 处理消息投递的逻辑代码(解码消息体等)
System.out.println(new String(body));
// 执行消息处理逻辑后,确认消息已被消费
finalChannel.basicAck(envelope.getDeliveryTag(), false);
}
});
} catch (IOException e) {
// 打印IO异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
} catch (TimeoutException e) {
// 打印超时异常堆栈跟踪,用于错误日志记录
e.printStackTrace();
}finally{
// 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
// if(channel!=null) {
// channel.close();
// }
// if(connection!=null) {
// connection.close();
// }
}
}
}