当前位置: 首页 > news >正文

集成RabbitMQ+MQ常用操作 - 实践

文章目录

1.环境搭建

1.Docker安装RabbitMQ
1.拉取镜像
docker pull rabbitmq:3.8-management
2.安装命令
docker run -e RABBITMQ_DEFAULT_USER=sun-e RABBITMQ_DEFAULT_PASS=mq-v mq-plugins:/plugins--name mq--hostname mq-p 15672:15672-p 5672:5672-d 699038cb2b96 # 注意这里是镜像id,需要替换
3.开启5672和15672端口
4.登录控制台

15672端口

2.整合Spring AMQP
1.sun-common模块下创建新模块

CleanShot 2024-08-02 at 13.55.01@2x

2.引入amqp依赖和fastjson

4.0.0com.sunxianshengsun-common1.0-SNAPSHOTsun-common-rabbitmq${children.version}org.springframework.bootspring-boot-starter-amqpcom.alibabafastjson
3.新建一个mq-demo的模块
1.在sun-frame下创建mq-demo

CleanShot 2024-08-02 at 14.10.42@2x

2.然后在mq-demo下创建生产者和消费者子模块

CleanShot 2024-08-02 at 14.16.31@2x

CleanShot 2024-08-02 at 14.16.48@2x

3.查看是否交给父模块管理了

CleanShot 2024-08-02 at 14.18.56@2x

4.在mq-demo模块引入sun-common-rabbitmq依赖
    com.sunxianshengsun-common-rabbitmq1.0-SNAPSHOT
5.publisher引入sun-common-test依赖
    com.sunxianshengsun-common-test1.0-SNAPSHOT
