
前言
在高并发场景下,Redis 缓存层是保护数据库的第一道屏障。然而,缓存并非万能——一旦出现穿透、击穿或雪崩,请求洪流将绕过缓存直冲数据库,轻则接口响应超时、服务降级,重则数据库被打垮、系统全面崩溃。
这三大问题在面试中频繁出现,在生产环境中也真实存在。本文以 .NET (C#) 为技术栈,从问题的本质成因出发,结合架构流程图与可落地的代码实现,系统拆解每一个问题的应对策略,帮助你真正理解并掌握这套防护体系。
系统架构背景
在典型的高并发系统中,Redis 作为缓存层位于应用服务与数据库之间,承担着拦截大量重复读请求的职责。正常的请求链路如下:
客户端请求
│
▼
[ 应用服务层 (ASP.NET Core) ]
│
├──命中──► [ Redis 缓存 ] ──► 返回数据
│
└──未命中─► [ 数据库 (MySQL/SQL Server) ] ──► 写入缓存 ──► 返回数据三大缓存问题的本质,都是缓存层失去屏障作用,导致请求洪流直接冲击数据库。
缓存穿透(Cache Penetration)
2.1 概念与成因
缓存穿透是指客户端请求的数据在 Redis 缓存和数据库中均不存在。由于缓存永远无法命中,每次请求都会穿透缓存直达数据库,且查询结果为空,无法写入缓存,形成恶性循环。
这种场景最常见于黑客攻击:攻击者构造大量不存在的 ID(如负数 ID、随机字符串),发起海量请求,将数据库直接打垮。
攻击请求 (ID=-1, ID=-999...)
│
▼
[ Redis 缓存 ] ── 未命中 ──►
│
▼
[ 数据库 ] ── 查无此数据 ──► 无法写入缓存 ──► 下次请求再次穿透
2.2 解决方案一:缓存空值
最简单直接的方案:当数据库查询结果为空时,依然将空值写入 Redis,并设置一个较短的 TTL(如 2~5 分钟),防止后续相同请求继续打穿数据库。
// .NET 实现:缓存空值策略
public async Task<Product?> GetProductByIdAsync(int productId)
{
var cacheKey = $"product:{productId}";
var db = _redis.GetDatabase();
// 1. 查询缓存
var cachedValue = await db.StringGetAsync(cacheKey);
if (cachedValue.HasValue)
{
// 命中缓存(包括空值标记)
if (cachedValue == "NULL_PLACEHOLDER")
return null; // 返回空,避免穿透数据库
return JsonSerializer.Deserialize<Product>(cachedValue!);
}
// 2. 缓存未命中,查询数据库
var product = await _dbContext.Products.FindAsync(productId);
if (product == null)
{
// 3. 数据库也不存在,缓存空值占位符,TTL 设短(防止浪费内存)
await db.StringSetAsync(cacheKey, "NULL_PLACEHOLDER", TimeSpan.FromMinutes(2));
return null;
}
// 4. 数据存在,正常写入缓存
await db.StringSetAsync(cacheKey, JsonSerializer.Serialize(product), TimeSpan.FromMinutes(30));
return product;
}
优点: 实现简单,改动小。
缺点: 会占用额外 Redis 内存;数据库新增数据后,在缓存过期前存在短暂不一致窗口。
2.3 解决方案二:布隆过滤器(Bloom Filter)
布隆过滤器是一种空间效率极高的概率型数据结构,能以极小的内存代价判断一个元素"一定不存在"或"可能存在"。
工作原理:
数据写入时:
数据 x ──► Hash函数1 ──► Bitmap 位置 1 置 1
Hash函数2 ──► Bitmap 位置 4 置 1
Hash函数3 ──► Bitmap 位置 6 置 1
查询时:
查询 y ──► 计算对应 Bitmap 位置
├── 所有位置均为 1 ──► "可能存在",放行查询
└── 任一位置为 0 ──► "一定不存在",直接拒绝
在 .NET 中,可以使用 StackExchange.Redis 结合 Redis 的 BF.ADD / BF.EXISTS 命令(需要 RedisBloom 模块),或者使用纯 C# 实现:
// 安装 NuGet 包:StackExchange.Redis
// 使用 Redis BitField 实现布隆过滤器
public class RedisBloomFilter
{
private readonly IDatabase _db;
private readonly string _key;
private readonly int _bitSize;
private readonly int _hashCount;
public RedisBloomFilter(IDatabase db, string key, int bitSize = 1 << 24, int hashCount = 5)
{
_db = db;
_key = key;
_bitSize = bitSize;
_hashCount = hashCount;
}
// 将数据加入布隆过滤器
public async Task AddAsync(string value)
{
var positions = GetBitPositions(value);
var batch = _db.CreateBatch();
var tasks = positions.Select(pos => batch.StringSetBitAsync(_key, pos, true));
batch.Execute();
await Task.WhenAll(tasks);
}
// 判断数据是否可能存在
public async Task<bool> MightExistAsync(string value)
{
var positions = GetBitPositions(value);
var tasks = positions.Select(pos => _db.StringGetBitAsync(_key, pos));
var results = await Task.WhenAll(tasks);
return results.All(r => r); // 所有位均为1才"可能存在"
}
private IEnumerable<long> GetBitPositions(string value)
{
for (int i = 0; i < _hashCount; i++)
{
// 使用不同种子生成多个哈希值
var hash = MurmurHash3(value, (uint)(i * 2654435761));
yield return Math.Abs((long)(hash % (uint)_bitSize));
}
}
private static uint MurmurHash3(string input, uint seed)
{
var data = Encoding.UTF8.GetBytes(input);
uint h1 = seed;
const uint c1 = 0xcc9e2d51, c2 = 0x1b873593;
int len = data.Length;
int remaining = len & 3;
int blocks = len >> 2;
for (int i = 0; i < blocks; i++)
{
uint k1 = BitConverter.ToUInt32(data, i * 4);
k1 *= c1; k1 = RotateLeft(k1, 15); k1 *= c2;
h1 ^= k1; h1 = RotateLeft(h1, 13);
h1 = h1 * 5 + 0xe6546b64;
}
h1 ^= (uint)len;
h1 ^= h1 >> 16; h1 *= 0x85ebca6b;
h1 ^= h1 >> 13; h1 *= 0xc2b2ae35;
h1 ^= h1 >> 16;
return h1;
}
private static uint RotateLeft(uint x, int n) => (x << n) | (x >> (32 - n));
}
在服务层中集成布隆过滤器:
public class ProductService
{
private readonly RedisBloomFilter _bloomFilter;
private readonly IDatabase _redisDb;
private readonly AppDbContext _dbContext;
public async Task<Product?> GetProductAsync(int productId)
{
var idStr = productId.ToString();
// 第一关:布隆过滤器拦截
if (!await _bloomFilter.MightExistAsync(idStr))
{
// 布隆过滤器判定不存在,直接返回,绝不访问数据库
return null;
}
// 第二关:查询 Redis 缓存
var cacheKey = $"product:{productId}";
var cached = await _redisDb.StringGetAsync(cacheKey);
if (cached.HasValue)
return JsonSerializer.Deserialize<Product>(cached!);
// 第三关:查询数据库并回写缓存
var product = await _dbContext.Products.FindAsync(productId);
if (product != null)
await _redisDb.StringSetAsync(cacheKey, JsonSerializer.Serialize(product), TimeSpan.FromMinutes(30));
return product;
}
// 新增数据时,同步更新布隆过滤器
public async Task AddProductAsync(Product product)
{
await _dbContext.Products.AddAsync(product);
await _dbContext.SaveChangesAsync();
await _bloomFilter.AddAsync(product.Id.ToString()); // 同步写入布隆过滤器
}
}
缓存击穿(Cache Breakdown)
3.1 概念与成因
缓存击穿是指某个高并发访问的热点 Key 在某一瞬间过期失效,导致大量并发请求同时穿透缓存,蜂拥至数据库,造成数据库瞬间压力剧增。
与穿透的核心区别在于:数据库中确实存在该数据,且并发量极高,是针对单一热点 Key 的精准冲击。
热点Key过期的瞬间:
并发请求 x1000
│
▼
[ Redis ] ── Key 已过期,全部未命中 ──►
│
▼
[ 数据库 ] ◄── 1000个并发同时查询 ──── 数据库崩溃风险!
3.2 解决方案一:互斥锁(Mutex Lock)
核心思路:缓存未命中时,只允许一个线程去重建缓存,其他线程等待或重试,避免大量请求同时打到数据库。
线程1 ──► 缓存未命中 ──► 获取互斥锁成功 ──► 查询DB ──► 重建缓存 ──► 释放锁
线程2 ──► 缓存未命中 ──► 获取互斥锁失败 ──► 等待/重试 ──► 命中缓存 ──► 返回
线程3 ──► 缓存未命中 ──► 获取互斥锁失败 ──► 等待/重试 ──► 命中缓存 ──► 返回
public class CacheBreakdownMutexService
{
private readonly IDatabase _redisDb;
private readonly AppDbContext _dbContext;
// 使用 Redis SETNX 实现分布式互斥锁
private async Task<bool> TryAcquireLockAsync(string lockKey, TimeSpan expiry)
{
return await _redisDb.StringSetAsync(lockKey, "1", expiry, When.NotExists);
}
private async Task ReleaseLockAsync(string lockKey)
{
await _redisDb.KeyDeleteAsync(lockKey);
}
public async Task<Product?> GetProductWithMutexAsync(int productId)
{
var cacheKey = $"product:{productId}";
var lockKey = $"lock:product:{productId}";
// 1. 查询缓存
var cached = await _redisDb.StringGetAsync(cacheKey);
if (cached.HasValue)
return JsonSerializer.Deserialize<Product>(cached!);
// 2. 缓存未命中,尝试获取互斥锁
while (true)
{
if (await TryAcquireLockAsync(lockKey, TimeSpan.FromSeconds(10)))
{
try
{
// Double-Check:获取锁后再次检查缓存(防止重复重建)
cached = await _redisDb.StringGetAsync(cacheKey);
if (cached.HasValue)
return JsonSerializer.Deserialize<Product>(cached!);
// 3. 查询数据库,重建缓存
var product = await _dbContext.Products.FindAsync(productId);
if (product != null)
{
await _redisDb.StringSetAsync(
cacheKey,
JsonSerializer.Serialize(product),
TimeSpan.FromMinutes(30)
);
}
return product;
}
finally
{
// 4. 释放锁(无论成功失败都要释放)
await ReleaseLockAsync(lockKey);
}
}
else
{
// 未获取到锁,短暂等待后重试
await Task.Delay(50);
// 重试时先尝试从缓存读取(可能已被其他线程重建)
cached = await _redisDb.StringGetAsync(cacheKey);
if (cached.HasValue)
return JsonSerializer.Deserialize<Product>(cached!);
}
}
}
}
优点: 强一致性,数据库只会被一个线程查询。
缺点: 高并发下大量线程阻塞等待,系统吞吐量下降,响应时间变长。
3.3 解决方案二:逻辑过期(Logical Expiration)
核心思路:不给 Redis Key 设置物理 TTL,而是在 Value 内部存储一个"逻辑过期时间"字段。查询时检测是否逻辑过期,若过期则立即返回旧数据,同时异步启动一个后台线程去刷新缓存,实现最终一致性。
线程1 ──► 发现逻辑过期 ──► 获取互斥锁 ──► 立即返回旧数据
│
└──► 异步线程 ──► 查询DB ──► 更新缓存 ──► 释放锁
线程2 ──► 发现逻辑过期 ──► 获取锁失败 ──► 直接返回旧数据(无需等待)
线程3 ──► 发现逻辑过期 ──► 获取锁失败 ──► 直接返回旧数据(无需等待)
// 带逻辑过期时间的缓存包装类
public class LogicalExpireEntry<T>
{
public T Data { get; set; } = default!;
public DateTime ExpireTime { get; set; }
}
public class LogicalExpireService
{
private readonly IDatabase _redisDb;
private readonly AppDbContext _dbContext;
// 写入缓存时设置逻辑过期时间(Redis Key 本身不设 TTL 或设很长)
public async Task SetWithLogicalExpireAsync<T>(string key, T data, TimeSpan logicalTtl)
{
var entry = new LogicalExpireEntry<T>
{
Data = data,
ExpireTime = DateTime.UtcNow.Add(logicalTtl)
};
// Redis Key 物理上永不过期(或设置很长的兜底时间)
await _redisDb.StringSetAsync(key, JsonSerializer.Serialize(entry));
}
public async Task<Product?> GetProductWithLogicalExpireAsync(int productId)
{
var cacheKey = $"product:logical:{productId}";
var lockKey = $"lock:logical:{productId}";
// 1. 查询缓存
var cached = await _redisDb.StringGetAsync(cacheKey);
// 缓存中无数据(冷启动),直接查库(此处可结合预热策略)
if (!cached.HasValue) return null;
var entry = JsonSerializer.Deserialize<LogicalExpireEntry<Product>>(cached!);
// 2. 检查是否逻辑过期
if (DateTime.UtcNow <= entry!.ExpireTime)
{
// 未过期,直接返回
return entry.Data;
}
// 3. 已逻辑过期,尝试获取互斥锁
bool lockAcquired = await _redisDb.StringSetAsync(lockKey, "1", TimeSpan.FromSeconds(10), When.NotExists);
if (lockAcquired)
{
// 4. 获取锁成功,异步刷新缓存(当前线程立即返回旧数据)
_ = Task.Run(async () =>
{
try
{
var freshProduct = await _dbContext.Products.FindAsync(productId);
if (freshProduct != null)
await SetWithLogicalExpireAsync(cacheKey, freshProduct, TimeSpan.FromMinutes(30));
}
finally
{
await _redisDb.KeyDeleteAsync(lockKey);
}
});
}
// 5. 无论是否获取到锁,都立即返回旧数据,不阻塞请求
return entry.Data;
}
}
优点: 线程无阻塞,系统吞吐量高,用户体验好。
缺点: 牺牲强一致性,存在短暂的数据旧值窗口;需要提前进行缓存预热。
3.4 两种方案对比
缓存雪崩(Cache Avalanche)
4.1 概念与成因
缓存雪崩有两种触发场景:
场景一:大量 Key 同时过期。 在系统初始化时(如每日凌晨批量刷新),大量缓存被设置了相同的过期时间,导致在某一时刻集体失效,海量请求同时涌向数据库。
场景二:Redis 节点宕机。 Redis 服务本身发生故障,缓存层完全失效,所有请求直接打到数据库,引发系统性崩溃。
场景一:
t=0:00 批量写入缓存,TTL 均设为 1 小时
t=1:00 所有 Key 同时过期 ──► 全量请求打到数据库 ──► 数据库宕机
场景二:
Redis 主节点宕机
│
▼
所有请求无法命中缓存 ──► 全量打到数据库 ──► 数据库宕机 ──► 系统崩溃
4.2 解决方案一:过期时间随机打散
在设置缓存 TTL 时,在基础过期时间上叠加一个随机偏移量,让不同 Key 的过期时间分散开来,避免集体失效。
public class CacheAvalancheService
{
private readonly IDatabase _redisDb;
private readonly Random _random = new Random();
// 带随机抖动的缓存写入
public async Task SetWithJitterAsync<T>(string key, T data, TimeSpan baseTtl, int jitterSeconds = 300)
{
// 在基础 TTL 上随机增加 0~jitterSeconds 秒的抖动
var jitter = TimeSpan.FromSeconds(_random.Next(0, jitterSeconds));
var actualTtl = baseTtl + jitter;
await _redisDb.StringSetAsync(key, JsonSerializer.Serialize(data), actualTtl);
}
// 批量写入缓存时使用随机 TTL
public async Task BatchSetProductsAsync(IEnumerable<Product> products)
{
var batch = _redisDb.CreateBatch();
var tasks = new List<Task>();
foreach (var product in products)
{
var key = $"product:{product.Id}";
var value = JsonSerializer.Serialize(product);
// 基础 30 分钟 + 随机 0~10 分钟抖动
var ttl = TimeSpan.FromMinutes(30) + TimeSpan.FromSeconds(_random.Next(0, 600));
tasks.Add(batch.StringSetAsync(key, value, ttl));
}
batch.Execute();
await Task.WhenAll(tasks);
}
}
4.3 解决方案二:服务熔断与限流降级
在 .NET 中,可以使用 Polly 库实现熔断(Circuit Breaker)和限流(Rate Limiting)策略,作为缓存雪崩的兜底防线。
# 安装 Polly NuGet 包
dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly
// Program.cs / Startup.cs 中注册 Polly 策略
builder.Services.AddHttpClient("ProductApi")
.AddPolicyHandler(GetCircuitBreakerPolicy());
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: 5, // 连续5次失败后熔断
durationOfBreak: TimeSpan.FromSeconds(30) // 熔断持续30秒
);
}
// 在服务层使用 Polly 包装数据库查询
public class ResilientProductService
{
private readonly AsyncCircuitBreakerPolicy _circuitBreaker;
private readonly AsyncRateLimitPolicy _rateLimiter;
public ResilientProductService()
{
// 熔断策略:连续3次异常后熔断,30秒后尝试恢复
_circuitBreaker = Policy
.Handle<Exception>()
.CircuitBreakerAsync(3, TimeSpan.FromSeconds(30),
onBreak: (ex, duration) =>
Console.WriteLine($"[熔断] 电路已断开,持续 {duration.TotalSeconds}s"),
onReset: () =>
Console.WriteLine("[熔断] 电路已恢复"));
// 限流策略:每秒最多100个请求
_rateLimiter = Policy.RateLimitAsync(100, TimeSpan.FromSeconds(1));
}
public async Task<Product?> GetProductSafeAsync(int productId)
{
try
{
return await _rateLimiter.ExecuteAsync(async () =>
await _circuitBreaker.ExecuteAsync(async () =>
await QueryDatabaseAsync(productId)
)
);
}
catch (BrokenCircuitException)
{
// 熔断期间返回降级数据(默认值/缓存旧值)
Console.WriteLine("[降级] 返回默认数据");
return GetFallbackProduct(productId);
}
catch (RateLimitRejectedException)
{
// 被限流,返回提示
throw new Exception("系统繁忙,请稍后重试");
}
}
private Product GetFallbackProduct(int id) =>
new Product { Id = id, Name = "暂时不可用", Price = 0 };
private async Task<Product?> QueryDatabaseAsync(int id) =>
await Task.FromResult(new Product()); // 实际查询逻辑
}
4.4 解决方案三:多级缓存架构
采用"本地内存缓存(L1)+ Redis 分布式缓存(L2)"的两级架构,即使 Redis 出现问题,本地缓存仍能顶住第一波压力。
// 安装:dotnet add package Microsoft.Extensions.Caching.Memory
public class MultiLevelCacheService
{
private readonly IMemoryCache _localCache; // L1: 本地内存缓存 (Caffeine-like)
private readonly IDatabase _redisDb; // L2: Redis 分布式缓存
private readonly AppDbContext _dbContext;
public MultiLevelCacheService(IMemoryCache localCache, IConnectionMultiplexer redis, AppDbContext db)
{
_localCache = localCache;
_redisDb = redis.GetDatabase();
_dbContext = db;
}
public async Task<Product?> GetProductAsync(int productId)
{
var cacheKey = $"product:{productId}";
// L1:查本地内存缓存(最快,无网络开销)
if (_localCache.TryGetValue(cacheKey, out Product? localProduct))
{
Console.WriteLine("L1 命中");
return localProduct;
}
// L2:查 Redis 分布式缓存
var redisValue = await _redisDb.StringGetAsync(cacheKey);
if (redisValue.HasValue)
{
Console.WriteLine("L2 命中");
var product = JsonSerializer.Deserialize<Product>(redisValue!);
// 回填本地缓存(L2 命中后同步到 L1,TTL 设短)
_localCache.Set(cacheKey, product, TimeSpan.FromMinutes(1));
return product;
}
// L1 & L2 均未命中,查询数据库
Console.WriteLine("穿透到数据库");
var dbProduct = await _dbContext.Products.FindAsync(productId);
if (dbProduct != null)
{
// 同时写入 L1 和 L2
_localCache.Set(cacheKey, dbProduct, TimeSpan.FromMinutes(1));
await _redisDb.StringSetAsync(cacheKey, JsonSerializer.Serialize(dbProduct), TimeSpan.FromMinutes(30));
}
return dbProduct;
}
}
4.5 解决方案四:Redis 高可用集群
从基础设施层面,通过部署 Redis Sentinel(哨兵模式) 或 Redis Cluster(集群模式) 实现高可用,避免单点故障。
// Program.cs 中配置 Redis Sentinel(哨兵模式)
builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
{
var config = new ConfigurationOptions
{
// 配置哨兵节点
EndPoints =
{
{ "sentinel-host-1", 26379 },
{ "sentinel-host-2", 26379 },
{ "sentinel-host-3", 26379 }
},
ServiceName = "mymaster", // 主节点名称
Password = "your-redis-password",
AbortOnConnectFail = false, // 连接失败时不抛出异常
ConnectRetry = 3, // 重试次数
ReconnectRetryPolicy = new LinearRetry(1000) // 线性重试间隔
};
return ConnectionMultiplexer.Connect(config);
});