博客
关于我
RabbitMQ入门
阅读量:797 次
发布时间:2023-03-22

本文共 7131 字,大约阅读时间需要 23 分钟。

RabbitMQ 原理与 Spring Boot 整合案例

一、RabbitMQ 原理

RabbitMQ 是一款开源的消息中间件,基于 AMQP 协议,用于实现消息的高效传输与处理。其核心特点在于支持多种消息模式,适用于分布式系统中的消息队列需求。

1. AMQP 协议

AMQP(Advanced Message Queuing Protocol)是一个开放标准协议,旨在为消息中间件提供统一的消息传输接口。RabbitMQ 等工具基于 AMQP 实现,能够无缝连接不同消息系统,突破了传统消息队列工具的局限。

2. 功能范围

RabbitMQ 支持多种消息传输模式,包括:

  • 存储转发:单个消息接收者,多个消息发送者。
  • 分布式事务:确保多个消息的原子性传输。
  • 发布订阅:消息发送者与多个订阅者建立关联。
  • 基于内容的路由:根据消息内容进行路由决策。
  • 文件传输队列:用于文件的分发与接收。
  • 点对点连接:建立直接的消息传输通道。

3. 概念

RabbitMQ 的核心概念包括:

  • 信道(Channel):虚拟化的通信通道,支持消息的双向传输。
  • 消息生产者(Producer):向消息队列发送消息的应用程序。
  • 消息消费者(Consumer):从消息队列中获取消息的应用程序。
  • 消息(Message):包含消息头和消息体的数据结构。
  • 路由键(Routing Key):决定消息的路由路径,影响交换机的转发规则。
  • 消息队列(Queue):存储临时消息,等待消费者处理。
  • 交换机(Exchange):负责消息的路由与转发,支持多种调度策略。
  • 绑定(Binding):连接交换机与消息队列的路由规则。
  • 代理(Broker):RabbitMQ 服务器实体,提供消息中介服务。

4. Exchange 消息调度策略

交换机根据不同的调度策略转发消息:

  • Direct(直接模式):精确匹配路由键与队列绑定键,消息直接转发至目标队列。
  • Fanout(扇出模式):将消息发送至所有绑定到交换机的队列,无需路由键匹配。
  • Topic(主题模式):基于正则表达式进行模糊匹配,支持更复杂的路由规则。
  • Headers(头模式):根据消息头属性进行路由,无需依赖路由键或绑定键。

二、Docker 部署 RabbitMQ

1. 拉取镜像

docker pull rabbitmq:3-management

2. 创建容器

docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rabbitmq \
-v rabbitmq-plugin:/plugins \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

3. 查看容器状态

docker ps

访问地址:localhost:15672

账号:admin

密码:123456

三、Spring Boot 整合 RabbitMQ 案例

1. 案例介绍

用户购买商品后触发消息发送,将商品信息和积分信息发布至消息队列,消费者接收并处理消息,完成商品扣减和积分增加。

2. 数据表设计

  • 商品表:存储商品信息,包括商品ID、名称和库存数量。
  • 积分表:记录用户的积分信息,包括用户名称和积分数量。
  • 订单表:记录用户的购买订单,包括用户名称、商品名称和数量。

3. 项目搭建

1. 导入依赖

org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.project.lombok
lombok
mysql
mysql-connector-java
com.baomidou
mybatis-plus-boot-starter

2. 实体类设计

// Good 实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("good")
public class Good implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private int id;
private String name;
private int num;
}
// Integral 实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("integral")
public class Integral implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private int id;
@TableField("user_name")
private String userName;
private int integral;
}
// Orders 实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("orders")
public class Orders implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private int id;
@TableField("user_name")
private String userName;
@TableField("good_name")
private String goodName;
@TableField("good_num")
private int goodNum;
}
// Buy DTO 对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BuyDto {
private String userName;
private String goodName;
private int num;
}

3. Controller

@RestController
public class BuyController {
@Autowired
private BuyService buyService;
@PostMapping("/buy")
public String buy(@RequestBody BuyDto buyDto) {
Boolean flag = buyService.buy(buyDto);
return flag ? "buy ok" : "no buy";
}
}

4. Service

