This article shares practical experience from applying Redis caching in production, covering cache design patterns, solutions for cache penetration, breakdown, and avalanche, distributed lock implementation, and cache-database consistency guarantees. Each topic is paired with a real business scenario and a code implementation.
1. Choosing a Cache Design Pattern
Cache-Aside pattern (the most common):
@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Product> redisTemplate;
    @Autowired
    private ProductMapper productMapper;

    private static final String PRODUCT_KEY_PREFIX = "product:";
    private static final Duration CACHE_TTL = Duration.ofHours(2);

    public Product getProductById(Long id) {
        // 1. Check the cache first
        String cacheKey = PRODUCT_KEY_PREFIX + id;
        Product product = redisTemplate.opsForValue().get(cacheKey);
        if (product != null) {
            return product;
        }
        // 2. Cache miss: query the database
        product = productMapper.selectById(id);
        if (product != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(cacheKey, product, CACHE_TTL);
        }
        return product;
    }

    public void updateProduct(Product product) {
        // 1. Update the database
        productMapper.update(product);
        // 2. Delete the cache (the next read rebuilds it)
        String cacheKey = PRODUCT_KEY_PREFIX + product.getId();
        redisTemplate.delete(cacheKey);
    }
}
Read-Through/Write-Through pattern:
@Component
public class CacheManager {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Read-through: load the value automatically when it is not cached.
     */
    @SuppressWarnings("unchecked")
    public <T> T getOrLoad(String key, Class<T> type, Supplier<T> loader, Duration ttl) {
        T value = (T) redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        // Distributed lock to prevent cache breakdown (a stampede of loaders);
        // tryLock/releaseLock delegate to the distributed lock shown in section 3
        String lockKey = "lock:" + key;
        if (tryLock(lockKey)) {
            try {
                // Double check so only one caller loads the value
                value = (T) redisTemplate.opsForValue().get(key);
                if (value != null) {
                    return value;
                }
                // Load the data
                value = loader.get();
                if (value != null) {
                    redisTemplate.opsForValue().set(key, value, ttl);
                }
                return value;
            } finally {
                releaseLock(lockKey);
            }
        } else {
            // Lock not acquired: wait briefly and retry
            try {
                Thread.sleep(100);
                return getOrLoad(key, type, loader, ttl);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return loader.get(); // degrade to a direct load
            }
        }
    }
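
    // Example usage (hypothetical caller; productMapper and id are assumed to exist):
    // Product p = cacheManager.getOrLoad("product:" + id, Product.class,
    //         () -> productMapper.selectById(id), Duration.ofHours(2));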
}
2. Solving Common Cache Problems
Cache penetration solution:
@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Product> redisTemplate;
    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private BloomFilter<Long> bloomFilter; // pre-populated with all valid product ids

    /**
     * Guarding against cache penetration: cache empty values + Bloom filter.
     */
    public Product getProductByIdWithProtection(Long id) {
        // 1. Validate the parameter
        if (id == null || id <= 0) {
            return null;
        }
        // 2. Bloom filter check (blocks lookups for ids that cannot exist)
        if (!bloomFilter.mightContain(id)) {
            return null;
        }
        String cacheKey = "product:" + id;
        // 3. Check the cache
        Product product = redisTemplate.opsForValue().get(cacheKey);
        if (product != null) {
            // A special marker means "cached empty value"
            if (product.getId() == -1) {
                return null;
            }
            return product;
        }
        // 4. Query the database
        product = productMapper.selectById(id);
        if (product == null) {
            // Cache an empty value with a short TTL
            Product emptyProduct = new Product();
            emptyProduct.setId(-1L); // special marker
            redisTemplate.opsForValue().set(cacheKey, emptyProduct, Duration.ofMinutes(5));
            return null;
        }
        // 5. Populate the cache
        redisTemplate.opsForValue().set(cacheKey, product, Duration.ofHours(2));
        return product;
    }
}
Cache breakdown solution:
@Service
public class HotProductService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ProductMapper productMapper;

    /**
     * Guarding against cache breakdown: a mutex around the cache rebuild.
     */
    public Product getHotProduct(Long id) {
        String cacheKey = "hot_product:" + id;
        String lockKey = "lock:hot_product:" + id;
        // 1. Check the cache
        Product product = (Product) redisTemplate.opsForValue().get(cacheKey);
        if (product != null) {
            return product;
        }
        // 2. Try to acquire the distributed lock (delegates to the lock in section 3)
        if (tryLock(lockKey, 3, TimeUnit.SECONDS)) {
            try {
                // 3. Double check after acquiring the lock
                product = (Product) redisTemplate.opsForValue().get(cacheKey);
                if (product != null) {
                    return product;
                }
                // 4. Query the database
                product = productMapper.selectById(id);
                if (product != null) {
                    // 5. Hot data never expires physically, but is refreshed periodically
                    redisTemplate.opsForValue().set(cacheKey, product);
                    // 6. Store a logical expiration timestamp alongside it
                    redisTemplate.opsForValue().set(cacheKey + ":expire",
                            System.currentTimeMillis() + 30 * 60 * 1000L); // expires in 30 minutes
                }
                return product;
            } finally {
                releaseLock(lockKey);
            }
        } else {
            // Lock not acquired: wait briefly and retry
            try {
                Thread.sleep(100);
                return getHotProduct(id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return productMapper.selectById(id); // degrade to the database
            }
        }
    }

    /**
     * Scheduled job: refresh hot data whose logical expiration has passed.
     */
    @Scheduled(fixedRate = 20 * 60 * 1000) // every 20 minutes
    public void refreshHotProducts() {
        // KEYS is acceptable for a small, bounded hot set; prefer SCAN on large keyspaces
        Set<String> hotProductKeys = redisTemplate.keys("hot_product:*");
        for (String key : hotProductKeys) {
            if (key.endsWith(":expire")) {
                continue; // skip the companion expiration keys
            }
            String expireKey = key + ":expire";
            Long expireTime = (Long) redisTemplate.opsForValue().get(expireKey);
            if (expireTime != null && expireTime < System.currentTimeMillis()) {
                // Refresh the cache asynchronously
                CompletableFuture.runAsync(() -> updateHotProductCache(key));
            }
        }
    }
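
    // A minimal sketch of the refresh helper referenced above (assumed implementation;
    // the "hot_product:{id}" key format follows the convention used in getHotProduct):
    private void updateHotProductCache(String cacheKey) {
        Long id = Long.valueOf(cacheKey.substring("hot_product:".length()));
        Product product = productMapper.selectById(id);
        if (product != null) {
            redisTemplate.opsForValue().set(cacheKey, product);
            redisTemplate.opsForValue().set(cacheKey + ":expire",
                    System.currentTimeMillis() + 30 * 60 * 1000L);
        }
    }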
}
Cache avalanche solution:
@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Product> redisTemplate;

    /**
     * Guarding against cache avalanche: randomized TTLs + cluster deployment.
     */
    public void setProductCache(Product product) {
        String cacheKey = "product:" + product.getId();
        // Base TTL plus a random offset (0-300 seconds) so keys do not all expire together
        long baseTtl = 2 * 60 * 60; // 2 hours
        long randomOffset = ThreadLocalRandom.current().nextInt(0, 300);
        Duration ttl = Duration.ofSeconds(baseTtl + randomOffset);
        redisTemplate.opsForValue().set(cacheKey, product, ttl);
    }

    /**
     * Multi-level cache: local cache + Redis.
     * "productCache" is assumed to be backed by a local CacheManager (e.g. Caffeine);
     * getProductFromRedis is the Redis lookup shown earlier.
     */
    @Cacheable(value = "productCache", key = "#id")
    public Product getProductWithMultiLevelCache(Long id) {
        // Local cache miss: fall through to Redis
        return getProductFromRedis(id);
    }
}
3. Distributed Lock Implementation
A Redis-based distributed lock:
@Component
public class RedisDistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_PREFIX = "lock:";
    private static final long DEFAULT_EXPIRE_TIME = 30; // seconds
    private static final long DEFAULT_WAIT_TIME = 5;    // seconds

    // Lock values held by the current thread, keyed by lock key,
    // so that releaseLock() can verify ownership before deleting
    private final ThreadLocal<Map<String, String>> heldLocks =
            ThreadLocal.withInitial(HashMap::new);

    /**
     * Try to acquire the lock.
     */
    public boolean tryLock(String lockKey, long expireTime, TimeUnit timeUnit) {
        String key = LOCK_PREFIX + lockKey;
        String value = generateLockValue();
        // SET key value NX EX timeout: one atomic command
        Boolean success = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
            RedisStringCommands commands = connection.stringCommands();
            byte[] keyBytes = redisTemplate.getStringSerializer().serialize(key);
            byte[] valueBytes = redisTemplate.getStringSerializer().serialize(value);
            return commands.set(keyBytes, valueBytes,
                    Expiration.seconds(timeUnit.toSeconds(expireTime)),
                    SetOption.SET_IF_ABSENT);
        });
        if (Boolean.TRUE.equals(success)) {
            // Remember the value we wrote so only the owning thread can release the lock
            heldLocks.get().put(lockKey, value);
            return true;
        }
        return false;
    }

    /**
     * Release the lock (a Lua script keeps check-and-delete atomic).
     */
    public boolean releaseLock(String lockKey) {
        String key = LOCK_PREFIX + lockKey;
        String lockValue = heldLocks.get().get(lockKey);
        if (lockValue == null) {
            return false; // this thread does not hold the lock
        }
        // Delete only if the stored value still matches ours
        String luaScript =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "  return redis.call('del', KEYS[1]) " +
                "else " +
                "  return 0 " +
                "end";
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setScriptText(luaScript);
        script.setResultType(Long.class);
        Long result = redisTemplate.execute(script, Arrays.asList(key), lockValue);
        heldLocks.get().remove(lockKey);
        return result != null && result == 1;
    }

    /**
     * Reentrant acquisition: if the current thread already holds the lock,
     * refresh the expiration instead of acquiring it again.
     */
    public boolean tryReentrantLock(String lockKey, long expireTime, TimeUnit timeUnit) {
        String key = LOCK_PREFIX + lockKey;
        String heldValue = heldLocks.get().get(lockKey);
        if (heldValue != null && heldValue.equals(redisTemplate.opsForValue().get(key))) {
            // Re-entry: extend the TTL (a full implementation would also keep a hold count)
            redisTemplate.expire(key, expireTime, timeUnit);
            return true;
        }
        return tryLock(lockKey, expireTime, timeUnit);
    }

    private String generateLockValue() {
        // Thread id plus a random UUID makes the value unique per acquisition
        return Thread.currentThread().getId() + ":" + UUID.randomUUID();
    }
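
    // Example usage (hypothetical caller):
    // if (lock.tryLock("order:1001", 30, TimeUnit.SECONDS)) {
    //     try {
    //         // ... critical section ...
    //     } finally {
    //         lock.releaseLock("order:1001");
    //     }
    // }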
}
4. Cache and Database Consistency
Double-write consistency:
@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Product> redisTemplate;
    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Update the database first, then delete the cache.
     */
    @Transactional
    public void updateProduct(Product product) {
        // 1. Update the database
        productMapper.update(product);
        // 2. Delete the cache
        String cacheKey = "product:" + product.getId();
        redisTemplate.delete(cacheKey);
        // 3. Publish a cache-invalidation message
        rocketMQTemplate.send("cache-invalidate-topic",
                MessageBuilder.withPayload(cacheKey).build());
    }

    /**
     * Delayed double delete: handles the read/write race.
     */
    @Transactional
    public void updateProductWithDoubleDelete(Product product) {
        String cacheKey = "product:" + product.getId();
        // First delete
        redisTemplate.delete(cacheKey);
        // Update the database
        productMapper.update(product);
        // Second delete, asynchronously after a short delay
        // (in production prefer a dedicated scheduler over sleeping in the common pool)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000); // wait 1 second
                redisTemplate.delete(cacheKey);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
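The invalidation message published above needs a consumer on the other side. A minimal sketch, assuming the rocketmq-spring-boot-starter is on the classpath (the consumer group name here is made up):
@Component
@RocketMQMessageListener(topic = "cache-invalidate-topic",
        consumerGroup = "cache-invalidate-consumer") // hypothetical group name
public class CacheInvalidateConsumer implements RocketMQListener<String> {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(String cacheKey) {
        // The payload is the cache key to drop; the next read rebuilds the entry
        redisTemplate.delete(cacheKey);
    }
}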
/**
 * Listen to database binlog events for eventual consistency.
 */
@Component
@Slf4j
public class BinlogCacheSyncListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ProductMapper productMapper;

    /**
     * React to database changes and keep the cache in sync.
     */
    @EventListener
    public void onDatabaseChange(DatabaseChangeEvent event) {
        if ("product".equals(event.getTableName())) {
            String cacheKey = "product:" + event.getPrimaryKey();
            if ("DELETE".equals(event.getOperation())) {
                // Delete: drop the cache entry directly
                redisTemplate.delete(cacheKey);
            } else if ("INSERT".equals(event.getOperation()) || "UPDATE".equals(event.getOperation())) {
                // Insert/update: delete the cache so the next read rebuilds it
                redisTemplate.delete(cacheKey);
                // Or, for hot data, rebuild the cache asynchronously right away
                if (isHotData(event.getPrimaryKey())) {
                    updateCacheAsync(cacheKey, event.getPrimaryKey());
                }
            }
        }
    }

    private void updateCacheAsync(String cacheKey, Long productId) {
        CompletableFuture.runAsync(() -> {
            try {
                Product product = productMapper.selectById(productId);
                if (product != null) {
                    redisTemplate.opsForValue().set(cacheKey, product, Duration.ofHours(2));
                }
            } catch (Exception e) {
                log.error("Async cache refresh failed, key: {}", cacheKey, e);
            }
        });
    }
}
5. Advanced Redis Data Structures
Leaderboards with ZSet:
@Service
public class LeaderboardService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String LEADERBOARD_KEY = "leaderboard:product:sales";

    /**
     * Update the product sales leaderboard.
     */
    public void updateProductSales(Long productId, int sales) {
        redisTemplate.opsForZSet().add(LEADERBOARD_KEY, productId.toString(), sales);
        // Keep only the top 1000 entries
        redisTemplate.opsForZSet().removeRange(LEADERBOARD_KEY, 0, -1001);
    }

    /**
     * Get the ten best-selling products.
     */
    public List<Long> getTop10Products() {
        Set<ZSetOperations.TypedTuple<String>> tuples =
                redisTemplate.opsForZSet().reverseRangeWithScores(LEADERBOARD_KEY, 0, 9);
        return tuples.stream()
                .map(tuple -> Long.valueOf(tuple.getValue()))
                .collect(Collectors.toList());
    }

    /**
     * Get a product's rank.
     */
    public Long getProductRank(Long productId) {
        Long rank = redisTemplate.opsForZSet().reverseRank(LEADERBOARD_KEY, productId.toString());
        return rank != null ? rank + 1 : null; // ranks start at 1
    }
}
UV statistics with HyperLogLog:
@Service
public class UVStatisticsService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Record a page view for UV (unique visitor) statistics.
     */
    public void recordPageView(String pageId, String userId) {
        String key = "uv:page:" + pageId + ":" + LocalDate.now().toString();
        redisTemplate.opsForHyperLogLog().add(key, userId);
        // Set an expiration so old counters are cleaned up
        redisTemplate.expire(key, Duration.ofDays(2));
    }

    /**
     * Get a page's UV for a single day.
     */
    public long getPageUV(String pageId, LocalDate date) {
        String key = "uv:page:" + pageId + ":" + date.toString();
        Long count = redisTemplate.opsForHyperLogLog().size(key);
        return count != null ? count : 0;
    }

    /**
     * Merge UV statistics over a date range.
     */
    public long getPageUV(String pageId, LocalDate startDate, LocalDate endDate) {
        List<String> keys = new ArrayList<>();
        LocalDate current = startDate;
        while (!current.isAfter(endDate)) {
            keys.add("uv:page:" + pageId + ":" + current.toString());
            current = current.plusDays(1);
        }
        String unionKey = "uv:page:" + pageId + ":union";
        redisTemplate.opsForHyperLogLog().union(unionKey, keys.toArray(new String[0]));
        Long count = redisTemplate.opsForHyperLogLog().size(unionKey);
        redisTemplate.delete(unionKey); // clean up the temporary key
        return count != null ? count : 0;
    }
}
6. Redis Cluster and Monitoring
Cluster configuration and failover:
spring:
  redis:
    cluster:
      nodes:
        - 192.168.1.101:6379
        - 192.168.1.102:6379
        - 192.168.1.103:6379
      max-redirects: 3
    lettuce:
      pool:
        max-active: 20
        max-wait: -1ms
        max-idle: 10
        min-idle: 0
      cluster:
        refresh:
          adaptive: true
          period: 2000
Monitoring and alerting:
@Component
@Slf4j
public class RedisMonitor {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Scheduled(fixedRate = 60000) // check once a minute
    public void monitorRedisHealth() {
        try {
            // Connectivity check
            redisTemplate.opsForValue().size("health_check");
            // Memory usage check (execute() manages the connection for us)
            Properties info = redisTemplate.execute(
                    (RedisCallback<Properties>) connection -> connection.info("memory"));
            if (info != null) {
                long used = Long.parseLong(info.getProperty("used_memory", "0"));
                long max = Long.parseLong(info.getProperty("maxmemory", "0"));
                // maxmemory=0 means "no limit", so only compute a ratio when it is set
                if (max > 0) {
                    double usageRate = (double) used / max;
                    if (usageRate > 0.8) {
                        log.warn("Redis memory usage is high: {}%", usageRate * 100);
                        // send an alert
                    }
                }
            }
            // Keyspace size check
            Long keyCount = redisTemplate.execute(
                    (RedisCallback<Long>) connection -> connection.dbSize());
            if (keyCount != null && keyCount > 1000000) {
                log.warn("Redis holds too many keys: {}", keyCount);
            }
        } catch (Exception e) {
            log.error("Redis monitoring failed", e);
            // send an urgent alert
        }
    }
}
7. Best Practices Summary
- Choose the right caching strategy: Cache-Aside is the most broadly applicable
- Guard against cache failure modes: penetration, breakdown, avalanche
- Keep data consistent: delayed double delete, binlog listening
- Set expiration sensibly: hot data never expires physically, only logically
- Pick the right data structure: String, Hash, ZSet, HyperLogLog
- Monitor cluster health: memory, connections, key count
- Design a fallback: query the database directly when the cache is unavailable (see the sketch after this list)
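A minimal sketch of such a fallback wrapper, reusing the Product/ProductMapper names from the examples above (the try/catch-around-Redis approach is one option, not the only one):
@Service
@Slf4j
public class ProductQueryService {

    @Autowired
    private RedisTemplate<String, Product> redisTemplate;
    @Autowired
    private ProductMapper productMapper;

    /**
     * Read with degradation: if Redis is unreachable, log the failure
     * and fall back to the database instead of failing the request.
     */
    public Product getProduct(Long id) {
        String cacheKey = "product:" + id;
        try {
            Product cached = redisTemplate.opsForValue().get(cacheKey);
            if (cached != null) {
                return cached;
            }
        } catch (Exception e) {
            log.warn("Redis unavailable, falling back to the database, key: {}", cacheKey, e);
        }
        return productMapper.selectById(id);
    }
}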
Summary:
Redis is a powerful tool for improving system performance, but used carelessly it can become the bottleneck itself. Two years of hands-on experience have taught me that cache design is not just a technology choice; it is a combined consideration of business scenarios, data characteristics, and system architecture. From simple key-value storage to distributed locks, from basic data caching to advanced statistics, every feature should serve a concrete business value.