跳到主要内容

实战与面试

一、秒杀系统设计

1.1 需求分析

核心问题:

  • 高并发:瞬间大量请求(百万级 QPS)
  • 超卖:库存扣减准确性
  • 性能:响应时间要求(< 100ms)
  • 可靠性:防止数据丢失

技术挑战:

  1. 数据库扛不住高并发
  2. 库存扣减的原子性
  3. 防刷和重复购买
  4. 订单处理异步化

1.2 整体架构

┌─────────────┐
│ 用户请求 │
└──────┬──────┘

┌─────────────┐
│ CDN + WAF │ ← 静态资源、防刷
└──────┬──────┘

┌─────────────┐
│ 负载均衡 │ ← Nginx/LVS
└──────┬──────┘

┌─────────────┐
│ Redis 缓存 │ ← 库存扣减、去重
└──────┬──────┘

┌─────────────┐
│ 消息队列 MQ │ ← 削峰填谷
└──────┬──────┘

┌─────────────┐
│ 订单服务 │ ← 异步处理
└──────┬──────┘

┌─────────────┐
│ 数据库 │ ← 持久化存储
└─────────────┘

1.3 核心代码实现

1.3.1 库存预热

@Service
public class SeckillPreheatService {
@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private ProductService productService;

/**
* 预热库存到 Redis
*/
public void preloadStock(Long productId, int stock) {
String stockKey = "seckill:stock:" + productId;
String userSetKey = "seckill:users:" + productId;

// 设置库存
redisTemplate.opsForValue().set(stockKey, String.valueOf(stock));

// 初始化已购买用户集合(用于去重)
redisTemplate.delete(userSetKey);

log.info("Preloaded stock for product {}: {}", productId, stock);
}

/**
* 批量预热
*/
@PostConstruct
public void preloadAllStocks() {
List<Product> products = productService.getSeckillProducts();
for (Product product : products) {
preloadStock(product.getId(), product.getStock());
}
}
}

1.3.2 秒杀核心逻辑

@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* Lua 脚本:原子性扣减库存 + 记录用户
*/
private static final String SECKILL_SCRIPT =
"local stockKey = KEYS[1] " +
"local userSetKey = KEYS[2] " +
"local userId = ARGV[1] " +
"local orderId = ARGV[2] " +

// 1. 检查用户是否已购买
"if redis.call('SISMEMBER', userSetKey, userId) == 1 then " +
" return -1 " + // 重复购买
"end " +

// 2. 检查并扣减库存
"local stock = tonumber(redis.call('GET', stockKey)) " +
"if stock == nil or stock <= 0 then " +
" return 0 " + // 库存不足
"end " +

"redis.call('DECR', stockKey) " +
"redis.call('SADD', userSetKey, userId) " +

// 3. 记录订单 ID
"local orderKey = 'seckill:orders:' .. userId " +
"redis.call('LPUSH', orderKey, orderId) " +
"redis.call('EXPIRE', orderKey, 3600) " +

"return 1 " + // 成功
"end";

/**
* 执行秒杀
*/
public SeckillResult doSeckill(Long userId, Long productId) {
String stockKey = "seckill:stock:" + productId;
String userSetKey = "seckill:users:" + productId;
String orderId = generateOrderId(userId, productId);

// 执行 Lua 脚本
DefaultRedisScript<Long> script = new DefaultRedisScript<>(SECKILL_SCRIPT, Long.class);
Long result = redisTemplate.execute(
script,
Arrays.asList(stockKey, userSetKey),
userId.toString(),
orderId
);

SeckillResult seckillResult = new SeckillResult();

if (result == 1) {
// 秒杀成功,发送消息到 MQ
SeckillMessage message = new SeckillMessage(userId, productId, orderId);
rabbitTemplate.convertAndSend("seckill.queue", message);

seckillResult.setSuccess(true);
seckillResult.setMessage("秒杀成功!");
seckillResult.setOrderId(orderId);
} else if (result == 0) {
seckillResult.setSuccess(false);
seckillResult.setMessage("抱歉,库存不足!");
} else if (result == -1) {
seckillResult.setSuccess(false);
seckillResult.setMessage("您已购买过该商品!");
}

return seckillResult;
}

/**
* 生成订单 ID
*/
private String generateOrderId(Long userId, Long productId) {
return String.format("%d%d%d",
System.currentTimeMillis(),
productId,
userId);
}
}

1.3.3 订单消息消费者

