当前位置: 首页 > news >正文

RabbitMq

RabbitMq

1.0 跳过路由器直接传到队列

package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testAmqp(){String queueName="simple.queue";String msg="hello";rabbitTemplate.convertAndSend(queueName,msg);}
}

在配置好我们的rabbitMq之后,直接可以向队列发送消息而不用经过路由器

那如何从消息队列拿消息呢

package com.itheima.consumer.listeners;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){log.info("get message {} ",msg);}
}

使用 @RabbitListener(queues = "simple.queue")

指定队列名称,然后启动监听器,当目前指定的队列中存在消息后,Spring就会自动将从消息队列取到的值放到

传入的参数中

1.1 Work模型

WorkQueue是指一个queue绑定多个消费者去消费

我们这里产生五十条消息:

package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testAmqp(){String queueName="simple.queue";String msg="hello";rabbitTemplate.convertAndSend(queueName,msg);}@Testpublic void testWorkQueue(){String queueName="WorkQueue";for (int i=1;i<=50;i++){try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}String msg="hello :message " +i;rabbitTemplate.convertAndSend(queueName,msg);}}
}

创建两个消费者去消费

package com.itheima.consumer.listeners;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){log.info("get message {} ",msg);}@RabbitListener(queues = "WorkQueue")public void listenWorkQueue1(String msg){log.info("listener1 get message {} ",msg);}@RabbitListener(queues = "WorkQueue")public void listenWorkQueue2(String msg){log.info("listener2 get message {} ",msg);}
}

我们发现两个消费者平分了这50条消息,每一个分到25条

我们通过在消费过程中加sleep来控制消费者的消费速度,就会发现,无论消费者速度是快还是慢,消费的消息数

一开始就定好了,消费完自己的也不管消息队列中还有没有消息,就直接停止了,这样肯定是效率很低的

因为默认消息绑定到消费者是轮询的,一人分一个再下一轮,没有考虑到具体的处理能力

当前任务的产生原因如下:

我们处理任务的过程是,任务进入到队列中,由MQ将任务通知到消费者,MQ做这个任务的分配者,默认的任务

传递策略是有一批任务来了,比如五十个,MQ采取轮询的方式分配任务,不管你先前的任务执行完还是没执行

完,你一个我一个的分配完任务他就不管了,分配的任务到了对应消费者的缓冲区中等待执行,这种情况就会产生

任务堆积,如果我们在设置中启动了下面的选项

        prefetch: 1

此时这个分配策略就变了,会先给一人分一个消息,然后检测现在谁的消息处理完了就再给他分一个,而不是之前

一股脑的平分,这样就解决了上面的问题

这时候创建两个不同效率的消费者

package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testAmqp(){String queueName="simple.queue";String msg="hello";rabbitTemplate.convertAndSend(queueName,msg);}@Testpublic void testWorkQueue(){String queueName="WorkQueue";for (int i=1;i<=50;i++){try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}String msg="hello :message " +i;rabbitTemplate.convertAndSend(queueName,msg);}}
}

测试下来发现谁空闲谁就处理,大大提高了效率

1.2 Fanout交换机

我们上面没有使用交换机,消息直接发送到了队列中,此时被任意的一个消费者消费完这个消息就结束了,但对于

我们的很多业务,比如支付业务,一个消息需要被多个消费者处理,比如支付成功的消息需要被通知服务,保存服

务等等服务处理,这里只使用队列就不够了,就应该使用交换机由交换机来分发消息,这里先学习Fanout交换机

该交换机会将接受的消息广播的发送给与其相连的所有队列,所有的队列都能收到消息

接下来我们来测试,首先我们创建一个fanout交换机,绑定两个队列

    @RabbitListener(queues = "fanout.q1")public void fanOutq1(String msg){log.info("listener fanOutq1 get message {} ",msg);}@RabbitListener(queues = "fanout.q2")public void fanOutq2(String msg){log.info("listener2 fanOutq2 get message {} ",msg);}
    @Testpublic void testSendFanout(){String exchangeName="hmal.fanout";for (int i=1;i<=4;i++){String msg="helloeveryone :message " +i;rabbitTemplate.convertAndSend(exchangeName,null,msg);}}

发送消息,我们的消息被广播传到了每个队列中

1.2 Direct交换机

Direct交换机基于一定的规则将消息路由到不同的队列中

这个规则是什么呢? 是基于路由key进行的匹配,我们在创建队列时绑定一个路由key 在发送者发送消息时带上路

由key,到交换机时就会自动根据这个路由Key进行发送,对于路由Key与消息相同的队列进行消息发送

我们创建一个direct交换机,在绑定队列的时候指明路由key 然后再发送的时候传递路由Key

    @RabbitListener(queues = "direct.q1")public void directq1(String msg){log.info("listener directq1 get message {} ",msg);}@RabbitListener(queues = "direct.q2")public void directq2(String msg){log.info("listener directq2 get message {} ",msg);}
    @Testpublic void testSendDirect(){String exchangeName="hmal.direct";String msg="blue :message ";rabbitTemplate.convertAndSend(exchangeName,"blue",msg);msg="black:message";rabbitTemplate.convertAndSend(exchangeName,"black",msg);}

此时就是交换机根据路由key进行匹配分发消息,一个队列可以绑定多个路由key ,基于Direct交换机也可也完全

实现Fanout的功能

如果没有找到对应的路由,此时消息会暂存在队列中

1.3 Topic交换机

Direct交换机是严格匹配路由key ,但是我们很多情况下需队列是需要去匹配一类消息,如果每一种我们都手动去