6.将sun-common-rabbitmq clean-install一下
7.给consumer和publisher都创建主类
1.ConsumerApplication.java
package com.sunxiansheng.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.sunxiansheng")
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}
2.PublisherApplication.java
package com.sunxiansheng.publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}
}
4.测试MQ
1.application.yml mq的最基本配置
spring:# RabbitMQ 配置rabbitmq:# 服务器地址host: ip# 用户名username: sunxiansheng# 密码password: rabbitmq# 虚拟主机virtual-host: /# 端口port: 5672
2.consumer
1.TestConfig.java MQ配置
package com.sunxiansheng.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** Description: 最基本的MQ测试* @Author sun* @Create 2024/8/2 14:34* @Version 1.0*/
@Configuration
public class TestConfig {/*** 创建一个fanout类型的交换机* @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange.test");}/*** 创建一个队列* @return*/@Beanpublic Queue fanoutQueueTest() {return new Queue("fanout.queue.test");}/*** 交换机和队列绑定*/@Beanpublic Binding binding() {return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());}
}
2.TestConfigListener.java 监听队列
package com.sunxiansheng.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*** Description: 最基本的MQ测试* @Author sun* @Create 2024/8/2 14:34* @Version 1.0*/
@Component
public class TestConfigListener {@RabbitListener(queues = "fanout.queue.test")public void receive(String message) {System.out.println("接收到的消息:" + message);}
}
3.publisher
1.TestConfig.java 测试(注意指定启动类)
package com.sunxiansheng.consumer.config;
import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/*** Description: 最基本的MQ测试* @Author sun* @Create 2024/8/2 14:34* @Version 1.0*/
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class TestConfig {@Resourceprivate AmqpTemplate amqpTemplate;@Testpublic void send() {// 交换机String exchange = "fanout.exchange.test";// 路由键String routingKey = "";// 消息String message = "hello fanout";// 发送消息amqpTemplate.convertAndSend(exchange, routingKey, message);}
}
2.结果

CleanShot 2024-08-02 at 14.46.49@2x

2.基本交换机

1.Fanout
1.FanoutConfig.java 交换机配置
package com.sunxiansheng.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** Description: Fanout交换机* @Author sun* @Create 2024/7/29 15:06* @Version 1.0*/
@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange1() {// 创建一个fanout类型的交换机return new FanoutExchange("fanout.exchange");}@Beanpublic Queue fanoutQueue1() {// 创建一个队列return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2() {// 创建一个队列return new Queue("fanout.queue2");}// 两个队列绑定到交换机上@Beanpublic Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange1) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);}@Beanpublic Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);}
}
2.FanoutConfigListener.java 监听者
package com.sunxiansheng.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*** Description: Fanout交换机* @Author sun* @Create 2024/7/29 15:06* @Version 1.0*/
@Component
public class FanoutConfigListener {@RabbitListener(queues = "fanout.queue1")public void receive1(String message) {System.out.println("fanout.queue1接收到的消息:" + message);}@RabbitListener(queues = "fanout.queue2")public void receive2(String message) {System.out.println("fanout.queue2接收到的消息:" + message);}
}
3.FanoutConfig.java 生产者
package com.sunxiansheng.consumer.config;
import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/*** Description: Fanout交换机* @Author sun* @Create 2024/7/29 15:06* @Version 1.0*/
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class FanoutConfig {@Resourceprivate AmqpTemplate amqpTemplate;@Testpublic void send() {// 交换机String exchange = "fanout.exchange";// 路由键String routingKey = "";// 消息String message = "hello fanout";// 发送消息amqpTemplate.convertAndSend(exchange, routingKey, message);}
}
2.Direct
1.DirectConfig.java 交换机配置
package com.sunxiansheng.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** Description: Direct交换机* @Author sun* @Create 2024/7/29 15:06* @Version 1.0*/
@Configuration
public class DirectConfig {@Beanpublic DirectExchange directExchange() {// 创建一个direct类型的交换机return new DirectExchange("direct.exchange");}@Beanpublic Queue directQueue1() {// 创建一个队列return new Queue("direct.queue1");}@Beanpublic Queue directQueue2() {// 创建一个队列return new Queue("direct.queue2");}// 两个队列绑定到交换机上,这里需要指定routingKey@Beanpublic Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("black");}@Beanpublic Binding bindingDirectQueue2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("green");}
}
2.DirectConfigListener.java 监听者
package com.sunxiansheng.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*** Description: Direct交换机* @Author sun* @Create 2024/7/29 15:06* @Version 1.0*/
@Component
public class DirectConfigListener {@RabbitListener(queues = "direct.queue1")public void receive1(String message) {System.out.println("direct.queue1接收到的消息:" + message);}@RabbitListener(queues = "direct.queue2")public void receive2(String message) {System.out.println("direct.queue2接收到的消息:" + message);}
}
3.DirectConfig.java 生产者
package com.sunxiansheng.consumer.config;
import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/*** Description: Direct交换机* @Author sun* @Create 2024/7/29 15:06* @Version 1.0*/
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class DirectConfig {@Resourceprivate AmqpTemplate amqpTemplate;@Testpublic void send() {// 交换机String exchange = "direct.exchange";// 路由键String routingKey = "black";// 消息String message = "hello direct";// 发送消息amqpTemplate.convertAndSend(exchange, routingKey, message);}
}
http://www.rkmt.cn/news/147292.html

相关文章:

  • 网安如何防御DDoS攻击?
  • 智谱清言AutoGLM黑科技揭秘:如何实现低代码大模型训练?
  • 基于微信小程序的快递代领系统的设计与实现开题报告
  • ssm基于Vue.js的在线购物系统的设计与实现
  • Ubuntu22.04 外接显示屏显示异常
  • 基于微信小程序的快递代领系统的设计与实现任务书
  • 基于微信小程序的快递代领系统的设计与实现外文
  • PaLM-E:具身智能的多模态语言模型新范式 - 详解
  • 【Open-AutoGLM插件深度解析】:揭秘浏览器AI自动化新纪元
  • 2025年评价高的五金精密铸造品牌厂家排行榜 - 品牌宣传支持者
  • 基于net高校一卡通管理系统的设计与实现
  • 学长亲荐9个AI论文工具,专科生搞定毕业论文+格式规范!
  • 2025年上海靠谱隔膜泵工厂排行榜,国产隔膜泵精品定制推荐 - 工业品牌热点
  • 基于ssm的人才招聘网站
  • Stimulsoft报表与仪表板产品重磅发布2026.1版本:进一步强化跨平台、数据可视化、合规及 AI 辅助设计等
  • 2025年知名的防火阀高评价厂家推荐榜 - 品牌宣传支持者
  • 基于微信小程序的快递代领系统的设计与实现开题报告(3)(1)
  • 家用呼吸机十大品牌解析:伟晴大健康领衔,精准守护呼吸健康 - 品牌评测分析
  • 大语言模型训练揭秘:通俗易懂地解析训练过程与数据准备!
  • 中望CAD机械版2025:修改直径标注的样式
  • Vehicle Data Reconstructor(VDR):车辆数据取证解决方案
  • 深度拷问:为什么一用AI,我的企业就“隐身”了? - 优质品牌推荐TOP榜
  • 2025年唐山GEO本地优化公司排行榜,新测评精选GEO精准优化服务商推荐 - myqiye
  • 2026年GEO优化服务商全景对比:平台工具 vs 代运营 vs 咨询培训,谁更适合你? - AIDSO爱搜
  • 江苏徐州电力设备源头厂家市场深度测评与推荐报告【2025-2026】 - 2025年品牌推荐榜
  • 2025年终盘点:冷冻干燥机品牌排行榜(最新权威测评) - 品牌推荐大师1
  • 计算机毕业设计springboot基于Java的智慧小区快递配送系统 SpringBoot+Java 的社区智能快递末端配送平台 基于 Java 技术的智慧住宅区快件集散管理系统
  • 汽车电动助力转向系统(EPS)功能介绍 - 实践
  • 2025年衬氟三合一过滤洗涤干燥机品牌排名:实验型过滤洗涤干燥机哪家强? - 工业推荐榜
  • 2025年上海真空烘箱采购指南:十大高口碑生产厂家全解析,非标干燥设备/高温电热鼓风干燥箱/泳池专用臭氧发生器真空烘箱企业口碑排行 - 品牌推荐师