@Component
public class SeckillOrderConsumer {
@Autowired
private OrderService orderService;

@Autowired
private ProductService productService;

@RabbitListener(queues = "seckill.queue")
public void handleSeckillOrder(SeckillMessage message) {
try {
log.info("Processing seckill order: {}", message);

// 1. 创建订单
Order order = new Order();
order.setId(message.getOrderId());
order.setUserId(message.getUserId());
order.setProductId(message.getProductId());
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(new Date());

orderService.createOrder(order);

// 2. 扣减数据库库存
productService.decreaseStock(message.getProductId(), 1);

// 3. 更新订单状态
order.setStatus(OrderStatus.PAID);
orderService.updateOrder(order);

log.info("Seckill order processed successfully: {}", message.getOrderId());
} catch (Exception e) {
log.error("Failed to process seckill order: {}", message, e);

// 失败处理:补偿库存
redisTemplate.opsForValue().increment("seckill:stock:" + message.getProductId());
redisTemplate.opsForSet().remove("seckill:users:" + message.getProductId(), message.getUserId().toString());
}
}
}

1.3.4 接口限流

@Component
public class RateLimiter {
@Autowired
private StringRedisTemplate redisTemplate;

/**
* 限流检查
* @param userId 用户 ID
* @param limit 限流数量
* @param window 时间窗口(秒)
* @return true=允许访问,false=被限流
*/
public boolean allowRequest(Long userId, int limit, int window) {
String key = "ratelimit:user:" + userId;
long now = System.currentTimeMillis();

// Lua 脚本:滑动窗口限流
String luaScript =
"redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1] - " + (window * 1000) + ") " +
"local count = redis.call('ZCARD', KEYS[1]) " +
"if count < tonumber(ARGV[2]) then " +
" redis.call('ZADD', KEYS[1], ARGV[3], ARGV[3]) " +
" redis.call('EXPIRE', KEYS[1], " + window + ") " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(
script,
Collections.singletonList(key),
String.valueOf(now),
String.valueOf(limit),
String.valueOf(now)
);

return result == 1;
}
}

// 使用
@RestController
@RequestMapping("/seckill")
public class SeckillController {
@Autowired
private RateLimiter rateLimiter;

@Autowired
private SeckillService seckillService;

@PostMapping("/{productId}")
public ResponseEntity<?> seckill(
@PathVariable Long productId,
@RequestHeader("X-User-Id") Long userId) {

// 限流:每个用户每秒最多请求 5 次
if (!rateLimiter.allowRequest(userId, 5, 1)) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body("请求过于频繁,请稍后再试");
}

SeckillResult result = seckillService.doSeckill(userId, productId);
return ResponseEntity.ok(result);
}
}

1.4 优化方案

1.4.1 本地缓存

@Component
public class LocalCache {
private final Cache<Long, Boolean> cache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();

public boolean isSeckilled(Long userId, Long productId) {
String key = userId + ":" + productId;
return cache.getIfPresent(key) != null;
}

public void markSeckilled(Long userId, Long productId) {
String key = userId + ":" + productId;
cache.put(key, true);
}
}

1.4.2 库存分段

/**
* 将库存分散到多个 key,减少竞争
*/
public void segmentedStockPreheat(Long productId, int totalStock) {
int segmentCount = 10; // 分成 10 段
int stockPerSegment = totalStock / segmentCount;

for (int i = 0; i < segmentCount; i++) {
String key = "seckill:stock:" + productId + ":" + i;
redisTemplate.opsForValue().set(key, String.valueOf(stockPerSegment));
}
}

/**
* 随机选择一段扣减
*/
public boolean decreaseSegmentedStock(Long productId) {
int segmentCount = 10;
int randomSegment = ThreadLocalRandom.current().nextInt(segmentCount);
String key = "seckill:stock:" + productId + ":" + randomSegment;

Long result = redisTemplate.opsForValue().decrement(key);
if (result != null && result >= 0) {
return true;
} else {
// 扣减失败,回滚
redisTemplate.opsForValue().increment(key);
return false;
}
}

1.5 面试题汇总

Q1:秒杀系统如何解决超卖问题?

答案:

  1. Redis 原子操作

    • 使用 Lua 脚本保证原子性
    • DECR 命令是原子的
  2. 数据库乐观锁

UPDATE stock
SET count = count - 1
WHERE product_id = ? AND count > 0
  1. 分布式锁
    • Redis SET NX
    • ZooKeeper 临时节点

Q2:秒杀系统如何应对高并发?

答案:

  1. 多级缓存

    • 浏览器缓存
    • CDN 缓存
    • 本地缓存
    • Redis 缓存
  2. 异步处理

    • 消息队列削峰
    • 订单异步创建
  3. 限流降级

    • 接口限流
    • 用户限流
    • IP 限流
  4. 水平扩展

    • Redis 集群
    • 应用集群
    • 数据库读写分离

二、分布式锁实现

2.1 基础实现

@Service
public class RedisLockService {
@Autowired
private StringRedisTemplate redisTemplate;

private static final String LOCK_PREFIX = "lock:";
private static final long LOCK_EXPIRE = 30; // 锁过期时间(秒)

/**
* 获取锁
* @param lockKey 锁的 key
* @param requestId 唯一标识(用于释放锁时验证)
* @return 是否获取成功
*/
public boolean tryLock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;

Boolean result = redisTemplate.opsForValue().setIfAbsent(
key,
requestId,
LOCK_EXPIRE,
TimeUnit.SECONDS
);

return Boolean.TRUE.equals(result);
}

