【RabbitMQ】路由模式(使用案例)

张开发
2026/4/20 4:45:41 15 分钟阅读

分享文章

【RabbitMQ】路由模式(使用案例)
文章目录1. Routing路由模式2、引入依赖3. 生产者代码编写3.1 创建交换机3.2 声明队列3.3 绑定交换机和队列3.4 发送消息3.5 完整代码4. 消费者代码编写4.1 消费者一4.1 消费者二5. 运行程序1. Routing路由模式队列和交换机的绑定不能是任意的绑定了而是要指定一个 BindingKeyRoutingKey 的一种消息的发送方在向 Exchange 发送消息时也需要指定消息的 RoutingKey。Exchange 也不再把消息交给每一个绑定的 key而是根据消息的 RoutingKey 进行判断只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致才会接收到消息。接下来我们看看 Routing 模式的实现步骤1、引入依赖2、编写生产者代码3、编写消费者代码2、引入依赖先引入 rabbitmq 的依赖!-- Source: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/versionscopecompile/scope/dependency3. 生产者代码编写和发布订阅模式的区别是交换机类型不同绑定队列的 BindingKey 不同。那么先去 Constants.java 里面定义交换机和队列。// 路由模式publicstaticfinalStringDIRECT_EXCHANGEdirect.exchange;// 声明交换机publicstaticfinalStringDIRECT_QUEUE1direct.queue1;// 声明队列publicstaticfinalStringDIRECT_QUEUE2direct.queue2;// 声明队列3.1 创建交换机定义交换机类型为 BuiltinExchangeType.DIRECTchannel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);3.2 声明队列代码如下所示channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);3.3 绑定交换机和队列代码如下所示channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,a);channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,a);channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,b);channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,c);我们就按照下面这个绑定关系来完成实验3.4 发送消息还是老样子发送消息和上面的图要对应起来。代码如下所示// 6. 发送消息StringmsgHello direct, my routing key is : a....;channel.basicPublish(Constants.DIRECT_EXCHANGE,a,null,msg.getBytes());Stringmsg_bHello direct, my routing key is : b....;channel.basicPublish(Constants.DIRECT_EXCHANGE,b,null,msg_b.getBytes());Stringmsg_cHello direct, my routing key is : c....;channel.basicPublish(Constants.DIRECT_EXCHANGE,c,null,msg_c.getBytes());System.out.println(消息发送成功);3.5 完整代码代码如下所示packagedirect;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 声明交换机使用内置的交换机即可channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);// 4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);// 5. 绑定队列和交换机channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,a);channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,a);channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,b);channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,c);// 6. 发送消息StringmsgHello direct, my routing key is : a....;channel.basicPublish(Constants.DIRECT_EXCHANGE,a,null,msg.getBytes());Stringmsg_bHello direct, my routing key is : b....;channel.basicPublish(Constants.DIRECT_EXCHANGE,b,null,msg_b.getBytes());Stringmsg_cHello direct, my routing key is : c....;channel.basicPublish(Constants.DIRECT_EXCHANGE,c,null,msg_c.getBytes());System.out.println(消息发送成功);// 7. 资源释放channel.close();connection.close();}}4. 消费者代码编写Routing 模式的消费者代码 和 Publish / Subscribe代码 一样同样复制出来两份然后修改消费的队列名称就可以了。4.1 消费者一代码如下所示packagedirect;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumernewDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println(接收到消息: newString(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);// 5. 不需要释放资源}}4.1 消费者二代码如下所示packagedirect;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer2{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumernewDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println(接收到消息: newString(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);// 5. 不需要释放资源}}5. 运行程序先运行生产者代码可以看到 direct.queue1 队列中路由了一条消息。direct.queue2 队列中路由了三条消息对应图如下所示exchange 下队列和 Routing Key 的绑定关系如下所示然后运行 Consumer1 消费者代码运行 Consumer2 消费者代码同时可以看到队列中的消息已经被全部给消费完了

更多文章