rabbitmq是当下非常流行的消息队列,本文主要介绍springboot中如何配置使用rabbitmq。

文中代码基于springboot2.1.6,源代码见文末地址。

1.为了自己玩方便,可以用docker安装rabbitmq,见专栏内文章

《docker安装rabbitmq》

2.相关配置

spring.rabbitmq.host=192.168.59.128spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=admin#这个如果不配置,就会默认找"/"spring.rabbitmq.virtual-host=my_vhost#指定心跳超时,单位秒,0为不指定;默认60sspring.rabbitmq.requested-heartbeat=20#是否启用【发布确认】spring.rabbitmq.publisher-confirms=true#是否启用【发布返回】spring.rabbitmq.publisher-returns=true#连接超时,单位毫秒,0表示无穷大,不超时spring.rabbitmq.connection-timeout=10

3.pom依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

4.rabbitmq有4种exchange

Exchange typeDefault pre-declared names
Direct exchange(Empty string) and amq.direct
Fanout exchangeamq.fanout
Topic exchangeamq.topic
Headers exchangeamq.match (and amq.headers in RabbitMQ)

a.direct exchange使用routing key进行消息传输,如下图,routing key其实就是queue和exchange的绑定。适用于多工作者协同工作的场景。

绑定代码如下:代码中queue名称和routing key名称都是"direct"

@Configurationpublic class DirectRabbitConfig {        @Bean    public Queue direct() {        return new Queue("direct");    }    @Bean    public DirectExchange directExchange() {        return new DirectExchange("directExchange");    }    @Bean    public Binding directBindingExchange(Queue direct, DirectExchange directExchange) {        return BindingBuilder.bind(direct).to(directExchange).with("direct");    }}

sender如下:

@Servicepublic class DirectSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private AmqpTemplate rabbitTemplate;    public void sendString(String message) {        logger.info("direct sender : " + message);        rabbitTemplate.convertAndSend("directExchange", "direct", message);    }    public void sendObject(Object message) {        String messageStr = JSONObject.toJSONString(message);        logger.info(messageStr);        rabbitTemplate.convertAndSend("directExchange", "direct", messageStr);    }}

 

receiver:

@RabbitHandler@RabbitListener(queues = {"direct"})public void processDirect(Message message) {    logger.info("Receiver direct: {}", new String(message.getBody()));}

 

b.fanout exchange就是广播模式,把消息路有给所有的绑定队列,可以适用于群聊天的场景。

配置代码如下:其中有3个队列绑定一个fanout exchange

@Configurationpublic class FanoutRabbitConfig {    @Bean    public Queue queueA(){        return new Queue("fanout.a");    }    @Bean    public Queue queueB(){        return new Queue("fanout.b");    }    @Bean    public Queue queueC(){        return new Queue("fanout.c");    }    @Bean    public FanoutExchange fanoutExchange() {        return new FanoutExchange("fanoutExchange");    }    @Bean    public Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueA).to(fanoutExchange);    }    @Bean    public Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueB).to(fanoutExchange);    }    @Bean    public Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueC).to(fanoutExchange);    }}

 

sender:

@Servicepublic class FanoutSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private AmqpTemplate rabbitTemplate;    public void send(String message) {        logger.info("fanout sender : {}", message);        rabbitTemplate.convertAndSend("fanoutExchange","", message);    }}

receiver:

    @RabbitHandler    @RabbitListener(queues = {"fanout.a", "fanout.b", "fanout.c"})    public void processFanout1(Message message) {        logger.info("Receiver fanout: {}", new String(message.getBody()));    }

c.topic exchange通过routing key和通配符来路由消息,适用于发布订阅场景。

配置代码:

@Configurationpublic class TopicRabbitConfig {    @Bean    public Queue queueMessage() {        return new Queue("topic.message");    }    @Bean    public Queue queueMessage2() {        return new Queue("topic.message2");    }    /**     * 将队列绑定到Topic交换器     * @return     */    @Bean    public TopicExchange exchange() {        return new TopicExchange("topicExchange");    }    /**     * 将队列绑定到Topic交换器     * @param queueMessage     * @param exchange     * @return     */    @Bean    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");    }    /**     * 将队列绑定到Topic交换器 采用#的方式     * @param exchange     * @param queueMessage2     * @return     */    @Bean    Binding bindingExchangeMessage2(TopicExchange exchange, Queue queueMessage2) {        return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.#");    }}

sender:

@Servicepublic class TopicSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private AmqpTemplate rabbitTemplate;    public void send1(String message) {        logger.info("topic sender1 : " + message);        rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);    }    public void send2(String message) {        logger.info("topic sender2 : " + message);        rabbitTemplate.convertAndSend("topicExchange", "topic.message2", message);    }}