/**
* 释放锁(Lua 脚本保证原子性)
*/
public boolean unlock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;

String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(key), requestId);

return result == 1;
}
}

2.2 可重入锁

@Service
public class RedisReentrantLock {
@Autowired
private StringRedisTemplate redisTemplate;

private final ThreadLocal<Map<String, Integer>> lockers =
ThreadLocal.withInitial(HashMap::new);

/**
* 可重入锁
*/
public boolean lock(String lockKey, long expireTime) {
String key = "lock:" + lockKey;
String value = String.valueOf(Thread.currentThread().getId());

// 检查是否已持有锁
Map<String, Integer> locks = lockers.get();
if (locks.containsKey(key)) {
locks.put(key, locks.get(key) + 1);
return true;
}

// 尝试获取锁
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(
key,
value,
expireTime,
TimeUnit.SECONDS
);

if (Boolean.TRUE.equals(acquired)) {
locks.put(key, 1);
return true;
}

return false;
}

/**
* 释放可重入锁
*/
public boolean unlock(String lockKey) {
String key = "lock:" + lockKey;
Map<String, Integer> locks = lockers.get();

Integer count = locks.get(key);
if (count == null) {
return false;
}

count--;
if (count > 0) {
locks.put(key, count);
return true;
}

locks.remove(key);

String value = String.valueOf(Thread.currentThread().getId());
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(key), value);

return result == 1;
}
}

2.3 Redlock 算法(Redis 官方推荐)

@Service
public class RedlockService {
private List<RedisTemplate<String, String>> redisTemplates;
private static final long LOCK_EXPIRE = 30000; // 30 秒

/**
* 获取锁(需要在大多数节点上成功)
*/
public boolean lock(String lockKey, String requestId) {
int successCount = 0;
int requiredCount = redisTemplates.size() / 2 + 1; // 多数节点

long startTime = System.currentTimeMillis();

for (RedisTemplate<String, String> template : redisTemplates) {
try {
Boolean result = template.opsForValue().setIfAbsent(
lockKey,
requestId,
LOCK_EXPIRE,
TimeUnit.MILLISECONDS
);

if (Boolean.TRUE.equals(result)) {
successCount++;
}
} catch (Exception e) {
log.error("Failed to acquire lock on node", e);
}
}

// 计算获取锁消耗的时间
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed >= LOCK_EXPIRE) {
// 获取锁耗时超过锁的过期时间,释放所有已获取的锁
unlock(lockKey, requestId);
return false;
}

// 检查是否在大多数节点上成功
return successCount >= requiredCount;
}

/**
* 释放锁
*/
public void unlock(String lockKey, String requestId) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

for (RedisTemplate<String, String> template : redisTemplates) {
try {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
template.execute(script, Collections.singletonList(lockKey), requestId);
} catch (Exception e) {
log.error("Failed to release lock on node", e);
}
}
}
}

2.4 看门狗机制(自动续期)

@Service
public class WatchDogLock {
@Autowired
private StringRedisTemplate redisTemplate;

private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);

private final Map<String, Future<?>> watchDogs = new ConcurrentHashMap<>();

/**
* 获取锁并启动看门狗
*/
public boolean lockWithWatchDog(String lockKey, String requestId) {
// 1. 尝试获取锁
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(
"lock:" + lockKey,
requestId,
30,
TimeUnit.SECONDS
);

if (!Boolean.TRUE.equals(acquired)) {
return false;
}

// 2. 启动看门狗线程,定期续期
Future<?> future = scheduler.scheduleAtFixedRate(() -> {
// 检查锁是否还存在
String value = redisTemplate.opsForValue().get("lock:" + lockKey);
if (requestId.equals(value)) {
// 延长锁的过期时间
redisTemplate.expire("lock:" + lockKey, 30, TimeUnit.SECONDS);
log.debug("Watch dog extended lock: {}", lockKey);
} else {
// 锁已经不存在了,停止看门狗
Future<?> f = watchDogs.get(lockKey);
if (f != null) {
f.cancel(false);
watchDogs.remove(lockKey);
}
}
}, 10, 10, TimeUnit.SECONDS); // 每 10 秒续期一次

watchDogs.put(lockKey, future);
return true;
}

/**
* 释放锁并停止看门狗
*/
public void unlock(String lockKey, String requestId) {
// 1. 停止看门狗
Future<?> future = watchDogs.remove(lockKey);
if (future != null) {
future.cancel(false);
}

// 2. 释放锁
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
redisTemplate.execute(script, Collections.singletonList("lock:" + lockKey), requestId);
}
}

2.5 分布式锁对比

特性Redis SET NXZooKeeper数据库etcd
性能
可靠性
实现复杂度
持久化可配置持久化持久化持久化
适用场景高并发、高性能高可靠性简单场景分布式协调

