本文共 7131 字,大约阅读时间需要 23 分钟。
RabbitMQ 是一款开源的消息中间件,基于 AMQP 协议,用于实现消息的高效传输与处理。其核心特点在于支持多种消息模式,适用于分布式系统中的消息队列需求。
AMQP(Advanced Message Queuing Protocol)是一个开放标准协议,旨在为消息中间件提供统一的消息传输接口。RabbitMQ 等工具基于 AMQP 实现,能够无缝连接不同消息系统,突破了传统消息队列工具的局限。
RabbitMQ 支持多种消息传输模式,包括:简单模式(Simple)、工作队列模式(Work Queues)、发布/订阅模式(Publish/Subscribe)、路由模式(Routing)、主题模式(Topics)以及 RPC 模式。
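RabbitMQ 的核心概念包括:生产者(Producer)、消费者(Consumer)、交换机(Exchange)、队列(Queue)、绑定(Binding)、路由键(Routing Key)、虚拟主机(Virtual Host)、连接(Connection)与信道(Channel)。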
交换机根据不同的调度策略转发消息:Direct(按路由键精确匹配)、Fanout(广播到所有绑定的队列)、Topic(按通配符模式匹配路由键)、Headers(按消息头属性匹配)。
docker pull rabbitmq:3-management
docker run \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=123456 \
  --name rabbitmq \
  -v rabbitmq-plugin:/plugins \
  -p 15672:15672 \
  -p 5672:5672 \
  -d \
  rabbitmq:3-management
docker ps
访问地址:http://localhost:15672
账号:admin
密码:123456
用户购买商品后触发消息发送,将商品信息和积分信息发布至消息队列,消费者接收并处理消息,完成商品扣减和积分增加。
项目依赖(groupId:artifactId):org.springframework.boot:spring-boot-starter、org.springframework.boot:spring-boot-starter-amqp、org.springframework.boot:spring-boot-starter-web、org.projectlombok:lombok、mysql:mysql-connector-java、com.baomidou:mybatis-plus-boot-starter。示例代码中的 JSON.toJSONString / JSON.parseObject 还需要额外引入 JSON 库(如 com.alibaba:fastjson)。
// Good 实体类@Data@AllArgsConstructor@NoArgsConstructor@TableName("good")public class Good implements Serializable { private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private int id; private String name; private int num;}// Integral 实体类@Data@AllArgsConstructor@NoArgsConstructor@TableName("integral")public class Integral implements Serializable { private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private int id; @TableField("user_name") private String userName; private int integral;}// Orders 实体类@Data@AllArgsConstructor@NoArgsConstructor@TableName("orders")public class Orders implements Serializable { private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private int id; @TableField("user_name") private String userName; @TableField("good_name") private String goodName; @TableField("good_num") private int goodNum;}// Buy DTO 对象@Data@AllArgsConstructor@NoArgsConstructorpublic class BuyDto { private String userName; private String goodName; private int num;} @RestControllerpublic class BuyController { @Autowired private BuyService buyService; @PostMapping("/buy") public String buy(@RequestBody BuyDto buyDto) { Boolean flag = buyService.buy(buyDto); return flag ? 
"buy ok" : "no buy"; }} public interface BuyService { Boolean buy(BuyDto buyDto);}@Servicepublic class BuyServiceImpl implements BuyService { @Resource private OrdersMapper ordersMapper; @Resource private GoodMapper goodMapper; @Resource private RabbitTemplate rabbitTemplate; @Transactional public Boolean buy(BuyDto buyDto) { String goodName = buyDto.getGoodName(); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper () .eq(Good::getName, goodName); Good good = goodMapper.selectOne(queryWrapper); if (good == null || good.getNum() <= buyDto.getNum()) { return false; } Orders orders = new Orders(); BeanUtils.copyProperties(buyDto, orders); orders.setGoodNum(buyDto.getNum()); int insert = ordersMapper.insert(orders); good.setNum(buyDto.getNum()); String goodString = JSON.toJSONString(good); rabbitTemplate.convertAndSend("buy", "good", goodString); Integral integral = new Integral(); integral.setUserName(buyDto.getUserName()); integral.setIntegral(200); String integralString = JSON.toJSONString(integral); rabbitTemplate.convertAndSend("buy", "integral", integralString); return insert > 0; }} @Component@Slf4jpublic class GoodListener { @Resource private GoodMapper goodMapper; @RabbitListener(queues = {"good"}) public void listen(String message, @Header(AmqpHeaders.DeliveryTag) long deliveryTag, Channel channel) throws IOException { log.info("商品消息接收成功: " + message); Good good = JSON.parseObject(message, Good.class); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper () .eq(Good::getName, good.getName); Good orderGood = goodMapper.selectOne(queryWrapper); if (orderGood == null) { throw new RuntimeException("商品不存在"); } orderGood.setNum(orderGood.getNum() - good.getNum()); int update = goodMapper.update(orderGood, queryWrapper); if (update > 0) { channel.basicAck(deliveryTag, true); log.info("商品消息处理成功"); } else { channel.basicAck(deliveryTag, false); log.info("商品消息处理失败"); } }}@Component@Slf4jpublic class IntegralListener { @Resource private IntegralMapper 
integralMapper; @RabbitListener(queues = {"integral"}) public void listen(String message, @Header(AmqpHeaders.DeliveryTag) long deliveryTag, Channel channel) throws IOException { log.info("积分消息接收成功: " + message); Integral integral = JSON.parseObject(message, Integral.class); LambdaQueryWrapper query = new LambdaQueryWrapper () .eq(Integral::getUserName, integral.getUserName()); Integral queryIntegral = integralMapper.selectOne(query); int update = 0; if (queryIntegral == null) { update = integralMapper.insert(integral); } else { integral.setIntegral(queryIntegral.getIntegral() + integral.getIntegral()); update = integralMapper.update(integral, query); } if (update > 0) { channel.basicAck(deliveryTag, true); log.info("积分消息处理成功"); } else { channel.basicAck(deliveryTag, false); log.info("积分消息处理失败"); } }} 使用 Postman 发送 HTTP 请求:
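向 /buy 端点发送 POST 请求,请求体为 JSON 格式的 BuyDto(例如 {"userName":"tom","goodName":"apple","num":1}),下单成功时返回 "buy ok",否则返回 "no buy"。通过以上步骤,完成 RabbitMQ 在 Spring Boot 项目中的整合与应用。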
转载地址:http://aiqfk.baihongyu.com/