实现环境:kafka、redis、mysql8 使用Spring Boot搭建环境,数据库采用乐观锁

缓存和数据一致性分析

缓存和 DB 的一致性是一个讨论很多的问题,推荐看参考中的 使用缓存的正确姿势,首先看下先更新数据库,再更新缓存策略, 假设 A、B 两个线程,A 成功更新数据,在要更新缓存时,A 的时间片用完了,B 更新了数据库接着更新了缓存,这是 CPU 再分配给 A,则 A 又更新了缓存,这种情况下缓存中就是脏数据.

那么,如果避免这个问题呢?就是缓存不做更新,仅做删除,先更新数据库再删除缓存。

对于上面的问题,A 更新了数据库,还没来得及删除缓存,B 又更新了数据库,接着删除了缓存,然后 A 删除了缓存, 这样只有下次缓存未命中时,才会从数据库中重建缓存,避免了脏数据。 但是,也会有极端情况出现脏数据, A 做查询操作,没有命中缓存,从数据库中查询,但是还没来得及更新缓存,B 就更新了数据库,接着删除了缓存,然后 A 又重建了缓存,这时 A 中的就是脏数据,但是这种极端情况需要数据库的写操作前进入数据库,又晚于写操作删除缓存来更新缓存,发生的概率极其小,不过为了避免这种情况,可以为缓存设置过期时间。

高并发控制实现

1.Redis的限流控制

根据前面的优化分析,假设现在有 10 个商品,有 1000 个并发秒杀请求,最终只有 10 个订单会成功创建,也就是说有 990 的请求是无效的,这些无效的请求也会给数据库带来压力,因此可以在在请求落到数据库之前就将无效的请求过滤掉,将并发控制在一个可控的范围,这样落到数据库的压力就小很多

由于计数限流实现起来比较简单,因此采用计数限流,限流的实现可以直接使用 Guava 的 RateLimit 方法,但是由于后续需要将实例通过 Nginx 实现负载均衡,这里选用 Redis 实现分布式限流

在 RedisPool 中对 Jedis 线程池进行了简单的封装,封装了初始化和关闭方法,同时在 RedisPoolUtil 中对 Jedis 常用 API 进行简单封装,每个方法调用完毕则关闭 Jedis 连接。

限流要保证写入 Redis 操作的原子性,因此利用 Redis 的单线程机制,通过 LUA 脚本来完成。

-- 计数限流
-- 每次请求都将当前时间,精确到秒作为 key 放入 Redis 中,超时时间设置为 2s, Redis 将该 key 的值进行自增
-- 当达到阈值时返回错误,表示请求被限流
-- 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 资源唯一标志位
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 获取当前流量大小
local currentLimit = tonumber(redis.call('get', key) or "0")

if currentLimit + 1 > limit then
    -- 达到限流大小 返回
    return 0;
else
    -- 没有达到阈值 value + 1
    redis.call("INCRBY", key, 1)
    -- 设置过期时间
    redis.call("EXPIRE", key, 2)
    return currentLimit + 1
end  
@Slf4j
public class RedisLimit {

    private static final int FAIL_CODE = 0;

    private static Integer limit = 5;

    /**
     * Redis 限流
     */
    public static Boolean limit() {
        Jedis jedis = null;
        Object result = null;
        try {
            // 获取 jedis 实例
            jedis = RedisPool.getJedis();
            // 解析 Lua 文件
            String script = ScriptUtil.getScript("limit.lua");
            // 请求限流
            String key = String.valueOf(System.currentTimeMillis() / 1000);
            // 计数限流
            result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
            System.out.println("限流的值:"+result);
            if (FAIL_CODE != (Long) result) {
                log.info("成功获取令牌");
                return true;
            }
        } catch (Exception e) {
            log.error("limit 获取 Jedis 实例失败:", e);
        } finally {
            RedisPool.jedisPoolClose(jedis);
        }
        return false;
    }
}

2.抢单流程

  1. 请求时先获取令牌
    > 每个请求到来先取令牌,获取到令牌再执行后续操作,获取不到直接返回 ERROR

    /**
     * 限流 + Redis 缓存库存 + Kafka异步下单
     * @param sid
     */
    @RequestMapping(value = "createOrderWithLimitAndRedisAndKafka", method = RequestMethod.POST)
    public String createOrderWithLimitAndRedisAndKafka(HttpServletRequest request, int sid) {
        try {
            if (RedisLimit.limit()) {
                orderService.createOrderWithLimitAndRedisAndKafka(sid);
            }
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
        return "秒杀请求正在处理,排队中";
    }  
  2. 验证库存,保存请求到kafka

    @Override
    public void createOrderWithLimitAndRedisAndKafka(int sid) throws Exception {
        // 校验库存
        Stock stock = checkStockWithRedis(sid);
        String json = gson.toJson(stock);
        // 下单请求发送至 kafka,需要序列化 stock
        kafkaTemplate.send(kafkaTopic, gson.toJson(stock));
        log.info("消息发送至 Kafka 成功");
    }  
  3. kafka消费

    @KafkaListener(topics = "SECONDS-KILL-TOPIC")
    public void listen(ConsumerRecord<String, String> record) throws Exception {
    
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        System.out.println("抢单的值:"+record.value());
        // Object -> String
        String message = (String) kafkaMessage.get();
        // 反序列化
        Stock stock = gson.fromJson(message, Stock.class);
        // 创建订单
        orderService.consumerTopicToCreateOrderWithKafka(stock);
    }  

本文主要参考Github

如何设计一个秒杀系统