2.6 面试题

Q1:分布式锁如何保证原子性?

答案:

  1. SET NX + EX:Redis 原子命令
  2. Lua 脚本:保证多个命令原子执行
  3. WATCH + MULTI/EXEC:乐观锁机制

Q2:分布式锁失效怎么办?

答案:

  1. 看门狗机制:自动续期
  2. 唯一标识:释放时验证
  3. 超时重试:获取失败自动重试
  4. Redlock 算法:多节点冗余

三、排行榜系统设计

3.1 需求分析

核心功能:

  • 实时排名
  • 分数更新
  • 范围查询(Top N)
  • 用户排名查询
  • 周围排名查询

**技术选型:**Redis Sorted Set(ZSet)

3.2 核心实现

@Service
public class LeaderboardService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

private static final String LEADERBOARD_KEY = "leaderboard:";

/**
* 增加分数
*/
public Double addScore(String leaderboard, String member, double score) {
String key = LEADERBOARD_KEY + leaderboard;
return redisTemplate.opsForZSet().incrementScore(key, member, score);
}

/**
* 获取用户排名
*/
public Long getRank(String leaderboard, String member) {
String key = LEADERBOARD_KEY + leaderboard;
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
return rank != null ? rank + 1 : null; // 排名从 1 开始
}

/**
* 获取用户分数
*/
public Double getScore(String leaderboard, String member) {
String key = LEADERBOARD_KEY + leaderboard;
return redisTemplate.opsForZSet().score(key, member);
}

/**
* 获取 Top N
*/
public List<RankingEntry> getTopN(String leaderboard, int n) {
String key = LEADERBOARD_KEY + leaderboard;

Set<ZSetOperations.TypedTuple<Object>> set =
redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, n - 1);

List<RankingEntry> result = new ArrayList<>();
int rank = 1;
for (ZSetOperations.TypedTuple<Object> tuple : set) {
RankingEntry entry = new RankingEntry();
entry.setRank(rank++);
entry.setMember(tuple.getValue().toString());
entry.setScore(tuple.getScore());
result.add(entry);
}

return result;
}

/**
* 获取用户周围排名(前后各 n 名)
*/
public List<RankingEntry> getAroundRank(String leaderboard, String member, int n) {
String key = LEADERBOARD_KEY + leaderboard;

// 获取用户排名
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
if (rank == null) {
return Collections.emptyList();
}

// 计算范围
long start = Math.max(0, rank - n);
long end = Math.min(redisTemplate.opsForZSet().size(key) - 1, rank + n);

// 获取范围内的用户
Set<Object> members = redisTemplate.opsForZSet().reverseRange(key, start, end);
if (members == null || members.isEmpty()) {
return Collections.emptyList();
}

// 查询分数
List<RankingEntry> result = new ArrayList<>();
for (Object m : members) {
String mem = m.toString();
Double score = redisTemplate.opsForZSet().score(key, mem);
Long r = redisTemplate.opsForZSet().reverseRank(key, mem) + 1;

RankingEntry entry = new RankingEntry();
entry.setRank(r);
entry.setMember(mem);
entry.setScore(score);
result.add(entry);
}

return result;
}

/**
* 批量获取用户排名
*/
public Map<String, Long> batchGetRank(String leaderboard, List<String> members) {
String key = LEADERBOARD_KEY + leaderboard;

Map<String, Long> result = new HashMap<>();
for (String member : members) {
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
if (rank != null) {
result.put(member, rank + 1);
}
}

return result;
}

@Data
public static class RankingEntry {
private Long rank;
private String member;
private Double score;
}
}

3.3 实时排行榜优化

@Service
public class RealTimeLeaderboardService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private LeaderboardRepository leaderboardRepository;

/**
* 定时同步 Redis 排行榜到数据库(每 5 分钟)
*/
@Scheduled(fixedDelay = 300000)
public void syncToDatabase() {
String pattern = LEADERBOARD_KEY + "*";
Set<String> keys = redisTemplate.keys(pattern);

for (String key : keys) {
String leaderboard = key.substring(LEADERBOARD_KEY.length());

// 获取 Top 1000
Set<ZSetOperations.TypedTuple<Object>> top1000 =
redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, 999);

// 转换并保存到数据库
List<LeaderboardEntry> entries = top1000.stream()
.map(tuple -> {
LeaderboardEntry entry = new LeaderboardEntry();
entry.setLeaderboard(leaderboard);
entry.setMember(tuple.getValue().toString());
entry.setScore(tuple.getScore());
return entry;
})
.collect(Collectors.toList());

leaderboardRepository.saveAll(entries);
}
}

/**
* 分页查询(使用缓存)
*/
public List<RankingEntry> getLeaderboardPage(String leaderboard, int page, int pageSize) {
String cacheKey = String.format("leaderboard:page:%s:%d:%d", leaderboard, page, pageSize);

// 先从缓存获取
List<RankingEntry> cached = (List<RankingEntry>) redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return cached;
}

