RabbitMQ的安装、配置和实战
RabbitMQ安装
docker run -d --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3.8.15-management
#网络安全组记得开放端口
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
访问管理界面
ip:15672
依赖引入
<!--引入AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.xml
spring:
##----------rabbit配置--------------
rabbitmq:
host: 192.168.75.146
port: 5672
virtual-host: dev
username: admin
password: password
listener:
simple:
#消息确认方式,manual(手动ack) 和auto(自动ack) 。消息队列重试到达次数进入异常交换机--为实现,该策略要为auto
acknowledge-mode: auto
retry:
#开启重试,消费者代码不能添加try catch捕获不往外抛异常
enabled: true
#最大重试次数
max-attempts: 4
# 重试消息的时间间隔,5秒
max-interval: 5000
RabbitMQ配置文件 (一个交换机,两个队列,routingKey匹配规则适用于两队列)
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
private String shortLinkEventExchange="short_link.event.exchange";
/**
* 创建交换机 Topic类型
* 一般一个微服务一个交换机
* @return
*/
@Bean
public Exchange shortLinkEventExchange(){
return new TopicExchange(shortLinkEventExchange,true,false);
//return new FanoutExchange(shortLinkEventExchange,true,false);
}
//新增短链相关配置====================================
/**
* 新增短链 队列
*/
private String shortLinkAddLinkQueue="short_link.add.link.queue";
/**
* 新增短链映射 队列
*/
private String shortLinkAddMappingQueue="short_link.add.mapping.queue";
/**
* 新增短链具体的routingKey,【发送消息使用】
*/
private String shortLinkAddRoutingKey="short_link.add.link.mapping.routing.key";
/**
* topic类型的binding key,用于绑定队列和交换机,是用于 link 消费者
*/
private String shortLinkAddLinkBindingKey="short_link.add.link.*.routing.key";
/**
* topic类型的binding key,用于绑定队列和交换机,是用于 mapping 消费者
*/
private String shortLinkAddMappingBindingKey="short_link.add.*.mapping.routing.key";
/**
* 新增短链api队列和交换机的绑定关系建立
*/
@Bean
public Binding shortLinkAddApiBinding(){
return new Binding(shortLinkAddLinkQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddLinkBindingKey,null);
}
/**
* 新增短链mapping队列和交换机的绑定关系建立
*/
@Bean
public Binding shortLinkAddMappingBinding(){
return new Binding(shortLinkAddMappingQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddMappingBindingKey,null);
}
/**
* 新增短链Link普通队列,用于被监听
*/
@Bean
public Queue shortLinkAddLinkQueue(){
return new Queue(shortLinkAddLinkQueue,true,false,false);
}
/**
* 新增短链mapping 普通队列,用于被监听
*/
@Bean
public Queue shortLinkAddMappingQueue(){
return new Queue(shortLinkAddMappingQueue,true,false,false);
}
对应的两个消费者
@Component
@Slf4j
//@RabbitListener(queues = "short_link.add.link.queue")
@RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })
public class ShortLinkAddLinkMQListener {
@Autowired
private ShortLinkService shortLinkService;
@RabbitHandler
public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
log.info("ShortLinkAddLinkMQListener message:{}",message);
try{
eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_LINK.name());
shortLinkService.handlerAddShortLink(eventMessage);
}catch (Exception e){
// 处理业务异常,等其他操作
log.error("消费失败:{}",eventMessage);
throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
}
log.info("消费成功:{}",eventMessage);
//确认消息消费成功
// channel.basicAck(tag,false);
}
}
@Component
@Slf4j
//@RabbitListener(queues = "short_link.add.mapping.queue")
@RabbitListener(queuesToDeclare = { @Queue("short_link.add.mapping.queue") })
public class ShortLinkAddMappingMQListener {
@Autowired
private ShortLinkService shortLinkService;
@RabbitHandler
public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
log.info("ShortLinkAddMappingMQListener message:{}",message);
try{
eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_MAPPING.name());
shortLinkService.handlerAddShortLink(eventMessage);
}catch (Exception e){
// 处理业务异常,等其他操作
log.error("消费失败:{}",eventMessage);
throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
}
log.info("消费成功:{}",eventMessage);
//确认消息消费成功
// channel.basicAck(tag,false);
}
}
实战,这里直接在controller层中编码。(应该在Service实现)
@RestController
@RequestMapping("/api/link/v1")
public class ShortLinkController {
@Autowired
private ShortLinkService shortLinkService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 新增短链
* @param shortLinkAddRequest
* @return
*/
@PostMapping("add")
public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest){
//参数:交换机、匹配规则Key、信息对象
EventMessage eventMessage = EventMessage.builder().[设置对象各字段信息]
.build();
//生成MQ。
rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(), rabbitMQConfig.getShortLinkAddRoutingKey(), eventMessage);
return jsonData;
}
}
热门相关:恭喜你被逮捕了 用身体缓解失恋的性中毒女 买妻种田:山野夫君,强势宠! 未来兽世:买来的媳妇,不生崽 本法官萌萌哒