public interface BuyService {
Boolean buy(BuyDto buyDto);
}
@Service
public class BuyServiceImpl implements BuyService {
@Resource
private OrdersMapper ordersMapper;
@Resource
private GoodMapper goodMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@Transactional
public Boolean buy(BuyDto buyDto) {
String goodName = buyDto.getGoodName();
LambdaQueryWrapper
queryWrapper = new LambdaQueryWrapper
()
.eq(Good::getName, goodName);
Good good = goodMapper.selectOne(queryWrapper);
if (good == null || good.getNum() <= buyDto.getNum()) {
return false;
}
Orders orders = new Orders();
BeanUtils.copyProperties(buyDto, orders);
orders.setGoodNum(buyDto.getNum());
int insert = ordersMapper.insert(orders);
good.setNum(buyDto.getNum());
String goodString = JSON.toJSONString(good);
rabbitTemplate.convertAndSend("buy", "good", goodString);
Integral integral = new Integral();
integral.setUserName(buyDto.getUserName());
integral.setIntegral(200);
String integralString = JSON.toJSONString(integral);
rabbitTemplate.convertAndSend("buy", "integral", integralString);
return insert > 0;
}
}

5. 消费者

@Component
@Slf4j
public class GoodListener {
@Resource
private GoodMapper goodMapper;
@RabbitListener(queues = {"good"})
public void listen(String message, @Header(AmqpHeaders.DeliveryTag) long deliveryTag, Channel channel) throws IOException {
log.info("商品消息接收成功: " + message);
Good good = JSON.parseObject(message, Good.class);
LambdaQueryWrapper
queryWrapper = new LambdaQueryWrapper
()
.eq(Good::getName, good.getName);
Good orderGood = goodMapper.selectOne(queryWrapper);
if (orderGood == null) {
throw new RuntimeException("商品不存在");
}
orderGood.setNum(orderGood.getNum() - good.getNum());
int update = goodMapper.update(orderGood, queryWrapper);
if (update > 0) {
channel.basicAck(deliveryTag, true);
log.info("商品消息处理成功");
} else {
channel.basicAck(deliveryTag, false);
log.info("商品消息处理失败");
}
}
}
@Component
@Slf4j
public class IntegralListener {
@Resource
private IntegralMapper integralMapper;
@RabbitListener(queues = {"integral"})
public void listen(String message, @Header(AmqpHeaders.DeliveryTag) long deliveryTag, Channel channel) throws IOException {
log.info("积分消息接收成功: " + message);
Integral integral = JSON.parseObject(message, Integral.class);
LambdaQueryWrapper
query = new LambdaQueryWrapper
()
.eq(Integral::getUserName, integral.getUserName());
Integral queryIntegral = integralMapper.selectOne(query);
int update = 0;
if (queryIntegral == null) {
update = integralMapper.insert(integral);
} else {
integral.setIntegral(queryIntegral.getIntegral() + integral.getIntegral());
update = integralMapper.update(integral, query);
}
if (update > 0) {
channel.basicAck(deliveryTag, true);
log.info("积分消息处理成功");
} else {
channel.basicAck(deliveryTag, false);
log.info("积分消息处理失败");
}
}
}

4. 测试

使用 Postman 发送 HTTP 请求:

  • 购买商品:发送 POST 请求至 /buy 端点,返回 "buy ok"。
  • 商品扣减:消息中包含商品信息,消费者处理后扣减库存。
  • 积分增加:消息中包含积分信息,消费者处理后增加用户积分。
  • 通过以上步骤,完成 RabbitMQ 在 Spring Boot 项目中的整合与应用。

    转载地址:http://aiqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现正数num使用递归找到它的二进制算法(附完整源码)
    查看>>
    Objective-C实现水波纹显示效果(附完整源码)
    查看>>
    Objective-C实现求 1 到 20 的所有数整除的最小正数算法 (附完整源码)
    查看>>
    Objective-C实现求1000以内的全部亲密数(附完整源码)
    查看>>
    Objective-C实现求a的逆元x(附完整源码)
    查看>>
    Objective-C实现求squareDifference平方差算法 (附完整源码)
    查看>>
    Objective-C实现求一个数的位数之和算法(附完整源码)
    查看>>
    Objective-C实现求一个数的因子算法(附完整源码)
    查看>>
    Objective-C实现求一组数字的平均值算法(附完整源码)
    查看>>
    Objective-C实现求两个数组的中位数算法(附完整源码)
    查看>>
    Objective-C实现求两点间距离(附完整源码)
    查看>>
    Objective-C实现求中位数(附完整源码)
    查看>>
    Objective-C实现求中位数(附完整源码)
    查看>>
    Objective-C实现求众数(附完整源码)
    查看>>
    Objective-C实现求圆锥的体积(附完整源码)
    查看>>
    Objective-C实现求曲线在某点的导数(附完整源码)
    查看>>
    Objective-C实现求最大公约数 (GCD)的算法(附完整源码)
    查看>>
    Objective-C实现求梯形面积公式(附完整源码)
    查看>>
    Objective-C实现求模逆算法(附完整源码)
    查看>>
    Objective-C实现求正弦(附完整源码)
    查看>>