// 缓存未命中,从 Redis ZSet 获取
String key = LEADERBOARD_KEY + leaderboard;
long start = (long) (page - 1) * pageSize;
long end = start + pageSize - 1;

Set<ZSetOperations.TypedTuple<Object>> set =
redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end);

List<RankingEntry> result = set.stream()
.map(tuple -> {
RankingEntry entry = new RankingEntry();
entry.setRank(start + 1); // 简化处理,实际需要计算准确排名
entry.setMember(tuple.getValue().toString());
entry.setScore(tuple.getScore());
return entry;
})
.collect(Collectors.toList());

// 写入缓存(5 分钟)
redisTemplate.opsForValue().set(cacheKey, result, 5, TimeUnit.MINUTES);

return result;
}
}

3.4 多维度排行榜

/**
* 支持多个维度的排行榜
*/
@Service
public class MultiDimensionLeaderboard {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 更新多个维度分数
*/
public void updateScores(String member, Map<String, Double> dimensions) {
for (Map.Entry<String, Double> entry : dimensions.entrySet()) {
String dimension = entry.getKey();
Double score = entry.getValue();
String key = "leaderboard:" + dimension;

redisTemplate.opsForZSet().incrementScore(key, member, score);
}
}

/**
* 获取用户在各个维度的排名
*/
public Map<String, Long> getDimensionRanks(String member, List<String> dimensions) {
Map<String, Long> result = new HashMap<>();

for (String dimension : dimensions) {
String key = "leaderboard:" + dimension;
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
if (rank != null) {
result.put(dimension, rank + 1);
}
}

return result;
}

/**
* 综合排名(加权)
*/
public List<RankingEntry> getCompositeLeaderboard(
Map<String, Double> weights, int n) {

// 计算综合分数:score = w1*score1 + w2*score2 + ...
Map<String, Double> compositeScores = new HashMap<>();

for (Map.Entry<String, Double> weight : weights.entrySet()) {
String dimension = weight.getKey();
Double weightValue = weight.getValue();
String key = "leaderboard:" + dimension;

Set<ZSetOperations.TypedTuple<Object>> scores =
redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, -1);

for (ZSetOperations.TypedTuple<Object> tuple : scores) {
String member = tuple.getValue().toString();
Double score = tuple.getScore();

compositeScores.merge(member, score * weightValue, Double::sum);
}
}

// 排序并返回 Top N
return compositeScores.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(n)
.map(entry -> {
RankingEntry rankingEntry = new RankingEntry();
rankingEntry.setMember(entry.getKey());
rankingEntry.setScore(entry.getValue());
return rankingEntry;
})
.collect(Collectors.toList());
}
}

四、高并发缓存架构

4.1 多级缓存架构

┌─────────────────┐
│ L1: 浏览器缓存 │ → 强缓存、协商缓存
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L2: CDN 缓存 │ → 静态资源、图片
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L3: Nginx 缓存 │ → 反向代理缓存
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L4: 本地缓存 │ → Caffeine/Guava
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L5: Redis 缓存 │ → 分布式缓存
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ 数据库 │ → MySQL/PostgreSQL
└─────────────────┘

4.2 本地缓存 + Redis 实现

@Service
public class MultiLevelCacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

// 本地缓存
private final Cache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();

/**
* 多级缓存查询
*/
public Object get(String key) {
// L1: 本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
log.debug("Cache hit in L1 (local): {}", key);
return value;
}

// L2: Redis 缓存
value = redisTemplate.opsForValue().get(key);
if (value != null) {
log.debug("Cache hit in L2 (Redis): {}", key);
// 写入本地缓存
localCache.put(key, value);
return value;
}

// L3: 数据库
log.debug("Cache miss, fetching from database: {}", key);
value = loadFromDatabase(key);

if (value != null) {
// 写入 Redis
redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
// 写入本地缓存
localCache.put(key, value);
}

return value;
}

/**
* 更新缓存(删除所有层级的缓存)
*/
public void update(String key, Object value) {
// 1. 更新数据库
updateDatabase(key, value);

// 2. 删除本地缓存
localCache.invalidate(key);

// 3. 删除 Redis 缓存
redisTemplate.delete(key);

// 4. 通知其他节点删除本地缓存
publishCacheInvalidation(key);
}

/**
* 发布缓存失效消息
*/
private void publishCacheInvalidation(String key) {
String channel = "cache:invalidate";
redisTemplate.convertAndSend(channel, key);
}

/**
* 监听缓存失效消息
*/
@RedisMessageListener(topic = "cache:invalidate")
public void handleCacheInvalidation(String message) {
log.info("Received cache invalidation message: {}", message);
localCache.invalidate(message);
}
}

4.3 缓存预热

