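Spring Boot Study, Part 5: Integrating RabbitMQ with Spring Boot
RabbitMQ is one of the most popular message queues today. This article shows how to configure and use RabbitMQ in Spring Boot.
The code in this article is based on Spring Boot 2.1.6; the source code is at the address given at the end of the article.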
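1. For easy local experimentation, you can install RabbitMQ with Docker; see the article in this column:
"docker安装rabbitmq" (Installing RabbitMQ with Docker)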
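2. Configuration

    spring.rabbitmq.host=192.168.59.128
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    # If not configured, the default virtual host "/" is used
    spring.rabbitmq.virtual-host=my_vhost
    # Requested heartbeat timeout in seconds; 0 means none; default 60s
    spring.rabbitmq.requested-heartbeat=20
    # Whether to enable publisher confirms
    spring.rabbitmq.publisher-confirms=true
    # Whether to enable publisher returns
    spring.rabbitmq.publisher-returns=true
    # Connection timeout in milliseconds; 0 means wait indefinitely (no timeout)
    spring.rabbitmq.connection-timeout=10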
3. pom dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
4. RabbitMQ has four exchange types
Exchange type | Default pre-declared names |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
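a. A direct exchange routes messages by routing key, as shown in the figure below: a message is delivered to a queue when its routing key equals the key with which that queue was bound to the exchange. It suits scenarios where multiple workers cooperate on the same work.
The binding code is as follows. Both the queue name and the routing key are "direct":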
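    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class DirectRabbitConfig {

        // Queue named "direct"
        @Bean
        public Queue direct() {
            return new Queue("direct");
        }

        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange");
        }

        // Bind the queue to the exchange with the routing key "direct"
        @Bean
        public Binding directBindingExchange(Queue direct, DirectExchange directExchange) {
            return BindingBuilder.bind(direct).to(directExchange).with("direct");
        }
    }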
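The sender is as follows:

    import com.alibaba.fastjson.JSONObject; // assumption: JSONObject here is fastjson's
    import javax.annotation.Resource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class DirectSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Resource
        private AmqpTemplate rabbitTemplate;

        public void sendString(String message) {
            logger.info("direct sender : {}", message);
            rabbitTemplate.convertAndSend("directExchange", "direct", message);
        }

        // Serialize the object to a JSON string before sending
        public void sendObject(Object message) {
            String messageStr = JSONObject.toJSONString(message);
            logger.info(messageStr);
            rabbitTemplate.convertAndSend("directExchange", "direct", messageStr);
        }
    }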
Receiver:

    @RabbitHandler
    @RabbitListener(queues = {"direct"})
    public void processDirect(Message message) {
        logger.info("Receiver direct: {}", new String(message.getBody()));
    }
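To make the exact-match rule of a direct exchange concrete, here is a minimal in-memory sketch in plain Java. It is only a toy model, not how RabbitMQ or Spring AMQP is implemented, and the class name `DirectExchangeSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a direct exchange (hypothetical class, for illustration only). */
class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given binding key. */
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("direct", "direct");
        System.out.println(exchange.route("direct")); // [direct]
        System.out.println(exchange.route("other"));  // []
    }
}
```

A routing key that matches no binding key yields an empty list, which corresponds to a message that reaches no queue.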
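b. A fanout exchange works in broadcast mode: it routes every message to all queues bound to it, which suits scenarios such as group chat.
The configuration code is as follows; three queues are bound to one fanout exchange: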
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class FanoutRabbitConfig {

        @Bean
        public Queue queueA() {
            return new Queue("fanout.a");
        }

        @Bean
        public Queue queueB() {
            return new Queue("fanout.b");
        }

        @Bean
        public Queue queueC() {
            return new Queue("fanout.c");
        }

        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        // A fanout binding takes no routing key
        @Bean
        public Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueA).to(fanoutExchange);
        }

        @Bean
        public Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueB).to(fanoutExchange);
        }

        @Bean
        public Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueC).to(fanoutExchange);
        }
    }
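Sender:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class FanoutSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void send(String message) {
            logger.info("fanout sender : {}", message);
            // The routing key is left empty because a fanout exchange ignores it
            rabbitTemplate.convertAndSend("fanoutExchange", "", message);
        }
    }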
Receiver:

    @RabbitHandler
    @RabbitListener(queues = {"fanout.a", "fanout.b", "fanout.c"})
    public void processFanout1(Message message) {
        logger.info("Receiver fanout: {}", new String(message.getBody()));
    }
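The broadcast behaviour can be sketched in a few lines of plain Java. This is a toy model under the assumption that each queue is just an in-memory list of strings; `FanoutExchangeSketch` and its methods are hypothetical names, not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of a fanout exchange (hypothetical class, for illustration only). */
class FanoutExchangeSketch {

    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a new, empty queue to this exchange. */
    public void bind(String queueName) {
        boundQueues.put(queueName, new ArrayDeque<>());
    }

    /** Copies the message into every bound queue; the routing key is ignored. */
    public void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    /** Returns the named queue, or null if it was never bound. */
    public Queue<String> queue(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("fanout.a");
        exchange.bind("fanout.b");
        exchange.bind("fanout.c");
        exchange.publish("", "hello");
        // Every bound queue now holds its own copy of the message.
        System.out.println(exchange.queue("fanout.a")); // [hello]
        System.out.println(exchange.queue("fanout.b")); // [hello]
        System.out.println(exchange.queue("fanout.c")); // [hello]
    }
}
```

With three bound queues, one published message produces three deliveries, so a single listener on all three queues would log the same message three times.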
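c. A topic exchange routes messages by matching the routing key against binding patterns that may contain wildcards (* matches exactly one word, # matches zero or more words). It suits publish/subscribe scenarios.
Configuration code: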
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class TopicRabbitConfig {

        @Bean
        public Queue queueMessage() {
            return new Queue("topic.message");
        }

        @Bean
        public Queue queueMessage2() {
            return new Queue("topic.message2");
        }

        /** Declares the topic exchange. */
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }

        /** Binds queueMessage to the topic exchange with the exact key "topic.message". */
        @Bean
        public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }

        /** Binds queueMessage2 to the topic exchange using the # wildcard. */
        @Bean
        public Binding bindingExchangeMessage2(TopicExchange exchange, Queue queueMessage2) {
            return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.#");
        }
    }
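Sender:

    import javax.annotation.Resource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class TopicSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Resource
        private AmqpTemplate rabbitTemplate;

        // Routing key "topic.message" matches both bindings
        public void send1(String message) {
            logger.info("topic sender1 : {}", message);
            rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
        }

        // Routing key "topic.message2" matches only the "topic.#" binding
        public void send2(String message) {
            logger.info("topic sender2 : {}", message);
            rabbitTemplate.convertAndSend("topicExchange", "topic.message2", message);
        }
    }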
Receiver:

    @RabbitHandler
    @RabbitListener(queues = {"topic.message"})
    public void processTopic(Message message) {
        logger.info("Receiver topic: {}", new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = {"topic.message2"})
    public void processTopic2(Message message) {
        logger.info("Receiver topic2: {}", new String(message.getBody()));
    }
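The wildcard matching used by topic bindings can be illustrated with a small stand-alone matcher. This is a simplified sketch of the matching rule (words separated by dots, `*` for exactly one word, `#` for zero or more words), not the broker's actual implementation; `TopicMatcher` is a hypothetical helper name.

```java
/** Simplified topic-pattern matcher (hypothetical helper, for illustration only). */
class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    public static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if all words were consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word, but none are left.
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && match(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.message", "topic.message"));  // true
        System.out.println(matches("topic.message", "topic.message2")); // false
        System.out.println(matches("topic.#", "topic.message"));        // true
        System.out.println(matches("topic.#", "topic.message2"));       // true
    }
}
```

With the two bindings from the configuration above, a message sent with `topic.message` reaches both queues, while one sent with `topic.message2` reaches only the queue bound with `topic.#`.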
d. A headers exchange ignores the routing key and routes on message headers instead.
Configuration:
    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class HeadersRabbitConfig {

        @Bean
        public Queue headerQueue() {
            return new Queue("headerQueue");
        }

        @Bean
        public Queue headerQueue2() {
            return new Queue("headerQueue2");
        }

        @Bean
        public HeadersExchange headerExchange() {
            return new HeadersExchange("headerExchange");
        }

        @Bean
        public HeadersExchange headerExchange2() {
            return new HeadersExchange("headerExchange2");
        }

        // whereAll: the message must carry every listed header with the same value
        @Bean
        public Binding bindingExchange(Queue headerQueue, HeadersExchange headerExchange) {
            Map<String, Object> headerValues = new HashMap<>(3);
            headerValues.put("param1", "value1");
            headerValues.put("param2", "value2");
            return BindingBuilder.bind(headerQueue).to(headerExchange).whereAll(headerValues).match();
        }

        // whereAny: one matching header is enough
        @Bean
        public Binding bindingExchange2(Queue headerQueue2, HeadersExchange headerExchange2) {
            Map<String, Object> header = new HashMap<>(3);
            header.put("param1", "value1");
            header.put("param2", "value2");
            return BindingBuilder.bind(headerQueue2).to(headerExchange2).whereAny(header).match();
        }
    }
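Sender:

    import java.util.Map;
    import javax.annotation.Resource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;
    import org.springframework.stereotype.Service;

    @Service
    public class HeadersSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Resource
        private AmqpTemplate rabbitTemplate;

        // The routing key argument is ignored by a headers exchange
        public void headerSend(Map<String, Object> head, String msg) {
            logger.info("header send message: {}", msg);
            rabbitTemplate.convertAndSend("headerExchange", "headerQueue", getMessage(head, msg));
        }

        public void headerSend2(Map<String, Object> head, String msg) {
            logger.info("header2 send message: {}", msg);
            rabbitTemplate.convertAndSend("headerExchange2", "headerQueue2", getMessage(head, msg));
        }

        // Copy the given entries into the message headers and wrap the payload
        private Message getMessage(Map<String, Object> head, Object msg) {
            MessageProperties messageProperties = new MessageProperties();
            for (Map.Entry<String, Object> entry : head.entrySet()) {
                messageProperties.setHeader(entry.getKey(), entry.getValue());
            }
            MessageConverter messageConverter = new SimpleMessageConverter();
            return messageConverter.toMessage(msg, messageProperties);
        }
    }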
Receiver:

    @RabbitHandler
    @RabbitListener(queues = {"headerQueue"})
    public void processHeaders(Message message) {
        logger.info("Receiver header: {}", new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = {"headerQueue2"})
    public void processHeaders1(Message message) {
        logger.info("Receiver header2: {}", new String(message.getBody()));
    }
5. Testing. The test code is in RabbitMqController; start Application and you can test through URLs. See the source code.
Notes:
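a. Topic exchange: after opening http://localhost:8082/mq/topic in a browser, the queue bound with topic.# received 2 messages, while the queue bound with topic.message received 1. This shows the effect of the wildcard.
b. Headers exchange: opening http://localhost:8082/mq/headers in a browser sends 4 messages, but the first one is never received. The reason is that headerExchange is bound with whereAll, which requires every listed header to match, whereas headerExchange2 is bound with whereAny, which requires only one of them to match.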
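The difference between whereAll and whereAny can be sketched with two small predicates over header maps. The controller's actual messages are not shown in this article, so the header values below are hypothetical, and `HeadersMatcher` is an illustrative helper rather than anything from Spring AMQP.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy header matching for headers exchanges (hypothetical helper, for illustration only). */
class HeadersMatcher {

    /** whereAll semantics: every binding entry must be present with an equal value. */
    public static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (!entry.getValue().equals(headers.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** whereAny semantics: at least one binding entry must be present with an equal value. */
    public static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (entry.getValue().equals(headers.get(entry.getKey()))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("param1", "value1");
        binding.put("param2", "value2");

        // Hypothetical message that carries only one of the two headers
        Map<String, Object> partial = new HashMap<>();
        partial.put("param1", "value1");

        // Hypothetical message that carries both headers
        Map<String, Object> full = new HashMap<>(binding);

        System.out.println(matchesAll(binding, partial)); // false
        System.out.println(matchesAny(binding, partial)); // true
        System.out.println(matchesAll(binding, full));    // true
        System.out.println(matchesAny(binding, full));    // true
    }
}
```

A message with only some of the bound headers is therefore dropped by an exchange bound with whereAll but still delivered through one bound with whereAny, which is consistent with one of the four test messages going missing.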
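© Copyright belongs to the author. This is an original work by 51CTO blog author 朱晋君. If you repost it, please credit the source; otherwise legal action may be pursued.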