前言
先决条件 ✔️ 正确安装 RabbitMQ 并将其运行在 localhost:5672 上 ✔️ 已经了解了 RabbitMQ 中的一些基础概念
在上一个教程 中,我们改进了我们的日志系统。我们使用 direct exchange 替代了 fanout exchange,从只能盲目的广播消息改进为有可能选择性的接收日志。
尽管 direct exchange 能够改善我们的系统,但是它也有它的限制——没办法基于多个标准执行路由操作。
在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix 工具 syslog 就是同时基于严重程度 -severity (info/warn/crit…) 和 设备 -facility (auth/cron/kern…) 来路由日志的。
如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于”cron”的严重程度为”critical errors”的日志,也可以监听来源于”kern”的所有日志。
为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换器——topic exchange。
Topic exchange topic exchange 与 direct exchange 类似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:
routing key 为一个句点号.
分隔的字符串(我们将被句点号.
分隔开的每一段独立的字符串称为一个单词),如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit”
binding key 与 routing key 一样也是句点号.
分隔的字符串
binding key 中可以存在两种特殊字符*
与#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)
这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的速度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的:”速度.颜色.种类”。
我们创建了三个绑定:Q1 的 binding key 为”.orange. “,Q2 的 binding key 为”. .rabbit”和”lazy.#”。
这三个 binding key 被可以总结为:
Q1 对所有的桔黄色动物都感兴趣。
Q2 则是对所有的兔子和所有懒惰的动物感兴趣。
以上图中的配置为例: routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2 routingKey=”lazy.orange.fox” 的消息会路由到 Q1 与 Q2 routingKey=”lazy.brown.fox” 的消息会路由到 Q2 routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配) routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey
如果我们违反约定,发送了一个 routing key 为一个单词或者四个单词(”orange” or “quick.orange.male.rabbit”)的消息时,该消息不会投递给任何一个队列,而且会丢失掉。
但是,即使”lazy.orange.male.rabbit”有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。
topic exchange topic exchange 是强大的,它可以表现出跟其他 exchange 类似的行为。 当一个队列的 binding key 为 “#”(井号) 的时候,它会接收所有消息,而不考虑 routing key,就像 fanout exchange。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时 topic exchange 会表现得像 direct exchange 一样。
代码整合 生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class Tut5Sender { @Autowired private AmqpTemplate template; @Autowired private TopicExchange topic; private int index; private int count; private final String[] keys = {"quick.orange.rabbit" , "lazy.orange.elephant" , "quick.orange.fox" , "lazy.brown.fox" , "lazy.pink.rabbit" , "quick.brown.fox" }; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send () { StringBuilder builder = new StringBuilder ("Hello to " ); if (++this .index == keys.length) { this .index = 0 ; } String key = keys[this .index]; builder.append(key).append(' ' ); builder.append(Integer.toString(++this .count)); String message = builder.toString(); template.convertAndSend(topic.getName(), key, message); System.out.println(" [x] Sent '" + message + "'" ); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Tut5Receiver { @RabbitListener(queues = "#{autoDeleteQueue1}") public void receiver1 (String in) throws InterruptedException { receive(in, 1 ); } @RabbitListener(queues = "#{autoDeleteQueue2}") public void receiver2 (String in) throws InterruptedException { receive(in, 2 ); } private void receive (String in, int instance) throws InterruptedException { StopWatch watch = new StopWatch (); watch.start(); System.out.println("instance " + instance + " [x] Received '" + in + "'" ); doWork(in); watch.stop(); System.out.println("instance " + instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s" ); } private void doWork (String in) throws InterruptedException { for (char c : in.toCharArray()) { if (c == '.' ) { Thread.sleep(1000 ); } } } }
配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Profile({"tut5", "topics"}) @Configuration public class Tut5Config { @Bean public TopicExchange topic () { return new TopicExchange ("tut.topic" ); } @Profile("receiver") private static class ReceiverConfig { @Bean public Queue autoDeleteQueue1 () { return new AnonymousQueue (); } @Bean public Queue autoDeleteQueue2 () { return new AnonymousQueue (); } @Bean public Binding binding1a (TopicExchange topic, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(topic) .with("*.orange.*" ); } @Bean public Binding binding2a (TopicExchange topic, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(topic) .with("*.*.rabbit" ); } @Bean public Binding binding2b (TopicExchange topic, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(topic) .with("lazy.#" ); } @Bean public Tut5Receiver receiver () { return new Tut5Receiver (); } } @Profile("sender") @Bean public Tut5Sender sender () { return new Tut5Sender (); } }
运行 maven 编译
1 mvn clean package -Dmaven.test.skip=true
运行
1 2 java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,receiver --tutorial.client.duration=60000 java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,sender --tutorial.client.duration=60000
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 // Sender Ready ... running for 60000ms [x] Sent 'Hello to lazy.orange.elephant 1' [x] Sent 'Hello to quick.orange.fox 2' [x] Sent 'Hello to lazy.brown.fox 3' [x] Sent 'Hello to lazy.pink.rabbit 4' [x] Sent 'Hello to quick.brown.fox 5' [x] Sent 'Hello to quick.orange.rabbit 6' [x] Sent 'Hello to lazy.orange.elephant 7' [x] Sent 'Hello to quick.orange.fox 8' [x] Sent 'Hello to lazy.brown.fox 9' [x] Sent 'Hello to lazy.pink.rabbit 10' // Receiver Ready ... running for 60000ms instance 1 [x] Received 'Hello to lazy.orange.elephant 1' instance 2 [x] Received 'Hello to lazy.orange.elephant 1' instance 2 [x] Done in 2.004s instance 1 [x] Done in 2.004s instance 2 [x] Received 'Hello to lazy.brown.fox 3' instance 1 [x] Received 'Hello to quick.orange.fox 2' instance 1 [x] Done in 2.006s instance 2 [x] Done in 2.006s instance 2 [x] Received 'Hello to lazy.pink.rabbit 4' instance 1 [x] Received 'Hello to quick.orange.rabbit 6' instance 2 [x] Done in 2.006s instance 2 [x] Received 'Hello to quick.orange.rabbit 6' instance 1 [x] Done in 2.007s instance 1 [x] Received 'Hello to lazy.orange.elephant 7' instance 2 [x] Done in 2.006s instance 2 [x] Received 'Hello to lazy.orange.elephant 7' instance 1 [x] Done in 2.003s instance 1 [x] Received 'Hello to quick.orange.fox 8' instance 2 [x] Done in 2.005s instance 2 [x] Received 'Hello to lazy.brown.fox 9' instance 1 [x] Done in 2.005s instance 2 [x] Done in 2.004s instance 2 [x] Received 'Hello to lazy.pink.rabbit 10'
代码地址:https://github.com/zhaoyibo/rabbitmq-tutorial 相关文章:
RabbitMQ(零):基础概念
RabbitMQ(一):Hello World
RabbitMQ(二):工作队列(Work queues)
RabbitMQ(三):发布订阅(Publish/Subscribe)
RabbitMQ(四):路由(Routing)
RabbitMQ(五):主题(Topics)
RabbitMQ(六):远程过程调用(RPC)
参考 RabbitMQ Tutorial Five