package com.shi.rout;import java.io.IOException;import java.util.concurrent.TimeoutException;import org.junit.Test;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConsumerCancelledException;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.QueueingConsumer.Delivery;import com.rabbitmq.client.ShutdownSignalException;import com.shi.util.RabbitMqUtils;/** * 路由模式 direct * @author SHF * @version 创建时间:2018年7月4日 下午4:17:20 */public class RoutTest { //交换机名称 private final static String EXCHANGE_NAME = "exchange_direct"; //路由 key private final static String KEY_1 ="a"; private final static String KEY_2 ="b"; private final static String KEY_3 ="a"; //队列名称 private final static String QUEUE_1 ="queue_direct_1"; private final static String QUEUE_2 ="queue_direct_2"; /** * 生产者 - 路由模式 * KEY_1 ="a"; * @author SHF * @version 创建时间:2018年7月4日 下午4:20:39 * @throws TimeoutException * @throws IOException */ @Test public void send() throws IOException, TimeoutException { //1 获取链接及mq 通道 Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); //2 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //3 消息内容 String message = " 施爷 路由模式direct 向你发送了一条消息...."; channel.basicPublish(EXCHANGE_NAME, KEY_1, null, message.getBytes()); System.out.println(" [x] sent:"+message); //4关闭通道及连接 channel.close(); connection.close(); } /** * 消费者1 - 路由模式 * KEY_2 ="b"; * @author SHF * @version 创建时间:2018年7月4日 下午4:33:55 * @throws TimeoutException * @throws IOException * @throws InterruptedException * @throws ConsumerCancelledException * @throws ShutdownSignalException */ @Test public void reic1() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { //1 获取连接 及 通道 Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); //2 声明队列 channel.queueDeclare(QUEUE_1, false, false, false, null); //3 绑定交换机,指定路由 channel.queueBind(QUEUE_1, EXCHANGE_NAME, KEY_2); //4 同一个服务器只会发送一条消息给消费者 channel.basicQos(1); //5 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //6 监听队列,手动返回完成 channel.basicConsume(QUEUE_1, false,consumer); //7 获取消息 while(true) { Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println( "[x] reiv1 :" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } /** * 消费者2 - 路由模式 * KEY_3 ="a" * @author SHF * @version 创建时间:2018年7月4日 下午4:33:55 * @throws TimeoutException * @throws IOException * @throws InterruptedException * @throws ConsumerCancelledException * @throws ShutdownSignalException */ @Test public void reic2() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { //1 获取连接 及 通道 Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); //2 声明队列 channel.queueDeclare(QUEUE_2, false, false, false, null); //3 绑定交换机,指定路由 channel.queueBind(QUEUE_2, EXCHANGE_NAME, KEY_3); //4 同一个服务器只会发送一条消息给消费者 channel.basicQos(1); //5 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //6 监听队列,手动返回完成 channel.basicConsume(QUEUE_2, false,consumer); //7 获取消息 while(true) { Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println( "[x] reiv2 :" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }