码上敲享录 > RabbitMQ教程(java) > Rabbitmq主题(topic)路由模式

Rabbitmq主题(topic)路由模式

上一章章节目录下一章 2019-12-17已有1786人阅读 评论(0)

Rabbitmq主题(topic)路由模式

目录

idea创建java项目整合rabbitmq

Rabbitmq简单模式

Rabbitmq工作队列模式(任务队列模式)

Rabbitmq发布/订阅模式

Rabbitmq直接(direct)路由模式

Rabbitmq主题(topic)路由模式

Rabbitmq发布者确认模式

Rabbitmq远程过程调用(RPC)模式


从前面可知直接路由模式(交换机的类型是direct)把一个消息只群发和路由键绑定的队列上,但是不是很灵活,生产者稍微对路由键更换一下,发布的消息就可能到达不了消息队列中,主题路由模式(交换机的类型是topic)就是解决这个问题的,路由键中的*可以代替一个单词#可以替代零个或多个单词


开始演示:

第一步、先执行消费者ReceiveLogsTopic.java,消费者声明的临时队列绑定的路由键为*.*.routeKey.#

第二步、执行生产者EmitLogTopic.java,生产者把10条消息发布到已绑定one.two.routeKey.three.four和one2.two2.routeKey.three2.four2路由键的交换机topic_logs。


消费者打印的结果:

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one.two.routeKey.three.four':' Hello World!'

消费者:amq.ctag-Bu6NuhiK_A0PgiGjlTbnqQ,路由键和消息:one2.two2.routeKey.three2.four2':' Hello World!'


演示结论:

消息通过路由键存储在相应的队列中。


生产者代码:

package com.demo;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.ArrayList;

import java.util.List;

public class EmitLogTopic {

   private static final String EXCHANGE_NAME = "topic_logs";

   public static void main(String[] argv) throws Exception {

       //创建连接

       ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("127.0.0.1");

       factory.setPort(5672);

       factory.setUsername("guest");

       factory.setPassword("guest");

       List<String> routeKeys = new ArrayList<>();

       routeKeys.add("one.two.routeKey.three.four");

       for(int i=0;i<5;i++) {

           try (Connection connection = factory.newConnection();

                Channel channel = connection.createChannel()) {

               channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

               String message = " Hello World!";

               for(String routeKey: routeKeys){

                  channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));

               }

           }

       }

   }

}


消费者代码:

package com.demo;

import com.rabbitmq.client.*;

import java.util.ArrayList;

import java.util.List;

public class ReceiveLogsTopic {

   private static final String EXCHANGE_NAME = "topic_logs";

   public static void main(String[] argv) throws Exception {

      ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("127.0.0.1");

       factory.setPort(5672);

       factory.setUsername("guest");

       factory.setPassword("guest");

       Connection connection = factory.newConnection();

       Channel channel = connection.createChannel();

       channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

       String queueName = channel.queueDeclare().getQueue();

       channel.queueBind(queueName, EXCHANGE_NAME, "*.*.routeKey.#");

       DeliverCallback deliverCallback = (consumerTag, delivery) -> {

           String message = new String(delivery.getBody(), "UTF-8");

           System.out.println("消费者:"+ consumerTag+",路由键和消息:"+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

       };

       channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

       });

   }

}


0

有建议,请留言!

  • *您的姓名:

  • *所在城市:

  • *您的联系电话:

    *您的QQ:

  • 咨询问题:

  • 提 交