Redis结合kafka实现高并发抢单
实现环境:kafka、redis、mysql8 使用Spring Boot搭建环境,数据库采用乐观锁
缓存和数据一致性分析
缓存和 DB 的一致性是一个讨论很多的问题,推荐看参考中的 使用缓存的正确姿势,首先看下先更新数据库,再更新缓存策略, 假设 A、B 两个线程,A 成功更新数据,在要更新缓存时,A 的时间片用完了,B 更新了数据库接着更新了缓存,这是 CPU 再分配给 A,则 A 又更新了缓存,这种情况下缓存中就是脏数据.
那么,如果避免这个问题呢?就是缓存不做更新,仅做删除,先更新数据库再删除缓存。
对于上面的问题,A 更新了数据库,还没来得及删除缓存,B 又更新了数据库,接着删除了缓存,然后 A 删除了缓存, 这样只有下次缓存未命中时,才会从数据库中重建缓存,避免了脏数据。 但是,也会有极端情况出现脏数据, A 做查询操作,没有命中缓存,从数据库中查询,但是还没来得及更新缓存,B 就更新了数据库,接着删除了缓存,然后 A 又重建了缓存,这时 A 中的就是脏数据,但是这种极端情况需要数据库的写操作前进入数据库,又晚于写操作删除缓存来更新缓存,发生的概率极其小,不过为了避免这种情况,可以为缓存设置过期时间。
高并发控制实现
1.Redis的限流控制
根据前面的优化分析,假设现在有 10 个商品,有 1000 个并发秒杀请求,最终只有 10 个订单会成功创建,也就是说有 990 的请求是无效的,这些无效的请求也会给数据库带来压力,因此可以在在请求落到数据库之前就将无效的请求过滤掉,将并发控制在一个可控的范围,这样落到数据库的压力就小很多
由于计数限流实现起来比较简单,因此采用计数限流,限流的实现可以直接使用 Guava 的 RateLimit 方法,但是由于后续需要将实例通过 Nginx 实现负载均衡,这里选用 Redis 实现分布式限流
在 RedisPool 中对 Jedis 线程池进行了简单的封装,封装了初始化和关闭方法,同时在 RedisPoolUtil 中对 Jedis 常用 API 进行简单封装,每个方法调用完毕则关闭 Jedis 连接。
限流要保证写入 Redis 操作的原子性,因此利用 Redis 的单线程机制,通过 LUA 脚本来完成。
-- 计数限流
-- 每次请求都将当前时间,精确到秒作为 key 放入 Redis 中,超时时间设置为 2s, Redis 将该 key 的值进行自增
-- 当达到阈值时返回错误,表示请求被限流
-- 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性
-- 资源唯一标志位
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])
-- 获取当前流量大小
local currentLimit = tonumber(redis.call('get', key) or "0")
if currentLimit + 1 > limit then
-- 达到限流大小 返回
return 0;
else
-- 没有达到阈值 value + 1
redis.call("INCRBY", key, 1)
-- 设置过期时间
redis.call("EXPIRE", key, 2)
return currentLimit + 1
end
@Slf4j
public class RedisLimit {
private static final int FAIL_CODE = 0;
private static Integer limit = 5;
/**
* Redis 限流
*/
public static Boolean limit() {
Jedis jedis = null;
Object result = null;
try {
// 获取 jedis 实例
jedis = RedisPool.getJedis();
// 解析 Lua 文件
String script = ScriptUtil.getScript("limit.lua");
// 请求限流
String key = String.valueOf(System.currentTimeMillis() / 1000);
// 计数限流
result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
System.out.println("限流的值:"+result);
if (FAIL_CODE != (Long) result) {
log.info("成功获取令牌");
return true;
}
} catch (Exception e) {
log.error("limit 获取 Jedis 实例失败:", e);
} finally {
RedisPool.jedisPoolClose(jedis);
}
return false;
}
}
2.抢单流程
请求时先获取令牌
> 每个请求到来先取令牌,获取到令牌再执行后续操作,获取不到直接返回 ERROR/** * 限流 + Redis 缓存库存 + Kafka异步下单 * @param sid */ @RequestMapping(value = "createOrderWithLimitAndRedisAndKafka", method = RequestMethod.POST) public String createOrderWithLimitAndRedisAndKafka(HttpServletRequest request, int sid) { try { if (RedisLimit.limit()) { orderService.createOrderWithLimitAndRedisAndKafka(sid); } } catch (Exception e) { log.error("Exception: " + e); } return "秒杀请求正在处理,排队中"; }
验证库存,保存请求到kafka
@Override public void createOrderWithLimitAndRedisAndKafka(int sid) throws Exception { // 校验库存 Stock stock = checkStockWithRedis(sid); String json = gson.toJson(stock); // 下单请求发送至 kafka,需要序列化 stock kafkaTemplate.send(kafkaTopic, gson.toJson(stock)); log.info("消息发送至 Kafka 成功"); }
kafka消费
@KafkaListener(topics = "SECONDS-KILL-TOPIC") public void listen(ConsumerRecord<String, String> record) throws Exception { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); System.out.println("抢单的值:"+record.value()); // Object -> String String message = (String) kafkaMessage.get(); // 反序列化 Stock stock = gson.fromJson(message, Stock.class); // 创建订单 orderService.consumerTopicToCreateOrderWithKafka(stock); }