@Component
public class CacheWarmer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private ProductService productService;

/**
* 应用启动时预热缓存
*/
@PostConstruct
public void warmUpCache() {
log.info("Starting cache warm-up...");

// 1. 预热热门商品
List<Product> hotProducts = productService.getHotProducts();
for (Product product : hotProducts) {
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, product, 1, TimeUnit.HOURS);
}

// 2. 预热商品分类
List<Category> categories = productService.getAllCategories();
for (Category category : categories) {
String key = "category:" + category.getId();
redisTemplate.opsForValue().set(key, category, 1, TimeUnit.HOURS);
}

log.info("Cache warm-up completed. Total {} items loaded.",
hotProducts.size() + categories.size());
}

/**
* 定时刷新缓存(每小时)
*/
@Scheduled(fixedDelay = 3600000)
public void refreshCache() {
log.info("Refreshing cache...");
warmUpCache();
}
}

4.4 缓存一致性方案

方案 1:Cache Aside Pattern

// 读:先读缓存,没有则读数据库,再写入缓存
public Product get(Long id) {
Product product = (Product) redisTemplate.opsForValue().get("product:" + id);
if (product == null) {
product = productRepository.findById(id);
if (product != null) {
redisTemplate.opsForValue().set("product:" + id, product, 1, TimeUnit.HOURS);
}
}
return product;
}

// 写:先更新数据库,再删除缓存
public void update(Product product) {
productRepository.save(product);
redisTemplate.delete("product:" + product.getId());
}

问题:并发时可能出现不一致

方案 2:延迟双删

public void updateWithDoubleDelete(Product product) {
// 1. 删除缓存
redisTemplate.delete("product:" + product.getId());

// 2. 更新数据库
productRepository.save(product);

// 3. 延迟再删除缓存
Thread.sleep(1000); // 延迟 1 秒
redisTemplate.delete("product:" + product.getId());
}

方案 3:订阅 Binlog(Canal)

@Component
public class CanalConsumer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@KafkaListener(topics = "canal-topic")
public void handleCanalMessage(CanalMessage message) {
if ("UPDATE".equals(message.getType()) || "DELETE".equals(message.getType())) {
String tableName = message.getTable();
String id = message.getId();

// 删除相关缓存
String cacheKey = tableName + ":" + id;
redisTemplate.delete(cacheKey);

log.info("Invalidated cache: {}", cacheKey);
}
}
}

五、常见面试题汇总

5.1 基础题

Q1:Redis 为什么快?

答案:

  1. 纯内存操作:内存访问速度快(纳秒级)
  2. 单线程模型:避免线程切换和锁竞争
  3. IO 多路复用:epoll/kqueue 实现高并发
  4. 高效数据结构:跳表、压缩列表等

Q2:Redis 是单线程的吗?

答案:

  • 网络请求处理:单线程(Redis 6.0 之前)
  • 持久化:fork 子进程处理
  • Redis 6.0+:引入多线程处理网络 IO(命令执行仍是单线程)

Q3:Redis 的数据结构及应用场景?

答案:

数据结构应用场景
String缓存、计数器、分布式锁
Hash对象存储、购物车
List消息队列、最新列表
Set唯一性、标签、交集/并集
Sorted Set排行榜、范围查询、优先级队列

Q4:Redis 持久化方式?

答案:

  • RDB:快照持久化,适合备份,恢复快
  • AOF:日志持久化,数据安全性高
  • 混合持久化:RDB + AOF,兼顾性能和安全

5.2 进阶题

Q5:缓存穿透、击穿、雪崩的区别及解决方案?

答案:

问题定义解决方案
缓存穿透查询不存在的数据布隆过滤器、缓存空值
缓存击穿热点 key 过期互斥锁、热点数据永不过期
缓存雪崩大量 key 同时过期过期时间加随机值、多级缓存

Q6:如何保证缓存和数据库一致性?

答案:

  1. Cache Aside Pattern

    • 读:先读缓存,未命中读数据库,写入缓存
    • 写:先更新数据库,再删除缓存
  2. 延迟双删

    • 删除缓存 → 更新数据库 → 延迟 → 删除缓存
  3. 订阅 Binlog

    • 监听 MySQL binlog
    • 解析变更后删除缓存
    • 最终一致性

Q7:Redis 的过期策略?

答案:

  1. 惰性删除:访问时检查是否过期
  2. 定期删除:每秒 10 次随机抽查
  3. 主动删除:内存不足时 + 淘汰策略

Q8:Redis 的内存淘汰策略?

答案:

  • noeviction:不淘汰,写入返回错误
  • allkeys-lru:淘汰最少使用的 key
  • allkeys-lfu:淘汰最不经常使用的 key
  • volatile-lru:淘汰设置了过期时间的 key(LRU)
  • volatile-ttl:淘汰即将过期的 key

5.3 架构题

Q9:Redis 主从复制的原理?

