前言
先决条件 ✔️ 正确安装 RabbitMQ 并将其运行在 localhost:5672 上 ✔️ 已经了解了 RabbitMQ 中的一些基础概念
在上一个教程 中,我们构建了一个简单的 fanout exchange,从而能够向许多消费者广播消息。
在本文中,我们将实现另一个功能——只订阅一部分消息。例如,我们只需要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中
绑定(Binding) 在之前的例子中,我们已经创建了绑定。可以在我们的Tut3Config
文件中回忆一下这样的代码:
1 2 3 4 @Bean public Binding binding1 (FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); }
绑定(binding)是指交换器(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换器(exchange)的消息感兴趣。 绑定可以使用一个额外的参数routingKey
。我们将交换器和队列传入到BindingBuilder
,并将routingKey
绑定到交换器,如下所示:
1 2 3 4 5 6 @Bean public Binding binding1a (DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange" ); }
routingKey
含义取决于交换类型。比如我们之前使用的 fanout exchange,会忽略它的值。
Direct exchange 我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。
我们使用的 fanout exchange 没有足够的灵活性——它能做的仅仅是广播。
我们将会使用 direct exchange 来代替。路由的算法很简单——交换器将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
下图能够很好的描述这个场景:
在这个场景中,我们可以看到 direct exchange X 和两个队列进行了绑定。第一个队列使用 orange 作为绑定键,第二个队列有两个绑定,一个使用 black 作为绑定键,另外一个使用 green。
这样以来,当路由键为 orange 的消息发布到交换器,就会被路由到队列 Q1。路由键为 black 或者 green 的消息就会路由到 Q2。其他的所有消息都将会被丢弃。
多个绑定(Multiple bindings) 多个队列使用相同的绑定键是可以的。这个例子中,我们可以添加一个 X 和 Q1 之间的绑定,使用 black 绑定键。这样一来,direct exchange 就和 fanout exchange 的行为一样,会将消息广播到所有匹配的队列。带有 black 路由键的消息会同时发送到 Q1 和 Q2。
发布消息 我们将使用以上这个模型作为我们的路由系统,将消息发送到 direct exchange 而不是 fanout exchange。我们将使用颜色作为路由键,这样消费者将能通过选择想要接收(或订阅)的颜色来消费对应的消息。
我们在Tut4Config
中做一些 Spring 启动配置,需要先建立一个交换器
1 2 3 4 @Bean public DirectExchange direct () { return new DirectExchange ("tut.direct" ); }
接收消息的方式与上一个教程中的一样,但也有一些不同——我们需要为每个感兴趣的颜色创建一个新的绑定。
1 2 3 4 5 @Bean public Binding binding1a (DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange" );
代码整合
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Tut4Sender { @Autowired private AmqpTemplate template; @Autowired private DirectExchange direct; private int index; private int count; private final String[] keys = {"orange" , "black" , "green" }; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send () { StringBuilder builder = new StringBuilder ("Hello to " ); if (++this .index == 3 ) { this .index = 0 ; } String key = keys[this .index]; builder.append(key).append(' ' ).append(Integer.toString(++this .count)); String message = builder.toString(); template.convertAndSend(direct.getName(), key, message); System.out.println(" [x] Sent '" + message + "'" ); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Tut4Receiver { @RabbitListener(queues = "#{autoDeleteQueue1.name}") public void receiver1 (String in) throws InterruptedException { receiver(in, 1 ); } @RabbitListener(queues = "#{autoDeleteQueue2.name}") public void receiver2 (String in) throws InterruptedException { receiver(in, 2 ); } private void receiver (String in, int instance) throws InterruptedException { StopWatch watch = new StopWatch (); watch.start(); System.out.println("instance " + instance + " [x] Received '" + in + "'" ); doWork(in); watch.stop(); System.out.println("instance " + instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s" ); } private void doWork (String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.' ) { Thread.sleep(1000 ); } } } }
配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 @Profile({"tut4", "routing"}) @Configuration public class Tut4Config { @Bean public DirectExchange direct () { return new DirectExchange ("tut.direct" ); } @Profile("receiver") private static class ReceiverConfig { @Bean public Queue autoDeleteQueue1 () { return new AnonymousQueue (); } @Bean public Queue autoDeleteQueue2 () { return new AnonymousQueue (); } @Bean public Binding binding1a (DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange" ); } @Bean public Binding binding1b (DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("black" ); } @Bean public Binding binding2a (DirectExchange direct, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(direct) .with("green" ); } @Bean public Binding binding2b (DirectExchange direct, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(direct) .with("black" ); } @Bean public Tut4Receiver receiver () { return new Tut4Receiver (); } } @Profile("sender") @Bean public Tut4Sender sender () { return new Tut4Sender (); } }
运行 maven 编译
1 mvn clean package -Dmaven.test.skip=true
运行
1 2 java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut4,receiver --tutorial.client.duration=60000 java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut4,sender --tutorial.client.duration=60000
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 // Sender Ready ... running for 60000ms [x] Sent 'Hello to black 1' [x] Sent 'Hello to green 2' [x] Sent 'Hello to orange 3' [x] Sent 'Hello to black 4' // Receiver Ready ... running for 60000ms instance 2 [x] Received 'Hello to black 1' instance 1 [x] Received 'Hello to black 1' instance 2 [x] Done in 0.0s instance 1 [x] Done in 0.0s instance 2 [x] Received 'Hello to green 2' instance 2 [x] Done in 0.0s instance 1 [x] Received 'Hello to orange 3' instance 1 [x] Done in 0.0s instance 1 [x] Received 'Hello to black 4' instance 1 [x] Done in 0.0s instance 2 [x] Received 'Hello to black 4' instance 2 [x] Done in 0.0s
代码地址:https://github.com/zhaoyibo/rabbitmq-tutorial 相关文章:
RabbitMQ(零):基础概念
RabbitMQ(一):Hello World
RabbitMQ(二):工作队列(Work queues)
RabbitMQ(三):发布订阅(Publish/Subscribe)
RabbitMQ(四):路由(Routing)
RabbitMQ(五):主题(Topics)
RabbitMQ(六):远程过程调用(RPC)
参考 RabbitMQ Tutorial Four