AI摘要
文章以一次大促雪崩事故切入,系统梳理缓存穿透、击穿、雪崩三大问题,给出布隆过滤器、互斥锁、逻辑过期、多级缓存、随机TTL、预热、熔断等组合拳式解决方案,并附完整Java代码与秒杀实战,强调监控、降级及先写策略的重要性,最终形成可复制的缓存防御体系。
我以为缓存只是简单的查不到就去数据库,直到一次大促期间的缓存连环失效,让整个系统雪崩式崩溃,我才意识到缓存设计的复杂性远不止表面那么简单。
一、事故现场:从缓存失效到系统崩溃的全过程
1.1 问题代码还原
先看看当时有问题的缓存实现:
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Product> redisTemplate;
@Autowired
private ProductMapper productMapper;
/**
* 获取商品详情 - 最初的简单实现
*/
public Product getProduct(Long productId) {
// 1. 先查缓存
String cacheKey = "product:" + productId;
Product product = redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product;
}
// 2. 缓存没有,查数据库
product = productMapper.selectById(productId);
// 3. 写入缓存
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, product, 5, TimeUnit.MINUTES);
}
return product;
}
/**
* 热门商品秒杀接口
*/
@Transactional
public boolean seckill(Long productId, Integer userId) {
// 检查库存
Product product = getProduct(productId); // 使用上面的缓存方法
if (product.getStock() <= 0) {
return false;
}
// 扣减库存
int rows = productMapper.decreaseStock(productId);
if (rows > 0) {
// 更新缓存中的库存
product.setStock(product.getStock() - 1);
redisTemplate.opsForValue().set("product:" + productId, product, 5, TimeUnit.MINUTES);
// 创建订单
createOrder(productId, userId);
return true;
}
return false;
}
}1.2 事故时间线分析
timeline
title 缓存雪崩事故时间线
section 事故前
00:00 : 开始预热缓存<br>大部分商品缓存5分钟过期
00:05 : 第一波缓存同时失效
section 事故中
00:05 : 大量请求穿透到数据库
00:06 : 数据库连接池耗尽
00:07 : 应用服务器线程池打满
00:10 : 订单服务完全瘫痪
section 事故后
00:30 : 人工介入重启服务
01:00 : 逐步恢复服务二、缓存穿透:当查询不存在的数据时
2.1 什么是缓存穿透?
缓存穿透是指查询一个一定不存在的数据,由于缓存中不命中,每次都要去数据库查询,失去了缓存的意义。
攻击场景:恶意用户用不存在的商品ID频繁请求。
// 模拟攻击代码
public class CachePenetrationAttack {
public void attack() {
// 攻击者使用不存在的ID发起请求
for (int i = 1000000; i < 1000100; i++) {
// 这些ID在数据库中都不存在
productService.getProduct((long) i);
}
}
}2.2 布隆过滤器实战方案
我实现的布隆过滤器解决方案:
@Component
public class BloomFilterService {
private final BitSet bitSet;
private final int size = 1000000; // 位图大小
private final int[] seeds = {3, 5, 7, 11, 13, 31, 37, 61}; // 哈希种子
public BloomFilterService() {
this.bitSet = new BitSet(size);
// 初始化时加载所有存在的ID
initializeBloomFilter();
}
private void initializeBloomFilter() {
// 从数据库加载所有存在的商品ID
List<Long> allProductIds = productMapper.selectAllIds();
for (Long id : allProductIds) {
add(id);
}
}
/**
* 添加元素到位图
*/
public void add(Long value) {
for (int seed : seeds) {
int hash = hash(value, seed);
bitSet.set(hash % size, true);
}
}
/**
* 判断元素是否存在
*/
public boolean contains(Long value) {
if (value == null) {
return false;
}
boolean result = true;
for (int seed : seeds) {
int hash = hash(value, seed);
if (!bitSet.get(hash % size)) {
result = false;
break;
}
}
return result;
}
/**
* 哈希函数
*/
private int hash(Long value, int seed) {
int result = 0;
String str = value.toString();
for (int i = 0; i < str.length(); i++) {
result = seed * result + str.charAt(i);
}
return (size - 1) & result; // 保证在size范围内
}
}2.3 Redis布隆过滤器实现
对于更大型的系统,我使用Redis的布隆过滤器模块:
# 安装RedisBloom模块
# docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest
# Redis命令行操作
127.0.0.1:6379> BF.RESERVE product_bloom 0.01 1000000
OK
# 添加商品ID
127.0.0.1:6379> BF.ADD product_bloom 10001
(integer) 1
# 检查是否存在
127.0.0.1:6379> BF.EXISTS product_bloom 9999999
(integer) 0 # 不存在对应的Java代码:
@Component
public class RedisBloomFilter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String BLOOM_FILTER_KEY = "product_bloom";
/**
* 初始化布隆过滤器
*/
public void initBloomFilter(List<Long> productIds) {
// 批量添加
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
for (Long id : productIds) {
connection.stringCommands().set(
("BF.ADD " + BLOOM_FILTER_KEY + " " + id).getBytes(),
new byte[0]
);
}
}
/**
* 检查是否存在
*/
public boolean mightContain(Long productId) {
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
byte[] result = connection.stringCommands().get(
("BF.EXISTS " + BLOOM_FILTER_KEY + " " + productId).getBytes()
);
return result != null && result.length > 0 && result[0] == 1;
}
}三、缓存击穿:热点Key的突然失效
3.1 什么是缓存击穿?
缓存击穿是指一个热点Key突然失效,导致大量请求同时打到数据库上。
典型场景:秒杀商品、热门文章等。
// 缓存击穿的模拟
public class CacheBreakdownSimulation {
public void simulate() {
// 假设商品10001是秒杀商品,缓存同时过期
Long hotProductId = 10001L;
// 1000个线程同时请求这个商品
ExecutorService executor = Executors.newFixedThreadPool(100);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
productService.getProduct(hotProductId);
});
}
}
}3.2 互斥锁解决方案
我的互斥锁实现:
@Service
public class ProductServiceV2 {
// 使用Redis分布式锁
private static final String LOCK_PREFIX = "lock:product:";
/**
* 改进版:使用互斥锁防止缓存击穿
*/
public Product getProductWithLock(Long productId) {
String cacheKey = "product:" + productId;
// 1. 先查缓存
Product product = redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product;
}
// 2. 获取分布式锁
String lockKey = LOCK_PREFIX + productId;
String lockValue = UUID.randomUUID().toString();
try {
// 尝试获取锁,设置3秒超时
Boolean locked = redisTemplate.opsForValue().setIfAbsent(
lockKey, lockValue, 3, TimeUnit.SECONDS
);
if (Boolean.TRUE.equals(locked)) {
// 获取锁成功,查数据库
product = productMapper.selectById(productId);
if (product != null) {
// 写入缓存,设置随机过期时间防止雪崩
int expireTime = 300 + new Random().nextInt(60); // 300-360秒
redisTemplate.opsForValue().set(cacheKey, product, expireTime, TimeUnit.SECONDS);
} else {
// 数据库不存在,缓存空值防止穿透
redisTemplate.opsForValue().set(cacheKey, new NullProduct(), 60, TimeUnit.SECONDS);
}
return product;
} else {
// 获取锁失败,等待重试
Thread.sleep(50);
return getProductWithLock(productId); // 递归重试
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁失败", e);
} finally {
// 释放锁,使用Lua脚本保证原子性
if (lockValue != null) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey), lockValue);
}
}
}
}3.3 逻辑过期方案
对于更新不频繁的热点数据,我采用逻辑过期方案:
@Data
public class RedisData {
private Object data; // 实际数据
private Long expireTime; // 逻辑过期时间
}
@Service
public class ProductServiceV3 {
/**
* 逻辑过期方案
*/
public Product getProductWithLogicExpire(Long productId) {
String cacheKey = "product:" + productId;
// 1. 查询缓存
RedisData redisData = (RedisData) redisTemplate.opsForValue().get(cacheKey);
// 2. 判断是否逻辑过期
if (redisData == null) {
// 缓存不存在,走互斥锁逻辑初始化
return initProductCache(productId);
}
Product product = (Product) redisData.getData();
Long expireTime = redisData.getExpireTime();
if (expireTime > System.currentTimeMillis()) {
// 未过期,直接返回
return product;
} else {
// 已过期,异步重建
asyncRebuildCache(productId);
return product; // 返回旧数据
}
}
/**
* 异步重建缓存
*/
private void asyncRebuildCache(Long productId) {
CompletableFuture.runAsync(() -> {
// 获取互斥锁
String lockKey = "rebuild:product:" + productId;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(
lockKey, "1", 10, TimeUnit.SECONDS
);
if (Boolean.TRUE.equals(locked)) {
try {
// 查询最新数据
Product product = productMapper.selectById(productId);
if (product != null) {
// 设置新的逻辑过期时间(比如5分钟后)
RedisData newData = new RedisData();
newData.setData(product);
newData.setExpireTime(System.currentTimeMillis() + 5 * 60 * 1000);
redisTemplate.opsForValue().set(
"product:" + productId,
newData
);
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}
});
}
}四、缓存雪崩:大规模的缓存同时失效
4.1 什么是缓存雪崩?
缓存雪崩是指大量缓存Key在同一时间失效,导致所有请求都打到数据库,引起数据库压力激增。
// 缓存雪崩的模拟
public class CacheAvalancheSimulation {
public void simulate() {
// 假设我们有10000个商品,缓存都在00:00同时过期
for (int i = 1; i <= 10000; i++) {
// 所有请求在缓存失效后同时到达
new Thread(() -> {
productService.getProduct((long) (new Random().nextInt(10000) + 1));
}).start();
}
}
}4.2 多级缓存架构
我设计的本地缓存+Redis多级缓存方案:
@Configuration
public class CacheConfig {
@Bean
public CacheManager cacheManager() {
// 创建Caffeine本地缓存
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
.initialCapacity(1000) // 初始容量
.maximumSize(10000) // 最大容量
.expireAfterWrite(5, TimeUnit.MINUTES) // 写入后过期时间
.recordStats()); // 开启统计
return cacheManager;
}
}
@Service
public class ProductServiceV4 {
@Autowired
private CacheManager cacheManager;
/**
* 多级缓存:本地缓存 + Redis
*/
public Product getProductWithMultiLevelCache(Long productId) {
// 1. 先查本地缓存
Cache localCache = cacheManager.getCache("products");
Product product = localCache.get(productId, Product.class);
if (product != null) {
return product;
}
// 2. 查Redis缓存
String redisKey = "product:" + productId;
product = redisTemplate.opsForValue().get(redisKey);
if (product != null) {
// 回填本地缓存
localCache.put(productId, product);
return product;
}
// 3. 查数据库(带互斥锁)
product = getProductFromDBWithLock(productId);
if (product != null) {
// 同时写入Redis和本地缓存
redisTemplate.opsForValue().set(
redisKey,
product,
300 + new Random().nextInt(60), // 随机过期时间
TimeUnit.SECONDS
);
localCache.put(productId, product);
}
return product;
}
}4.3 缓存预热与过期时间随机化
我的缓存预热和过期时间优化方案:
@Component
public class CacheWarmUp {
@Autowired
private ProductMapper productMapper;
@Autowired
private RedisTemplate<String, Product> redisTemplate;
/**
* 缓存预热:应用启动时加载热点数据
*/
@PostConstruct
public void warmUpCache() {
// 加载热门商品
List<Product> hotProducts = productMapper.selectHotProducts(100);
for (Product product : hotProducts) {
String key = "product:" + product.getId();
// 设置随机过期时间(4-6小时)
int expireHours = 4 + new Random().nextInt(3);
redisTemplate.opsForValue().set(
key,
product,
expireHours,
TimeUnit.HOURS
);
}
// 加载其他重要数据
warmUpOtherCaches();
}
/**
* 定时刷新缓存
*/
@Scheduled(cron = "0 0 */2 * * ?") // 每2小时执行一次
public void refreshCache() {
// 异步刷新
CompletableFuture.runAsync(() -> {
List<Product> products = productMapper.selectAll();
for (Product product : products) {
// 在缓存过期前刷新
refreshProductCache(product.getId());
}
});
}
/**
* 刷新单个商品缓存
*/
private void refreshProductCache(Long productId) {
String key = "product:" + productId;
Long ttl = redisTemplate.getExpire(key, TimeUnit.SECONDS);
// 如果剩余时间小于10分钟,刷新缓存
if (ttl != null && ttl < 600) {
Product product = productMapper.selectById(productId);
if (product != null) {
// 设置新的随机过期时间
int newTtl = 3600 + new Random().nextInt(1800); // 1-1.5小时
redisTemplate.opsForValue().set(key, product, newTtl, TimeUnit.SECONDS);
}
}
}
}五、一套组合拳:综合防御方案
5.1 完整的缓存策略实现
基于实战经验,我总结出的完整缓存方案:
@Service
@Slf4j
public class CacheStrategyService {
// 使用Guava Cache作为本地缓存
private final LoadingCache<Long, Optional<Product>> localCache;
private final BloomFilterService bloomFilter;
public CacheStrategyService() {
// 初始化布隆过滤器
this.bloomFilter = new BloomFilterService();
// 初始化本地缓存
this.localCache = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.refreshAfterWrite(1, TimeUnit.MINUTES)
.recordStats()
.build(new CacheLoader<Long, Optional<Product>>() {
@Override
public Optional<Product> load(Long productId) {
return loadFromRedis(productId);
}
});
}
/**
* 完整的缓存查询流程
*/
public Product getProductFullStrategy(Long productId) {
// 1. 布隆过滤器过滤不存在的ID
if (!bloomFilter.contains(productId)) {
log.warn("布隆过滤器拦截不存在的商品ID: {}", productId);
return null;
}
try {
// 2. 查本地缓存
Optional<Product> productOpt = localCache.get(productId);
if (productOpt.isPresent()) {
return productOpt.get();
}
// 3. 本地缓存没有,查Redis(带互斥锁)
return getFromRedisWithLock(productId);
} catch (ExecutionException e) {
log.error("本地缓存查询失败", e);
// 降级:直接查Redis
return getFromRedis(productId);
}
}
/**
* 带互斥锁的Redis查询
*/
private Product getFromRedisWithLock(Long productId) {
String redisKey = "product:" + productId;
String lockKey = "lock:product:" + productId;
// 先查一次Redis(可能有其他线程已经写入)
Product product = redisTemplate.opsForValue().get(redisKey);
if (product != null) {
return product;
}
// 获取分布式锁
String lockValue = UUID.randomUUID().toString();
try {
Boolean locked = redisTemplate.opsForValue().setIfAbsent(
lockKey, lockValue, 5, TimeUnit.SECONDS
);
if (Boolean.TRUE.equals(locked)) {
// 再次检查Redis(Double Check)
product = redisTemplate.opsForValue().get(redisKey);
if (product != null) {
return product;
}
// 查询数据库
product = productMapper.selectById(productId);
if (product != null) {
// 写入Redis,设置随机过期时间
int expireTime = 300 + ThreadLocalRandom.current().nextInt(120);
redisTemplate.opsForValue().set(
redisKey, product, expireTime, TimeUnit.SECONDS
);
// 更新本地缓存
localCache.put(productId, Optional.of(product));
} else {
// 数据库不存在,缓存空值(短时间)
redisTemplate.opsForValue().set(
redisKey, new NullProduct(), 60, TimeUnit.SECONDS
);
localCache.put(productId, Optional.empty());
}
return product;
} else {
// 获取锁失败,等待后重试
Thread.sleep(100);
return getFromRedis(productId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
// 释放锁
releaseLock(lockKey, lockValue);
}
}
/**
* 安全的锁释放(Lua脚本)
*/
private void releaseLock(String lockKey, String lockValue) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey),
lockValue
);
}
}5.2 监控与熔断降级
完整的缓存监控体系:
@Component
public class CacheMonitor {
@Autowired
private MeterRegistry meterRegistry;
private final Counter cacheHitCounter;
private final Counter cacheMissCounter;
private final Timer cacheQueryTimer;
public CacheMonitor() {
// 初始化监控指标
this.cacheHitCounter = Counter.builder("cache.hits")
.description("缓存命中次数")
.register(meterRegistry);
this.cacheMissCounter = Counter.builder("cache.misses")
.description("缓存未命中次数")
.register(meterRegistry);
this.cacheQueryTimer = Timer.builder("cache.query.time")
.description("缓存查询耗时")
.register(meterRegistry);
}
/**
* 监控缓存查询
*/
public Product monitoredGetProduct(Long productId) {
return cacheQueryTimer.record(() -> {
Product product = cacheStrategyService.getProductFullStrategy(productId);
if (product != null) {
cacheHitCounter.increment();
} else {
cacheMissCounter.increment();
}
return product;
});
}
/**
* 熔断降级:当缓存故障时直接返回默认值
*/
@HystrixCommand(fallbackMethod = "getProductFallback")
public Product getProductWithCircuitBreaker(Long productId) {
return monitoredGetProduct(productId);
}
public Product getProductFallback(Long productId) {
log.warn("缓存服务熔断,返回默认商品信息,productId: {}", productId);
// 返回默认值或静态数据
return Product.defaultProduct(productId);
}
}六、实战案例:秒杀系统的缓存优化
基于上述策略,我重构了秒杀系统的缓存架构:
@Service
public class SeckillServiceV2 {
/**
* 秒杀优化版:三级缓存 + 库存预扣 + 异步下单
*/
public boolean seckillOptimized(Long productId, Integer userId) {
// 1. 库存预检查(本地缓存)
if (!checkStockLocal(productId)) {
return false;
}
// 2. Redis库存预扣减
Long stock = redisTemplate.opsForValue().decrement("stock:" + productId);
if (stock == null || stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment("stock:" + productId);
return false;
}
// 3. 发送异步下单消息
sendOrderMessage(productId, userId);
return true;
}
/**
* 异步下单消费者
*/
@RabbitListener(queues = "seckill.order.queue")
public void processSeckillOrder(SeckillOrderMessage message) {
try {
// 真正的下单逻辑
createRealOrder(message.getProductId(), message.getUserId());
// 更新商品缓存
refreshProductCache(message.getProductId());
} catch (Exception e) {
// 下单失败,恢复库存
redisTemplate.opsForValue().increment("stock:" + message.getProductId());
log.error("秒杀下单失败", e);
}
}
/**
* 库存预热:秒杀开始前加载库存到Redis
*/
public void preheatStock(Long productId, Integer stock) {
// 设置库存,并设置合适的过期时间
redisTemplate.opsForValue().set(
"stock:" + productId,
stock,
2, // 秒杀活动持续2小时
TimeUnit.HOURS
);
// 预热商品详情
preheatProductDetail(productId);
}
}七、经验总结:缓存设计的心得
7.1 缓存设计原则
- 缓存不是银弹:不要为了缓存而缓存,要考虑维护成本和一致性成本
- 分级缓存:本地缓存 → Redis缓存 → 数据库,形成多级防御
- 缓存维度:根据业务场景选择合适的缓存粒度(对象级、列表级、页面级)
7.2 监控指标的重要性
必须监控的关键指标:
# 缓存监控指标
cache:
hit_rate: # 命中率,低于80%需要优化
avg_response_time: # 平均响应时间
qps: # 每秒查询量
memory_usage: # 内存使用率
eviction_count: # 淘汰次数7.3 常见误区与正确做法
// 错误做法:缓存与数据库双写不一致
public void updateProduct(Product product) {
// 先更新数据库
productMapper.update(product);
// 再删除缓存(可能失败)
redisTemplate.delete("product:" + product.getId());
}
// 正确做法:先删缓存,再更新数据库
public void updateProductCorrect(Product product) {
// 1. 先删除缓存
redisTemplate.delete("product:" + product.getId());
// 2. 再更新数据库
productMapper.update(product);
// 3. 可选:延迟双删
CompletableFuture.runDelayed(() -> {
redisTemplate.delete("product:" + product.getId());
}, 1000); // 1秒后再次删除
}总结
缓存穿透、击穿、雪崩不是三个孤立的问题,而是缓存系统在不同压力下的不同表现形态。经过这次惨痛的事故和后续的深入研究,我形成了完整的防御体系:
- 防穿透:布隆过滤器 + 空值缓存
- 防击穿:互斥锁 + 逻辑过期
- 防雪崩:多级缓存 + 过期时间随机化 + 热点数据预热
最重要的启示:缓存设计不是简单的技术选型,而是需要结合业务特点、流量规模、数据一致性要求等多方面因素的综合决策。真正的缓存优化不是在出问题后打补丁,而是在设计时就考虑到各种极端情况。
这次经历让我养成了在系统设计阶段就考虑缓存策略的习惯,也让我明白了监控和降级的重要性。技术的价值不在于掌握多少工具,而在于理解问题本质并形成体系化的解决方案。