答案:

  1. 建立连接:从节点发送 SYNC 命令
  2. 全量同步:主节点生成 RDB 发送给从节点
  3. 增量同步:主节点记录写命令到复制缓冲区
  4. 命令传播:主节点执行写命令后发送给从节点

Q10:哨兵模式的工作原理?

答案:

  1. 主观下线(SDOWN):单个哨兵认为主节点下线
  2. 客观下线(ODOWN):多个哨兵(quorum)认为主节点下线
  3. 故障转移
    • 选举领头哨兵
    • 选择新主节点(优先级、偏移量、run ID)
    • 从节点升级为主节点
    • 通知其他从节点和新主节点建立复制

Q11:Redis Cluster 的原理?

答案:

  1. 分片:16384 个槽位
  2. Key 分布CRC16(key) % 16384
  3. 节点分配:每个节点负责部分槽位
  4. 高可用:每个主节点配置从节点
  5. 自动故障转移:主节点故障时从节点升级

Q12:如何设计分布式锁?

答案:

  1. 基础实现SET key value NX EX 30
  2. 可重入锁:记录线程 + 计数
  3. 看门狗:自动续期
  4. Redlock:多节点实现
  5. 释放锁:Lua 脚本保证原子性

5.4 场景题

Q13:如何设计一个秒杀系统?

答案:

架构设计:

用户 → CDN → 负载均衡 → Redis(库存扣减) → MQ → 订单服务 → 数据库

核心要点:

  1. 库存预热:提前加载到 Redis
  2. Lua 脚本:保证原子性
  3. 用户去重:Set 记录已购买用户
  4. MQ 异步:削峰填谷
  5. 限流降级:保护系统

Q14:如何实现排行榜?

答案:

使用 Redis Sorted Set:

// 增加分数
zadd leaderboard score member

// 获取排名
zrevrank leaderboard member

// 获取 Top N
zrevrange leaderboard 0 n-1 withscores

// 获取用户周围排名
zrevrange leaderboard start end withscores

优化:

  • 分页缓存
  • 定时同步到数据库
  • 多维度排行榜

Q15:如何设计限流器?

答案:

  1. 固定窗口:简单但边界问题
  2. 滑动窗口:推荐方案
  3. 令牌桶:平滑限流
  4. 漏桶:恒定速率

实现(滑动窗口):

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, now)
return 1
else
return 0
end

Q16:如何实现点赞功能?

答案:

使用 Sorted Set:

// 点赞
zadd post:likes timestamp user:1001

// 取消点赞
zrem post:likes user:1001

// 统计点赞数
zcard post:likes

// 检查是否点赞
zscore post:likes user:1001

// 获取点赞用户列表
zrevrange post:likes 0 9 withscores

优化:

  • 缓存点赞数
  • 异步更新数据库
  • BitMap 存储点赞状态

5.5 性能优化题

Q17:如何监控 Redis 性能?

答案:

监控工具:

  1. redis-cli

    • INFO 命令
    • SLOWLOG 慢查询
    • MONITOR 实时监控
  2. Prometheus + Grafana

    • redis_exporter 导出指标
    • 可视化监控

关键指标:

  • 内存使用率:< 80%
  • QPS:监控命令执行频率
  • 慢查询:记录并优化
  • 命中率:缓存命中率 > 90%
  • 连接数:避免连接数过多

Q18:如何优化 Redis 性能?

答案:

  1. 避免 bigkey

    • 使用 UNLINK 替代 DEL
    • 分批删除
  2. 批量操作

    • Pipeline 减少网络开销
    • Lua 脚本保证原子性
  3. 选择合适数据结构

    • Hash 替代多个 String
    • ZSet 替代 List 排序
  4. 控制 key 生命周期

    • 设置合理过期时间
    • 避免内存泄漏
  5. 主从读写分离

    • 主节点写
    • 从节点读
  6. 集群化部署

    • Redis Cluster 分片
    • 减少单节点压力

Q19:Redis 大 key 如何处理?

答案:

危害:

  • 删除时阻塞主线程
  • 网络传输慢
  • 内存占用大

解决方案:

  1. 拆分:将大 key 拆分成多个小 key
  2. 压缩:使用序列化压缩
  3. 分批删除:使用 SCAN/HSCAN
  4. UNLINK:异步删除
  5. 监控:使用 redis-cli --bigkeys 发现

5.6 高级题

Q20:Redis 集群方案对比?

答案:

方案优点缺点适用场景
主从复制简单主节点单点读写分离
哨兵自动故障转移主节点写压力大中小规模
Cluster自动分片运维复杂大规模
Twemproxy代理分片单点已淘汰
Codis功能完善需要额外组件中大规模

Q21:如何实现 Session 共享?

答案:

使用 Redis 存储 Session:

@Configuration
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 1800)
public class SessionConfig {
// Spring Session 自动配置
}

优点:

  • 集中式存储
  • 自动过期
  • 跨域共享