接受

@RabbitHandler    @RabbitListener(queues = {"topic.message"})    public void processTopic(Message message) {        logger.info("Receiver topic: {}", new String(message.getBody()));    }    @RabbitHandler    @RabbitListener(queues = {"topic.message2"})    public void processTopic2(Message message) {        logger.info("Receiver topic2: {}", new String(message.getBody()));}

 

d.header exchange忽略routing key参数,用header来取代

配置

@Configurationpublic class HeadersRabbitConfig {    @Bean    public Queue headerQueue() {        return new Queue("headerQueue");    }    @Bean    public Queue headerQueue2() {        return new Queue("headerQueue2");    }    @Bean    public HeadersExchange headerExchange() {          return new HeadersExchange("headerExchange");    }    @Bean    public HeadersExchange headerExchange2() {          return new HeadersExchange("headerExchange2");    }     @Bean    public Binding bindingExchange(Queue headerQueue, HeadersExchange headerExchange) {        Map<String,Object> headerValues = new HashMap<>(3);        headerValues.put("param1", "value1");        headerValues.put("param2", "value2");        return BindingBuilder.bind(headerQueue).to(headerExchange).whereAll(headerValues).match();    }    @Bean    public Binding bindingExchange2(Queue headerQueue2, HeadersExchange headerExchange2) {        Map<String,Object> header = new HashMap<>(3);        header.put("param1", "value1");        header.put("param2", "value2");        return BindingBuilder.bind(headerQueue2).to(headerExchange2).whereAny(header).match();    }}

发送:

@Servicepublic class HeadersSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private AmqpTemplate rabbitTemplate;    public void headerSend(Map<String, Object> head, String msg){        logger.info("header send message: "+msg);        rabbitTemplate.convertAndSend("headerExchange", "headerQueue", getMessage(head, msg));    }    public void headerSend2(Map<String, Object> head, String msg){        logger.info("header1 send message: "+msg);        rabbitTemplate.convertAndSend("headerExchange2", "headerQueue2", getMessage(head, msg));    }    private Message getMessage(Map<String, Object> head, Object msg){        MessageProperties messageProperties = new MessageProperties();        for (Map.Entry<String, Object> entry : head.entrySet()) {            messageProperties.setHeader(entry.getKey(), entry.getValue());        }        MessageConverter messageConverter = new SimpleMessageConverter();        return messageConverter.toMessage(msg, messageProperties);    }}

接收:

@RabbitHandler    @RabbitListener(queues = {"headerQueue"})    public void processHeaders(Message message) {        logger.info("Receiver header: {}", new String(message.getBody()));    }    @RabbitHandler    @RabbitListener(queues = {"headerQueue2"})    public void processHeaders1(Message message) {        logger.info("Receiver header2: {}", new String(message.getBody()));    }

5.测试,测试代码写在RabbitMqController中,启动Application即可进行url测试。见源码。

说明:

a.topic exchange,浏览器输入http://localhost:8082/mq/topic后,topic.#的routing key收到了2条消息,topic.message的routing key收到了1条,可以看出通配符的作用

b.headers exchange:浏览器输入http://localhost:8082/mq/headers,发送了4条消息,但是第1条没有收到。因为headerExchange绑定时使用了whereAll,headerExchange2绑定时使用了whereAny。

 

©著作权归作者所有:来自51CTO博客作者朱晋君的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 【故障处理】队列等待之TX - allocate ITL entry引起的死锁处理(
  2. 【故障处理】队列等待之enq: US - contention案例
  3. 【故障处理】队列等待之enq IV - contention案例
  4. 【故障处理】队列等待之enq: TX - row lock contention
  5. 【故障处理】队列等待之TX - allocate ITL entry案例
  6. vue2基础语法2与监听属性和计算属性
  7. Vue自学之路10-简单的计算器
  8. Oracle绑定变量分级(Bind Graduation)
  9. 手把手教Linux驱动9-等待队列waitq

随机推荐

  1. Android 5.X Activity过渡动画,以及漂亮的
  2. GridView中实现元素填充剩余空间(自适应)
  3. android基础--获取sdcard的总容量
  4. android studio中的文本替换
  5. 测试Android真机访问电脑主机web项目服务
  6. Android Studio每日小技巧
  7. Android Things:“1024工场”店铺开张啦!树
  8. 微信 Tinker 的一切都在这里,包括源码
  9. Android ListView监听上下滑动(判断是否
  10. Android本地数据存储之Sharedpreference