添加,未免显得过于麻烦,这时候就需要使用Topic交换机了

使用该交换机,可以做到队列使用统配符去匹配路由key,

我们现在先创建一个Topic 交换机,再创建两个队列,绑定到交换机上,路由Key分别为 china.# 和 japan.#

我们这里可以使用的统配符为 #* 其中#指匹配0个或者多个单词,*代表匹配一个单词

比如现在我们发送一条消息,路由Key 为china.hefei 此时就会被路由器转发到china.#所在的那个队列中

如果是china 也会被转发

    @Testpublic void testSendTopic(){String exchangeName="hmal.topic";String msg="now routerkey is china.hefei";rabbitTemplate.convertAndSend(exchangeName,"china.hefei",msg);msg="now routerkey is japan.nagoya";rabbitTemplate.convertAndSend(exchangeName,"japan.nagoya",msg);}
    @RabbitListener(queues = "topic.q1")public void topic1(String msg){log.info("topic1 get message {} ",msg);}@RabbitListener(queues = "topic.q2")public void topic2(String msg){log.info("topic2 get message {} ",msg);}

1.4 声明队列交换机

之前我们都是手动在mq中创建队列和交换机,但实际上我们并不可能一个个的手动创建交换机,还是需要在java

代码中创建的

创建交换机,队列,和连接需要在配置类中将其声明为Bean,具体的方法如下:

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}@Beanpublic Queue fanoutQueue3(){// QueueBuilder.durable("ff").build();return new Queue("fanout.queue3");}@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}@Beanpublic Binding fanoutBinding4(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}

这里需要提一下,我们发现最后一个函数是直接用fanoutQueue4()来获得对象的,但fanoutQueue4()

用Bean修饰,涉及到动态代理,这里会绕过代理吗,答案是不会,原因就是@Configuration 使用的动态代理

方式和我们之前用到的在业务层调用的动态代理方式不同,在业务层中,是基于AOP 在代理对象中调用实例对象

中的对应方法,我们直接this调用肯定就绕过了,但这里使用的是继承机制重写方法,调用的是重写后的方法,并

没有原始对象的实例,所以绕不过

除了上面这种基于配置类的注册方式,我们还可以直接在监听者用注解完成注册

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.q1",durable = "true"),exchange = @Exchange(name="hmal.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))

此时如果不存在对应的交换机,或者队列或者绑定关系时,会自动的进行注册,如果已存在同名,但类型,key不

同的交换机或者队列时,会报错。

1.5 消息转换器

我们在发送消息的时候,本质发送的是字节流,发送的字符串在内部也经过消息转换器转成了字节流来发送,但是

对于对象来说,内部会使用消息转换器调用序列化器对其进行序列化,默认调用java自带的序列化器,这个序列化

器不好用是人尽皆知的,我们这里自己去重新定义序列化器

导入jackson-dataformat-xml后,在启动类或者配置类中添加

    @Beanpublic MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}

就可以实现替换消息转化器,将对象序列化为json来传递

在监听器接受消息,发什么类型就接收什么类型,spring会自动转化为对应的格式

http://www.rkmt.cn/news/77835.html

相关文章:

  • Ubuntu 架构磁盘清理的手段
  • 第八次作业
  • Day57(27)-F:\vs_ai_work\vue-tlias-management
  • 完美的园艺配土
  • 数据库如何处理大量的交易流水记录
  • 4、HTML入门
  • 中山装修代卖公司哪家好?2025综合实力榜单
  • 2025年铝外壳加工厂家权威推荐榜:压铸铝外壳/LED铝外壳/充电宝铝外壳/电源铝外壳/精密铝外壳,匠心工艺与定制实力深度解析
  • 字节跳动企业管理有哪些先进的地方
  • 2025年geo优化软件推荐:AI驱动下的排名新利器
  • 2025年12月徐州刑事辩护/取保候审/刑事申诉律师,这五家不容错过
  • 消息积压怎么处理
  • 完整教程:基于单片机的交流功率测量仪设计与实现
  • 数据脱敏技术详解:类型、方法与最佳实践
  • 2025年Grok优化排名公司推荐:技术迭代下的精准选择指南
  • 精选!2025年声学成像仪厂家推荐:西安联丰讯声革新运维的领先企业
  • 2025年环境噪声检测设备厂家权威推荐榜单TOP5:技术驱动下的“机器听觉”新势力
  • 2025年下半年江苏徐州金属熔剂、金属添加剂、铝基中间合金厂家推荐榜单:专业解析与选购指南
  • 2025年下半年上海CE认证服务商综合实力排行榜与选择指南
  • 第十一篇:细粒度权限控制(RBAC)
  • 2025年权威盘点:十大优质机床钣金外壳生产商,评价高的机床钣金外壳口碑推荐榜睿意达发展迅速,实力雄厚
  • A*
  • AlexNet论文阅读笔记
  • 2025压铸油温机制造厂TOP5权威推荐:技术参数与性能评测
  • 2025年实验室反应釜厂家TOP5推荐,实验室反应釜认证厂家
  • 2025年知名电缆生产厂家推荐:十大知名品牌实力解析及选购指南名单(12月新版)
  • 2025年中国工业节能设备五大生产厂家推荐:河南丰华实力凸显
  • 2025年重庆电力总包资质代办和转让服务哪家强?五大权威机构
  • 2025年12月考公考编机构信赖之选:考公考编/培训机构/单位/辅导单位/线上单位/机构/学校/课程,北京国培众信领跑,这些机构用实力助你上岸
  • 2025家用座椅电梯品牌TOP5权威测评:附无障碍座椅电梯价