注意:

  • Session 序列化
  • 内存占用
  • 过期策略

Q22:Redis 实现 IM 消息存储?

答案:

方案设计:

  1. 最近消息:List
lpush messages:1001 "{...}"
ltrim messages:1001 0 99 # 保留最近 100 条
  1. 未读消息:Hash
hset unread:1001 "msg:123" "{...}"
hdel unread:1001 "msg:123"
  1. 消息已读状态:Bitmap
setbit user:1001:read 123 1

六、实战技巧总结

6.1 设计原则

  1. Key 命名规范

    • 使用冒号分隔:namespace:type:id
    • 示例:user:1001:profile
  2. 合理设置过期时间

    • 热点数据:较长过期时间
    • 临时数据:较短过期时间
    • 加随机值避免同时过期
  3. 避免 bigkey

    • 单个 key 不超过 10 MB
    • 集合元素不超过 10000 个
  4. 使用 Pipeline

    • 批量操作减少网络开销
    • 控制每批 100-1000 个命令
  5. Lua 脚本

    • 保证原子性
    • 减少网络往返
    • 复用脚本(SCRIPT LOAD)

6.2 常见陷阱

  1. KEYS 命令

    • 生产环境禁止使用
    • 使用 SCAN 代替
  2. FLUSHALL

    • 清空所有数据库
    • 误操作风险高
  3. 缓存雪崩

    • 过期时间加随机值
    • 多级缓存
  4. Monitor 命令

    • 严重影响性能
    • 仅用于调试
  5. SAVE 命令

    • 阻塞主线程
    • 使用 BGSAVE

6.3 最佳实践

  1. 连接池配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100);
config.setMaxIdle(50);
config.setMinIdle(10);
  1. 序列化选择

    • JSON:可读性好,空间占用大
    • Protobuf:性能好,空间占用小
    • Kryo:性能优秀
  2. 监控告警

    • 内存使用率 > 80%
    • 慢查询数量
    • 连接数异常
  3. 备份恢复

    • 定时 RDB 快照
    • AOF 持久化
    • 异地备份

七、面试准备建议

7.1 复习重点

  1. 基础概念(30%)

    • 数据结构及应用场景
    • 持久化方式
    • 过期策略
  2. 核心原理(40%)

    • 单线程模型
    • IO 多路复用
    • 主从复制
    • 哨兵、集群
  3. 实战场景(30%)

    • 缓存设计
    • 分布式锁
    • 秒杀系统
    • 排行榜

7.2 实战经验

  1. 项目经验

    • 分享实际项目中的使用场景
    • 遇到的问题及解决方案
    • 性能优化经验
  2. 故障排查

    • 慢查询分析
    • 内存泄漏处理
    • 连接数异常排查
  3. 性能调优

    • 监控指标
    • 优化方案
    • 效果对比

7.3 进阶学习

  1. 源码阅读

    • Redis 核心代码
    • 数据结构实现
    • 网络模型
  2. 架构设计

    • 分布式缓存架构
    • 高可用方案
    • 容量规划
  3. 新技术

    • Redis 7.0 新特性
    • Redis Modules
    • Redis Cluster 演进

八、模拟面试

面试官:请介绍一下 Redis 的基本数据结构及使用场景?

参考答案:

Redis 有 5 种基本数据结构:

  1. String(字符串)

    • 应用:缓存、计数器、分布式锁、Session
    • 示例:SET user:1001 "Alice"
  2. Hash(哈希)

    • 应用:对象存储、购物车
    • 示例:HSET user:1001 name "Alice" age 20
  3. List(列表)

    • 应用:消息队列、最新列表
    • 示例:LPUSH queue:tasks "task1"
  4. Set(集合)

    • 应用:唯一性、标签、共同好友
    • 示例:SADD tags:1001 "java" "redis"
  5. Sorted Set(有序集合)

    • 应用:排行榜、范围查询
    • 示例:ZADD leaderboard 100 "user1001"

此外,Redis 还有 3 种高级数据结构:

  • Bitmap:用户签到、在线统计
  • HyperLogLog:UV 统计
  • GEO:地理位置

面试官:如何保证缓存和数据库的一致性?

参考答案:

常用方案:

  1. Cache Aside Pattern(旁路缓存)

    • 读:先读缓存,未命中读数据库,写入缓存
    • 写:先更新数据库,再删除缓存
  2. 延迟双删

    // 1. 删除缓存
    redis.delete(key);
    // 2. 更新数据库
    db.update(data);
    // 3. 延迟再删除
    Thread.sleep(1000);
    redis.delete(key);
  3. 订阅 Binlog(Canal)

    • 监听 MySQL binlog
    • 解析变更后删除缓存
    • 最终一致性

对比:

  • Cache Aside:简单,但可能短暂不一致
  • 延迟双删:更可靠,但性能稍差
  • Canal:解耦,但需要额外组件

参考文档: