RabbitMQ之路由Routing模式

2024-07-20 18:50

发布(Publish)/订阅(Subscribe)模式

1.发布/订阅模式中,生产者不再直接与队列绑定,而是将数据发送至“交换机Exchange”

2.交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。

3.发布/订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同,交换机的类型被称为fanout。

应用场景

​ 发布订阅模式因为所有消费者获得相同的消息,所以特别适合“数据提供商与应用商”。

​ 例如:今日头条提供“新闻”送入交换机,网易、新浪、百度、搜狐等门户接入通过队列绑定到该交换机,自动获取今日头条推送的新闻数据。

示例

1.创建交换机

image.png

image.png

创建队列

image.png

示例代码:**


发布者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

public class PublishMain {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 初始化ConnectionFactory,用于创建RabbitMQ连接
        //connectionFactory用于创建连接池
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 获取类加载器和配置文件路径
        //获取配置文件
        ClassLoader classLoader = RabbitMQProducer.class.getClassLoader();
        String resourceName = "config/rabbitmq.properties";
        // 创建Properties对象用于读取配置文件
        Properties properties = new Properties();
        // 尝试从配置文件中加载属性
        try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
            // 如果输入流为空,则抛出异常,表示配置文件未找到
            if (inputStream == null) {
                throw new IllegalStateException("Resource not found: " + resourceName);
            }
            // 加载配置文件到Properties对象中
            properties.load(inputStream);
            // 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
            // Print the properties
            for (String key : properties.stringPropertyNames()) {
                System.out.println(key + " = " + properties.getProperty(key));
            }
        } catch (IOException ex) {
            // 打印IO异常堆栈跟踪,用于错误日志记录
            ex.printStackTrace();
        }
        // 从Properties中读取RabbitMQ连接和队列配置
        // 连接工厂的URL和虚拟机地址
        String url = properties.getProperty("url");
        String virtualHost = properties.getProperty("vhost");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        String queue = properties.getProperty("queue");
        // 配置ConnectionFactory,设置连接参数
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(user);
        connectionFactory.setPassword(password);
        // 声明变量用于存储连接和通道对象
        Connection connection = null;
        Channel channel = null;
        // 尝试创建连接和通道,声明队列,并发送消息
        // 创建连接
        try {
            // 创建连接
            connection = connectionFactory.newConnection();
            // 创建通道
            channel = connection.createChannel();
            for(int i=0;i<10;i++)
               // 向指定的交换机发送一条消息
             channel.basicPublish("news","",null,("新闻信息"+i).getBytes());
        } catch (IOException e) {
            // 打印IO异常堆栈跟踪,用于错误日志记录
            e.printStackTrace();
        } catch (TimeoutException e) {
            // 打印超时异常堆栈跟踪,用于错误日志记录
            e.printStackTrace();
        } finally {
            // 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

订阅者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

public class SubscribeMain {

    public static void main(String[] args) throws IOException, TimeoutException {
            // 初始化ConnectionFactory,用于创建RabbitMQ连接
            //connectionFactory用于创建连接池
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 获取类加载器和配置文件路径
            //获取配置文件
            ClassLoader classLoader = RabbitMQProducer.class.getClassLoader();
            String resourceName = "config/rabbitmq.properties";
            // 创建Properties对象用于读取配置文件
            Properties properties = new Properties();
            // 尝试从配置文件中加载属性
            try (InputStream inputStream = classLoader.getResourceAsStream(resourceName)) {
                // 如果输入流为空,则抛出异常,表示配置文件未找到
                if (inputStream == null) {
                    throw new IllegalStateException("Resource not found: " + resourceName);
                }
                // 加载配置文件到Properties对象中
                properties.load(inputStream);
                // 遍历并打印所有配置属性,用于调试和确认配置是否正确加载
                // Print the properties
                for (String key : properties.stringPropertyNames()) {
                    System.out.println(key + " = " + properties.getProperty(key));
                }
            } catch (IOException ex) {
                // 打印IO异常堆栈跟踪,用于错误日志记录
                ex.printStackTrace();
            }
            // 从Properties中读取RabbitMQ连接和队列配置
            // 连接工厂的URL和虚拟机地址
            String url = properties.getProperty("url");
            String virtualHost = properties.getProperty("vhost");
            String user = properties.getProperty("user");
            String password = properties.getProperty("password");
            String queue= properties.getProperty("queue");
            // 配置ConnectionFactory,设置连接参数
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setUsername(user);
            connectionFactory.setPassword(password);
            // 声明变量用于存储连接和通道对象
            Connection connection = null;
            Channel channel = null;
            // 尝试创建连接和通道,声明队列,并发送消息
            // 创建连接
            try {
                // 创建连接
                connection = connectionFactory.newConnection();
                // 创建通道
                channel=connection.createChannel();
                /**
                 * 将队列绑定到交换机。
                 * <p>
                 * 此操作将指定的队列绑定到一个交换机上,确保队列能够接收到来自该交换机的消息。
                 * 绑定的关键在于路由键,它决定了消息是否应该被路由到这个队列。
                 * 在此示例中,使用的路由键为空字符串,这可能表示所有消息都将被路由到这个队列,
                 * 或者特定的消息类型通过空字符串作为标识。
                 * </p>
                 * @param queueName 队列的名称。在此处为空字符串,可能表示使用默认队列。
                 * @param exchangeName 交换机的名称。此处为空字符串,可能表示使用默认交换机。
                 * @param routingKey 路由键,用于决定消息是否应该被路由到这个队列。此处为空字符串,可能表示匹配所有消息。
                 */
                channel.queueBind("baidu","news","");
                /* 设置消费者处理消息的速率 */
                channel.basicQos(1);

                Channel finalChannel = channel;

                channel.basicConsume("baidu", false, new Consumer() {
                    /**
                     * 处理消费确认的回调方法。
                     * 当RabbitMQ接收到消费者对消息的确认时,会调用此方法。
                     * @param consumerTag 消费者的标签,用于标识消费者。
                     */
                    @Override
                    public void handleConsumeOk(String consumerTag) {
                        // 处理消费确认的逻辑代码(如果有的话)
                    }

                    /**
                     * 处理消费者取消订阅确认的回调方法。
                     * 当消费者成功取消订阅时,RabbitMQ会调用此方法。
                     * @param consumerTag 消费者的标签,用于标识消费者。
                     */
                    @Override
                    public void handleCancelOk(String consumerTag) {
                        // 处理取消订阅确认的逻辑代码(如果有的话)
                    }

                    /**
                     * 处理消费者取消订阅的回调方法。
                     * 当消费者取消订阅时,如果出现异常,RabbitMQ会调用此方法。
                     * @param consumerTag 消费者的标签,用于标识消费者。
                     * @throws IOException 如果取消订阅操作引发IO异常。
                     */
                    @Override
                    public void handleCancel(String consumerTag) throws IOException {
                        // 处理取消订阅的异常逻辑代码(如果有的话)
                    }

                    /**
                     * 处理关闭信号的回调方法。
                     * 当RabbitMQ连接关闭或出现异常时,会调用此方法。
                     * @param consumerTag 消费者的标签,用于标识消费者。
                     * @param sig 关闭连接的异常信息。
                     */
                    @Override
                    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                        // 处理关闭信号的逻辑代码(如果有的话)
                    }

                    /**
                     * 处理消息恢复确认的回调方法。
                     * 当消费者重启后,RabbitMQ会调用此方法来确认消费者已经恢复。
                     * @param consumerTag 消费者的标签,用于标识消费者。
                     */
                    @Override
                    public void handleRecoverOk(String consumerTag) {
                        // 处理消息恢复确认的逻辑代码(如果有的话)
                    }

                    /**
                     * 处理消息投递的回调方法。
                     * 当有新消息到达时,RabbitMQ会调用此方法将消息传递给消费者。
                     * @param consumerTag 消费者的标签,用于标识消费者。
                     * @param envelope 消息的信封,包含消息的ID、发布者等信息。
                     * @param properties 消息的属性,包含消息的标题、优先级等信息。
                     * @param body 消息的主体。
                     * @throws IOException 如果处理消息时发生IO异常。
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 处理消息投递的逻辑代码(解码消息体等)
                        System.out.println(new String(body));
                        // 执行消息处理逻辑后,确认消息已被消费
                        finalChannel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });

            } catch (IOException e) {
                // 打印IO异常堆栈跟踪,用于错误日志记录
                e.printStackTrace();
            } catch (TimeoutException e) {
                // 打印超时异常堆栈跟踪,用于错误日志记录
                e.printStackTrace();
            }finally{
                // 注释掉的关闭通道和连接的代码,可能是之前用于资源释放的尝试
//                if(channel!=null) {
//                    channel.close();
//                }
//                if(connection!=null) {
//                    connection.close();
//                }
            }
    }
}

相关新闻
热点
视频
投票
查看结果
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量:

© 2025 个人网站 版权所有

备案号:苏ICP备2024108837号

苏公网安备32011302322151号