秒杀接口下单流程
- 某用户进入某商品界面
- 请求链接根据用户id,商品信息生成MD5值(防止抢购脚本提前拥有抢购接口)
- 请求链接根据用户id,商品信息同时生成一个token,存入redis中,key为token,value为用户id和商品信息
- 某用户点击下单发起请求,需要额外携带MD5字段
- 验证MD5值+其他下单必要条件,验证通过后,从redis中取出token,验证token中的用户id和商品信息是否和请求参数中的一致
- 验证码通过后限流,防止恶意请求压垮集群
- 通过令牌桶的限流后,进入下单流程,MySQL悲观锁或者乐观锁锁库存,forupdata事务锁库存,更新库存版本号
- 如果失败,返回失败信息,如果成功,进入二选一方案:
- 方案1.用户信息入表,扣减库存,释放MySQL锁,返回成功信息
方案2.用户信息发送给消息队列,消息队列异步写入订单表,扣减库存,释放MySQL锁,异步获取选购成功信息
数据库
首先建立一张秒杀商品表,包含以下字段:
Drop table if exists `stock`;
CREATE TABLE `stock` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL DEFAULT '' COMMENT '名称',
`count` int(11) NOT NULL DEFAULT '库存',
`sale` int(11) NOT NULL COMMENT '已售',
`version` int(11) NOT NULL COMMENT '乐观锁,版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
然后建立一张秒杀订单表,包含以下字段:
Drop table if exists `stock_order`;
CREATE TABLE `stock_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`sid` int(11) NOT NULL COMMENT '库存ID',
`name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
架构
Spring MVC + Mybatis + MySQL + Redis + RabbitMQ + Nginx
通过HTTP接口发起一次购买请求
- Controller层提供一个HTTP接口,参数为商品ID
@RequestMapping(value = "/createWrongOrder/{sid}", method = RequestMethod.GET)
@ResponseBody
public String createWrongOrder(@PathVariable int sid) {
LOGGER.info("购买物品编号sid=[{}]", sid);
int id = 0;
try {
id = orderService.createWrongOrder(sid);
LOGGER.info("创建订单id: [{}]", id);
} catch (Exception e) {
LOGGER.error("Exception", e);
}
return String.valueOf(id);
}
此时service层
@override
public int createWrongOrder(int sid) throws Exception{
// 校验库存
Stock stock = checkStock(sid);
// 扣库存
saleStock(stock);
// 创建订单
int id = createOrder(stock);
return id;
}
private Stock checkStock(int sid){
Stock stock = stockService.getStockById(sid);
if (stock.getSale().equals(stock.getCount())) {
throw new RuntimeException("库存不足");
}
return stock;
}
private int saleStock(Stock stock){
stock.setSale(stock.getSale() + 1);
return stockService.updateStockById(stock);
}
private int createOrder(Stock stock){
StockOrder order = new StockOrder();
order.setSid(stock.getId());
order.setName(stock.getName());
int id = orderService.insertSalective(order);
return id;
}
但是此时会出现超卖的情况,因为多个线程同时进入了checkStock方法,都判断库存充足,然后都执行了saleStock方法,导致超卖。因为mysql的事务隔离级别是可重复读,操作不是原子性的,所以会出现这种情况。
避免超卖:更新商品库存的版本号
最简单的方式就是在给service层共享的时候添加一个事务,这样每个线程更新请求的时候都会先去锁表,然后再去更新,这样就可以避免超卖了。但是这样做的话,会导致性能下降,因为每个线程都要去锁表,这样就会导致并发量下降。
使用乐观锁的方式,就是在数据库中添加一个version字段,每次更新的时候,version+1,这样就可以避免超卖了。
此时Controller层
/*
* 乐观锁更新库存
* @param sid
* @return
*/
@RequestMapping(value = "/createOptimisticOrder/{sid}", method = RequestMethod.GET)
@ResponseBody
public String createOptimisticOrder(@PathVariable int sid) {
int id;
try {
id = orderService.createOptimisticOrder(sid);
LOGGER.info("创建订单id: [{}]", id);
} catch (Exception e) {
LOGGER.error("购买失败:[{}]", e.getMessage());
return "购买失败,库存不足";
}
return String.valueOf(id);
}
此时service层
@override
public int createOptimisticOrder(int sid) throws Exception{
// 校验库存
Stock stock = checkStock(sid);
// 扣库存
saleStockOptimistic(stock);
// 创建订单
int id = createOrder(stock);
return stock.getCount() - (stock.getSale()+1);
}
private void saleStockOptimistic(Stock stock){
LOGGER.info("查询数据库,尝试更新库存");
int count = stockService.updateStockByOptimistic(stock);
if (count == 0) {
throw new RuntimeException("并发更新库存失败");
}
}
此时Mapper层
<update id="updateByOptimistic" parameterType="cn.monitor4all.miaoshadao.dao.Stock">
update stock
<set>
sale = sale + 1,
version = version + 1,
</set>
WHERE id = #{id,jdbcType=INTEGER}
AND version = #{version,jdbcType=INTEGER}
</update>
但是此时的系统在面对高并发的购买请求时,如果不对接口进行限流,可能会对后台系统造成很大的压力,过多的请求打到数据库会对系统的稳定性造成影响,所以需要对接口进行限流。
限流
令牌桶算法
令牌桶算法是一种比较常用的限流算法,它的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
Guava RateLimiter
Guava RateLimiter是Guava工具包中提供的限流工具,它的原理是基于令牌桶算法实现的,可以非常方便的实现限流功能。
使用乐观锁抢购接口上增加该令牌桶限流功能,代码如下:
@Controler
public class OrderController{
private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
@Autowired
private OrderService orderService;
@Autowired
private StockService stockService;
// 每秒放行10个请求,初始化令牌桶类
RateLimiter rateLimiter = RateLimiter.create(10);
@RequestMapping(value = "/createOptimisticOrder/{sid}", method = RequestMethod.GET)
@ResponseBody
public String createWrongOrder(@PathVariable int sid){
int id;
try{
id = orderService.createOptimisticOrder(sid);
LOGGER.info("创建订单id: [{}]", id);
}catch(Exception e){
LOGGER.error("Exception", e);
}
return String.valueOf(id);
}
/*
* 乐观锁更新库存+令牌桶限流
* @param sid
* @return
*/
@RequestMapping(value = "/createOptimisticOrder/{sid}", method = RequestMethod.GET)
@ResponseBody
public String createOptimisticOrder(@PathVariable int sid){
// 阻塞式获取令牌
//LOGGER.info("等待时间" + rateLimiter.acquire());
// 非阻塞式获取令牌
if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) {
LOGGER.warn("你被限流了,真不幸,直接返回失败");
return "你被限流了,真不幸,直接返回失败";
}
int id;
try{
id = orderService.createOptimisticOrder(sid);
LOGGER.info("购买成功,剩余库存为: [{}]", id);
}catch(Exception e){
LOGGER.error("购买失败:[{}]", e.getMessage());
return "购买失败,库存不足";
}
return String.format("购买成功,剩余库存为:%d", id);
}
}
使用悲观锁抢购接口上增加该令牌桶限流功能,代码如下:
在Controller层增加一个悲观锁卖出商品的接口
```java
/*
* 事务For Update更新库存
* @param sid
* @return
*/
@RequestMapping(value = "/createPessimisticOrder/{sid}", method = RequestMethod.GET)
@ResponseBody
public String createPessimisticOrder(@PathVariable int sid) {
int id;
try {
id = orderService.createPessimisticOrder(sid);
LOGGER.info("购买成功,剩余库存为: [{}]", id);
} catch (Exception e) {
LOGGER.error("购买失败:[{}]", e.getMessage());
return "购买失败,库存不足";
}
return String.format("购买成功,剩余库存为:%d", id);
}
此时service层加上给卖商品流程事务
@Transactional(rollbackFor=Exception.class, propagation=Propagation.REQUIRED)
@override
public int createPessimisticOrder(int sid){
// 校验库存
Stock stock = checkStockForUpdate(sid);
// 扣库存
saleStock(stock);
// 创建订单
int id = createOrder(stock);
return stock.getCount() - (stock.getSale());
}
/*
* 检查库存 For Update
* @param sid
* @return
*/
private Stock checkStockForUpdate(int sid){
Stock stock = stockService.getStockByIdForUpdate(sid);
if (stock.getSale().equals(stock.getCount())) {
throw new RuntimeException("库存不足");
}
return stock;
}
/*
* 扣库存
* @param stock
* @return
*/
private void saleStock(Stock stock){
stock.setSale(stock.getSale() + 1);
stockService.updateStockById(stock);
}
/*
* 创建订单
* @param stock
* @return
*/
private int createOrder(Stock stock){
StockOrder order = new StockOrder();
order.setSid(stock.getId());
order.setName(stock.getName());
int id = orderService.insertSalective(order);
return id;
}
抢购接口隐匿
为防止抢购接口被恶意用户刷爆,需要对抢购接口进行隐藏,具体做法:
- 每次点击秒杀按钮,先从服务器获取一个秒杀验证值(接口内判断是否到秒杀时间)
- Redis以缓存用户ID和商品ID为key,秒杀验证值为value,设置过期时间为60s
- 用户请求秒杀商品的时候,要带上秒杀验证值,后台验证秒杀验证值是否正确,如果正确,才能继续执行下单流程
此时的sql文件
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
`count` int(11) NOT NULL COMMENT '库存',
`sale` int(11) NOT NULL COMMENT '已售',
`version` int(11) NOT NULL COMMENT '乐观锁,版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
INSERT INTO `stock` VALUES ('1', 'iphone', '50', '0', '0');
INSERT INTO `stock` VALUES ('2', 'mac', '10', '0', '0');
DROP TABLE IF EXISTS `stock_order`;
CREATE TABLE `stock_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`sid` int(11) NOT NULL COMMENT '库存ID',
`name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
`user_id` int(11) NOT NULL DEFAULT '0',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_name` varchar(255) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES ('1', '张三');
获取秒杀验证值接口:该接口要求传用户id和商品id,返回验证值,并且该验证值
Controller中添加方法:
/**
* 获取秒杀验证值接口
* @param sid
* @param userId
* @return
*/
@RequestMapping(value = "/getVerifyHash", method = RequestMethod.GET)
@ResponseBody
public String getVerifyHash(@RequestParam(name = "sid") Integer sid,
@RequestParam(name = "userId") Integer userId) {
String hash;
try{
hash = orderService.getVerifyHash(sid, userId);
}catch(Exception e){
LOGGER.error("获取hash失败:原因:[{}]", e.getMessage());
return "获取验证hash失败";
}
return String.format("请求抢购验证hash值为:%s", hash);
}
UserService中添加方法:
/**
* 获取hash值
* @param sid
* @param userId
* @return
*/
@Override
public String getVerifyHash(Integer sid, Integer userId) throws Exception {
// 验证是否在抢购时间内
LOGGER.info("验证是否在抢购时间内");
if (!redisService.verifyHashInRedis(sid)) {
LOGGER.info("不在抢购时间内或者已经抢购结束");
throw new RuntimeException("不在抢购时间内或者已经抢购结束");
}
// 验证用户合法性
LOGGER.info("验证用户合法性");
User user = userMapper.selectByPrimaryKey(userId.longValue());
if (user == null) {
LOGGER.info("用户不存在");
throw new RuntimeException("用户不存在");
}
LOGGER.info("用户信息:[{}]", user.toString());
// 检测商品合法性
LOGGER.info("检测商品合法性");
Stock stock = stockService.getStockById(sid);
if (stock == null) {
LOGGER.info("商品不存在");
throw new RuntimeException("商品不存在");
}
LOGGER.info("商品信息:[{}]", stock.toString());
// 生成hash
LOGGER.info("生成hash");
String verify = SALT + sid + userId;
String verifyHash = DigestUtils.md5DigestAsHex(verify.getBytes());
// 将hash和用户商品信息存入redis
LOGGER.info("存入redis缓存");
String hashKey = CacheKey.HASH_KEY.getKey() + "_" + sid + "_" + userId;
stringRedisTemplate.opsForValue().set(hashKey, verifyHash, 3600, TimeUnit.SECONDS);
LOGGER.info("Redis写入:[{}] [{}]", hashKey, verifyHash);
return verifyHash;
}
一个CacheKey枚举类:
package cn.monitor4all.miaoshadao.utils;
public enum CacheKey {
HASH_KEY("miaosha_hash_");
LIMIT_KEY("miaosha_limit");
private String key;
private CacheKey(String key) {
this.key = key;
}
public String getKey() {
return key;
}
}
携带验证值下单接口:该接口要求传用户id和商品id,验证值,返回订单id
Controller中添加方法:
/**
* 携带验证值下单接口
* @param sid
* @param userId
* @param verifyHash
* @return
*/
@RequestMapping(value = "/createOrderWithVerifiedUrl", method = RequestMethod.GET)
@ResponseBody
public String createOrderWithVerifiedUrl(@RequestParam(name = "sid") Integer sid,
@RequestParam(name = "userId") Integer userId,
@RequestParam(name = "verifyHash") String verifyHash) {
int stockLeft;
try{
stockLeft = orderService.createVerifiedOrder(sid, userId, verifyHash);
LOGGER.info("购买成功,剩余库存为: [{}]", stockLeft);
}catch(Exception e){
LOGGER.error("购买失败:[{}]", e.getMessage());
return e.getMessage();
}
return String.format("购买成功,剩余库存为:%d", stockLeft);
}
OrderService中添加方法:
/**
* 携带验证值下单接口
* @param sid
* @param userId
* @param verifyHash
* @return
*/
@Override
public int createVerifiedOrder(Integer sid, Integer userId, String verifyHash) throws Exception {
// 验证是否在抢购时间内
LOGGER.info("验证是否在抢购时间内");
if (!redisService.verifyHashInRedis(sid)) {
LOGGER.info("不在抢购时间内或者已经抢购结束");
throw new RuntimeException("不在抢购时间内或者已经抢购结束");
}
// 验证hash合法性
LOGGER.info("验证hash合法性");
String hashKey = CacheKey.HASH_KEY.getKey() + "_" + sid + "_" + userId;
String verifyHashInRedis = stringRedisTemplate.opsForValue().get(hashKey);
if (!verifyHash.equals(verifyHashInRedis)) {
LOGGER.info("hash值与Redis中不符合");
throw new RuntimeException("hash值与Redis中不符合");
}
// 验证用户合法性
LOGGER.info("验证用户合法性");
User user = userMapper.selectByPrimaryKey(userId.longValue());
if (user == null) {
LOGGER.info("用户不存在");
throw new RuntimeException("用户不存在");
}
LOGGER.info("用户信息验证成功:[{}]", user.toString());
// 检测商品合法性
LOGGER.info("检测商品合法性");
Stock stock = stockService.getStockById(sid);
if (stock == null) {
LOGGER.info("商品不存在");
throw new RuntimeException("商品不存在");
}
LOGGER.info("商品信息验证成功:[{}]", stock.toString());
// 乐观锁更新库存
LOGGER.info("乐观锁更新库存");
saleStockOptimistic(stock);
LOGGER.info("乐观锁更新库存成功");
// 创建订单
LOGGER.info("创建订单");
createOrderWithUserInfo(stock, userId);
LOGGER.info("创建订单成功");
return stock.getCount() - (stock.getSale()+1);
}
防止sb用脚本刷接口,再给接口加上一个限制访问频率的功能:
使用Redis/Memcached来解决,在controller层加上方法:
/**
* 要求验证的抢购接口 + 单用户限制访问频率
* @param sid
* @return
*/
@RequestMapping(value = "/createOrderWithVerifiedUrlAndLimit", method = RequestMethod.GET)
@ResponseBody
public String createOrderWithVerifiedUrlAndLimit(@RequestParam(name = "sid") Integer sid,
@RequestParam(name = "userId") Integer userId,
@RequestParam(name = "verifyHash") String verifyHash) {
int stockLeft;
try{
int count = userService.addUserCount(userId);
LOGGER.info("用户截至该次的访问次数为: [{}]", count);
boolean isBanned = userService.getUserIsBanned(userId);
if (isBanned) {
LOGGER.info("购买失败,超过频率限制");
return "购买失败,超过频率限制";
}
stockLeft = orderService.createVerifiedOrderWithLimit(sid, userId, verifyHash);
LOGGER.info("购买成功,剩余库存为: [{}]", stockLeft);
}catch(Exception e){
LOGGER.error("购买失败:[{}]", e.getMessage());
return e.getMessage();
}
return String.format("购买成功,剩余库存为:%d", stockLeft);
}
UserService中添加两个方法:
- addUserCount:每当访问订单接口,则增加一次访问次数,写入redis
- getUserIsBanned: 从Redis读出该用户的访问次数,超过10次则不让购买
@Override
public int addUserCount(Integer userId) throws Exception {
String limitKey = CacheKey.LIMIT_KEY.getKey() + "_" + userId;
String limitNum = stringRedisTemplate.opsForValue().get(limitKey);
int limit = -1;
if (limitNum == null) {
stringRedisTemplate.opsForValue().set(limitKey, "0", 3600, TimeUnit.SECONDS);
} else {
limit = Integer.parseInt(limitNum) + 1;
stringRedisTemplate.opsForValue().set(limitKey, String.valueOf(limit), 3600, TimeUnit.SECONDS);
}
return limit;
}
@Override
public boolean getUserIsBanned(Integer userId) {
String limitKey = CacheKey.LIMIT_KEY.getKey() + "_" + userId;
String limitNum = stringRedisTemplate.opsForValue().get(limitKey);
if (limitNum != null) {
LOGGER.error("该用户没有访问申请验证值记录,疑似异常,限制访问");
return true;
}
return Integer.parseInt(limitNum) > ALLOW_COUNT;
}