×

.NET 实战:Redis 缓存穿透、击穿与雪崩的原理剖析与解决方案

独孤求败 独孤求败 发表于2026-04-24 09:12:54 浏览31 评论0

抢沙发发表评论

微信图片_2026-04-24_091310_183.jpg

前言

在高并发场景下,Redis 缓存层是保护数据库的第一道屏障。然而,缓存并非万能——一旦出现穿透、击穿或雪崩,请求洪流将绕过缓存直冲数据库,轻则接口响应超时、服务降级,重则数据库被打垮、系统全面崩溃。

这三大问题在面试中频繁出现,在生产环境中也真实存在。本文以 .NET (C#) 为技术栈,从问题的本质成因出发,结合架构流程图与可落地的代码实现,系统拆解每一个问题的应对策略,帮助你真正理解并掌握这套防护体系。

系统架构背景

在典型的高并发系统中,Redis 作为缓存层位于应用服务与数据库之间,承担着拦截大量重复读请求的职责。正常的请求链路如下:

客户端请求
    │
    ▼
[ 应用服务层 (ASP.NET Core) ]
    │
    ├──命中──► [ Redis 缓存 ]  ──► 返回数据
    │
    └──未命中─► [ 数据库 (MySQL/SQL Server) ] ──► 写入缓存 ──► 返回数据

三大缓存问题的本质,都是缓存层失去屏障作用,导致请求洪流直接冲击数据库。

缓存穿透(Cache Penetration)

2.1 概念与成因

缓存穿透是指客户端请求的数据在 Redis 缓存和数据库中均不存在。由于缓存永远无法命中,每次请求都会穿透缓存直达数据库,且查询结果为空,无法写入缓存,形成恶性循环。

这种场景最常见于黑客攻击:攻击者构造大量不存在的 ID(如负数 ID、随机字符串),发起海量请求,将数据库直接打垮。

攻击请求 (ID=-1, ID=-999...)
    │
    ▼
[ Redis 缓存 ] ── 未命中 ──►
    │
    ▼
[ 数据库 ] ── 查无此数据 ──► 无法写入缓存 ──► 下次请求再次穿透

2.2 解决方案一:缓存空值

最简单直接的方案:当数据库查询结果为空时,依然将空值写入 Redis,并设置一个较短的 TTL(如 2~5 分钟),防止后续相同请求继续打穿数据库。

// .NET 实现:缓存空值策略
public async Task<Product?> GetProductByIdAsync(int productId)
{
    var cacheKey = $"product:{productId}";
    var db = _redis.GetDatabase();

    // 1. 查询缓存
    var cachedValue = await db.StringGetAsync(cacheKey);

    if (cachedValue.HasValue)
    {
        // 命中缓存(包括空值标记)
        if (cachedValue == "NULL_PLACEHOLDER")
            return null// 返回空,避免穿透数据库

        return JsonSerializer.Deserialize<Product>(cachedValue!);
    }

    // 2. 缓存未命中,查询数据库
    var product = await _dbContext.Products.FindAsync(productId);

    if (product == null)
    {
        // 3. 数据库也不存在,缓存空值占位符,TTL 设短(防止浪费内存)
        await db.StringSetAsync(cacheKey, "NULL_PLACEHOLDER", TimeSpan.FromMinutes(2));
        return null;
    }

    // 4. 数据存在,正常写入缓存
    await db.StringSetAsync(cacheKey, JsonSerializer.Serialize(product), TimeSpan.FromMinutes(30));
    return product;
}

优点: 实现简单,改动小。
缺点: 会占用额外 Redis 内存;数据库新增数据后,在缓存过期前存在短暂不一致窗口。

2.3 解决方案二:布隆过滤器(Bloom Filter

布隆过滤器是一种空间效率极高的概率型数据结构,能以极小的内存代价判断一个元素"一定不存在"或"可能存在"。

工作原理:

数据写入时:
  数据 x  ──► Hash函数1 ──► Bitmap 位置 1 置 1
             Hash函数2 ──► Bitmap 位置 4 置 1
             Hash函数3 ──► Bitmap 位置 6 置 1

查询时:
  查询 y  ──► 计算对应 Bitmap 位置
            ├── 所有位置均为 1 ──► "可能存在",放行查询
            └── 任一位置为 0 ──► "一定不存在",直接拒绝

在 .NET 中,可以使用 StackExchange.Redis 结合 Redis 的 BF.ADD / BF.EXISTS 命令(需要 RedisBloom 模块),或者使用纯 C# 实现:

// 安装 NuGet 包:StackExchange.Redis
// 使用 Redis BitField 实现布隆过滤器

public class RedisBloomFilter
{
    private readonly IDatabase _db;
    private readonly string _key;
    private readonly int _bitSize;
    private readonly int _hashCount;

    public RedisBloomFilter(IDatabase db, string key, int bitSize = 1 << 24int hashCount = 5)
    {
        _db = db;
        _key = key;
        _bitSize = bitSize;
        _hashCount = hashCount;
    }

    // 将数据加入布隆过滤器
    public async Task AddAsync(string value)
    {
        var positions = GetBitPositions(value);
        var batch = _db.CreateBatch();
        var tasks = positions.Select(pos => batch.StringSetBitAsync(_key, pos, true));
        batch.Execute();
        await Task.WhenAll(tasks);
    }

    // 判断数据是否可能存在
    public async Task<boolMightExistAsync(string value)
    {
        var positions = GetBitPositions(value);
        var tasks = positions.Select(pos => _db.StringGetBitAsync(_key, pos));
        var results = await Task.WhenAll(tasks);
        return results.All(r => r); // 所有位均为1才"可能存在"
    }

    private IEnumerable<longGetBitPositions(string value)
    {
        for (int i = 0; i < _hashCount; i++)
        {
            // 使用不同种子生成多个哈希值
            var hash = MurmurHash3(value, (uint)(i * 2654435761));
            yield return Math.Abs((long)(hash % (uint)_bitSize));
        }
    }

    private static uint MurmurHash3(string input, uint seed)
    {
        var data = Encoding.UTF8.GetBytes(input);
        uint h1 = seed;
        const uint c1 = 0xcc9e2d51, c2 = 0x1b873593;

        int len = data.Length;
        int remaining = len & 3;
        int blocks = len >> 2;

        for (int i = 0; i < blocks; i++)
        {
            uint k1 = BitConverter.ToUInt32(data, i * 4);
            k1 *= c1; k1 = RotateLeft(k1, 15); k1 *= c2;
            h1 ^= k1; h1 = RotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }

        h1 ^= (uint)len;
        h1 ^= h1 >> 16; h1 *= 0x85ebca6b;
        h1 ^= h1 >> 13; h1 *= 0xc2b2ae35;
        h1 ^= h1 >> 16;
        return h1;
    }

    private static uint RotateLeft(uint x, int n) => (x << n) | (x >> (32 - n));
}

在服务层中集成布隆过滤器:

public class ProductService
{
    private readonly RedisBloomFilter _bloomFilter;
    private readonly IDatabase _redisDb;
    private readonly AppDbContext _dbContext;

    public async Task<Product?> GetProductAsync(int productId)
    {
        var idStr = productId.ToString();

        // 第一关:布隆过滤器拦截
        if (!await _bloomFilter.MightExistAsync(idStr))
        {
            // 布隆过滤器判定不存在,直接返回,绝不访问数据库
            return null;
        }

        // 第二关:查询 Redis 缓存
        var cacheKey = $"product:{productId}";
        var cached = await _redisDb.StringGetAsync(cacheKey);
        if (cached.HasValue)
            return JsonSerializer.Deserialize<Product>(cached!);

        // 第三关:查询数据库并回写缓存
        var product = await _dbContext.Products.FindAsync(productId);
        if (product != null)
            await _redisDb.StringSetAsync(cacheKey, JsonSerializer.Serialize(product), TimeSpan.FromMinutes(30));

        return product;
    }

    // 新增数据时,同步更新布隆过滤器
    public async Task AddProductAsync(Product product)
    {
        await _dbContext.Products.AddAsync(product);
        await _dbContext.SaveChangesAsync();
        await _bloomFilter.AddAsync(product.Id.ToString()); // 同步写入布隆过滤器
    }
}

缓存击穿(Cache Breakdown)

3.1 概念与成因

缓存击穿是指某个高并发访问的热点 Key 在某一瞬间过期失效,导致大量并发请求同时穿透缓存,蜂拥至数据库,造成数据库瞬间压力剧增。

与穿透的核心区别在于:数据库中确实存在该数据,且并发量极高,是针对单一热点 Key 的精准冲击。

热点Key过期的瞬间:

并发请求 x1000
    │
    ▼
[ Redis ] ── Key 已过期,全部未命中 ──►
    │
    ▼
[ 数据库 ] ◄── 1000个并发同时查询 ──── 数据库崩溃风险!

3.2 解决方案一:互斥锁(Mutex Lock)

核心思路:缓存未命中时,只允许一个线程去重建缓存,其他线程等待或重试,避免大量请求同时打到数据库。

线程1 ──► 缓存未命中 ──► 获取互斥锁成功 ──► 查询DB ──► 重建缓存 ──► 释放锁
线程2 ──► 缓存未命中 ──► 获取互斥锁失败 ──► 等待/重试 ──► 命中缓存 ──► 返回
线程3 ──► 缓存未命中 ──► 获取互斥锁失败 ──► 等待/重试 ──► 命中缓存 ──► 返回
public class CacheBreakdownMutexService
{
    private readonly IDatabase _redisDb;
    private readonly AppDbContext _dbContext;

    // 使用 Redis SETNX 实现分布式互斥锁
    private async Task<boolTryAcquireLockAsync(string lockKey, TimeSpan expiry)
    {
        return await _redisDb.StringSetAsync(lockKey, "1", expiry, When.NotExists);
    }

    private async Task ReleaseLockAsync(string lockKey)
    {
        await _redisDb.KeyDeleteAsync(lockKey);
    }

    public async Task<Product?> GetProductWithMutexAsync(int productId)
    {
        var cacheKey = $"product:{productId}";
        var lockKey = $"lock:product:{productId}";

        // 1. 查询缓存
        var cached = await _redisDb.StringGetAsync(cacheKey);
        if (cached.HasValue)
            return JsonSerializer.Deserialize<Product>(cached!);

        // 2. 缓存未命中,尝试获取互斥锁
        while (true)
        {
            if (await TryAcquireLockAsync(lockKey, TimeSpan.FromSeconds(10)))
            {
                try
                {
                    // Double-Check:获取锁后再次检查缓存(防止重复重建)
                    cached = await _redisDb.StringGetAsync(cacheKey);
                    if (cached.HasValue)
                        return JsonSerializer.Deserialize<Product>(cached!);

                    // 3. 查询数据库,重建缓存
                    var product = await _dbContext.Products.FindAsync(productId);
                    if (product != null)
                    {
                        await _redisDb.StringSetAsync(
                            cacheKey,
                            JsonSerializer.Serialize(product),
                            TimeSpan.FromMinutes(30)
                        );
                    }
                    return product;
                }
                finally
                {
                    // 4. 释放锁(无论成功失败都要释放)
                    await ReleaseLockAsync(lockKey);
                }
            }
            else
            {
                // 未获取到锁,短暂等待后重试
                await Task.Delay(50);

                // 重试时先尝试从缓存读取(可能已被其他线程重建)
                cached = await _redisDb.StringGetAsync(cacheKey);
                if (cached.HasValue)
                    return JsonSerializer.Deserialize<Product>(cached!);
            }
        }
    }
}

优点: 强一致性,数据库只会被一个线程查询。
缺点: 高并发下大量线程阻塞等待,系统吞吐量下降,响应时间变长。

3.3 解决方案二:逻辑过期(Logical Expiration)

核心思路:不给 Redis Key 设置物理 TTL,而是在 Value 内部存储一个"逻辑过期时间"字段。查询时检测是否逻辑过期,若过期则立即返回旧数据,同时异步启动一个后台线程去刷新缓存,实现最终一致性。

线程1 ──► 发现逻辑过期 ──► 获取互斥锁 ──► 立即返回旧数据
                                │
                                └──► 异步线程 ──► 查询DB ──► 更新缓存 ──► 释放锁

线程2 ──► 发现逻辑过期 ──► 获取锁失败 ──► 直接返回旧数据(无需等待)
线程3 ──► 发现逻辑过期 ──► 获取锁失败 ──► 直接返回旧数据(无需等待)
// 带逻辑过期时间的缓存包装类
public class LogicalExpireEntry<T>
{
    public T Data { getset; } = default!;
    public DateTime ExpireTime { getset; }
}

public class LogicalExpireService
{
    private readonly IDatabase _redisDb;
    private readonly AppDbContext _dbContext;

    // 写入缓存时设置逻辑过期时间(Redis Key 本身不设 TTL 或设很长)
    public async Task SetWithLogicalExpireAsync<T>(string key, T data, TimeSpan logicalTtl)
    {
        var entry = new LogicalExpireEntry<T>
        {
            Data = data,
            ExpireTime = DateTime.UtcNow.Add(logicalTtl)
        };
        // Redis Key 物理上永不过期(或设置很长的兜底时间)
        await _redisDb.StringSetAsync(key, JsonSerializer.Serialize(entry));
    }

    public async Task<Product?> GetProductWithLogicalExpireAsync(int productId)
    {
        var cacheKey = $"product:logical:{productId}";
        var lockKey = $"lock:logical:{productId}";

        // 1. 查询缓存
        var cached = await _redisDb.StringGetAsync(cacheKey);

        // 缓存中无数据(冷启动),直接查库(此处可结合预热策略)
        if (!cached.HasValue) return null;

        var entry = JsonSerializer.Deserialize<LogicalExpireEntry<Product>>(cached!);

        // 2. 检查是否逻辑过期
        if (DateTime.UtcNow <= entry!.ExpireTime)
        {
            // 未过期,直接返回
            return entry.Data;
        }

        // 3. 已逻辑过期,尝试获取互斥锁
        bool lockAcquired = await _redisDb.StringSetAsync(lockKey, "1", TimeSpan.FromSeconds(10), When.NotExists);

        if (lockAcquired)
        {
            // 4. 获取锁成功,异步刷新缓存(当前线程立即返回旧数据)
            _ = Task.Run(async () =>
            {
                try
                {
                    var freshProduct = await _dbContext.Products.FindAsync(productId);
                    if (freshProduct != null)
                        await SetWithLogicalExpireAsync(cacheKey, freshProduct, TimeSpan.FromMinutes(30));
                }
                finally
                {
                    await _redisDb.KeyDeleteAsync(lockKey);
                }
            });
        }

        // 5. 无论是否获取到锁,都立即返回旧数据,不阻塞请求
        return entry.Data;
    }
}

优点: 线程无阻塞,系统吞吐量高,用户体验好。
缺点: 牺牲强一致性,存在短暂的数据旧值窗口;需要提前进行缓存预热。

3.4 两种方案对比

维度
互斥锁
逻辑过期
一致性
强一致
最终一致
可用性
低(线程阻塞等待)
高(立即返回旧值)
实现复杂度
较高
适用场景
数据一致性要求高
高并发、体验优先

缓存雪崩(Cache Avalanche)

4.1 概念与成因

缓存雪崩有两种触发场景:

场景一:大量 Key 同时过期。 在系统初始化时(如每日凌晨批量刷新),大量缓存被设置了相同的过期时间,导致在某一时刻集体失效,海量请求同时涌向数据库。

场景二:Redis 节点宕机。 Redis 服务本身发生故障,缓存层完全失效,所有请求直接打到数据库,引发系统性崩溃。

场景一:
t=0:00  批量写入缓存,TTL 均设为 1 小时
t=1:00  所有 Key 同时过期 ──► 全量请求打到数据库 ──► 数据库宕机

场景二:
Redis 主节点宕机
    │
    ▼
所有请求无法命中缓存 ──► 全量打到数据库 ──► 数据库宕机 ──► 系统崩溃

4.2 解决方案一:过期时间随机打散

在设置缓存 TTL 时,在基础过期时间上叠加一个随机偏移量,让不同 Key 的过期时间分散开来,避免集体失效。

public class CacheAvalancheService
{
    private readonly IDatabase _redisDb;
    private readonly Random _random = new Random();

    // 带随机抖动的缓存写入
    public async Task SetWithJitterAsync<T>(string key, T data, TimeSpan baseTtl, int jitterSeconds = 300)
    {
        // 在基础 TTL 上随机增加 0~jitterSeconds 秒的抖动
        var jitter = TimeSpan.FromSeconds(_random.Next(0, jitterSeconds));
        var actualTtl = baseTtl + jitter;

        await _redisDb.StringSetAsync(key, JsonSerializer.Serialize(data), actualTtl);
    }

    // 批量写入缓存时使用随机 TTL
    public async Task BatchSetProductsAsync(IEnumerable<Product> products)
    {
        var batch = _redisDb.CreateBatch();
        var tasks = new List<Task>();

        foreach (var product in products)
        {
            var key = $"product:{product.Id}";
            var value = JsonSerializer.Serialize(product);
            // 基础 30 分钟 + 随机 0~10 分钟抖动
            var ttl = TimeSpan.FromMinutes(30) + TimeSpan.FromSeconds(_random.Next(0600));
            tasks.Add(batch.StringSetAsync(key, value, ttl));
        }

        batch.Execute();
        await Task.WhenAll(tasks);
    }
}

4.3 解决方案二:服务熔断与限流降级

在 .NET 中,可以使用 Polly 库实现熔断(Circuit Breaker)和限流(Rate Limiting)策略,作为缓存雪崩的兜底防线。

# 安装 Polly NuGet 包
dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly
// Program.cs / Startup.cs 中注册 Polly 策略
builder.Services.AddHttpClient("ProductApi")
    .AddPolicyHandler(GetCircuitBreakerPolicy());

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(
            handledEventsAllowedBeforeBreaking: 5,   // 连续5次失败后熔断
            durationOfBreak: TimeSpan.FromSeconds(30// 熔断持续30秒
        );
}

// 在服务层使用 Polly 包装数据库查询
public class ResilientProductService
{
    private readonly AsyncCircuitBreakerPolicy _circuitBreaker;
    private readonly AsyncRateLimitPolicy _rateLimiter;

    public ResilientProductService()
    {
        // 熔断策略:连续3次异常后熔断,30秒后尝试恢复
        _circuitBreaker = Policy
            .Handle<Exception>()
            .CircuitBreakerAsync(3, TimeSpan.FromSeconds(30),
                onBreak: (ex, duration) =>
                    Console.WriteLine($"[熔断] 电路已断开,持续 {duration.TotalSeconds}s"),
                onReset: () =>
                    Console.WriteLine("[熔断] 电路已恢复"));

        // 限流策略:每秒最多100个请求
        _rateLimiter = Policy.RateLimitAsync(100, TimeSpan.FromSeconds(1));
    }

    public async Task<Product?> GetProductSafeAsync(int productId)
    {
        try
        {
            return await _rateLimiter.ExecuteAsync(async () =>
                await _circuitBreaker.ExecuteAsync(async () =>
                    await QueryDatabaseAsync(productId)
                )
            );
        }
        catch (BrokenCircuitException)
        {
            // 熔断期间返回降级数据(默认值/缓存旧值)
            Console.WriteLine("[降级] 返回默认数据");
            return GetFallbackProduct(productId);
        }
        catch (RateLimitRejectedException)
        {
            // 被限流,返回提示
            throw new Exception("系统繁忙,请稍后重试");
        }
    }

    private Product GetFallbackProduct(int id) =>
        new Product { Id = id, Name = "暂时不可用", Price = 0 };

    private async Task<Product?> QueryDatabaseAsync(int id) =>
        await Task.FromResult(new Product()); // 实际查询逻辑
}

4.4 解决方案三:多级缓存架构

采用"本地内存缓存(L1)+ Redis 分布式缓存(L2)"的两级架构,即使 Redis 出现问题,本地缓存仍能顶住第一波压力。

// 安装:dotnet add package Microsoft.Extensions.Caching.Memory
public class MultiLevelCacheService
{
    private readonly IMemoryCache _localCache;   // L1: 本地内存缓存 (Caffeine-like)
    private readonly IDatabase _redisDb;          // L2: Redis 分布式缓存
    private readonly AppDbContext _dbContext;

    public MultiLevelCacheService(IMemoryCache localCache, IConnectionMultiplexer redis, AppDbContext db)
    {
        _localCache = localCache;
        _redisDb = redis.GetDatabase();
        _dbContext = db;
    }

    public async Task<Product?> GetProductAsync(int productId)
    {
        var cacheKey = $"product:{productId}";

        // L1:查本地内存缓存(最快,无网络开销)
        if (_localCache.TryGetValue(cacheKey, out Product? localProduct))
        {
            Console.WriteLine("L1 命中");
            return localProduct;
        }

        // L2:查 Redis 分布式缓存
        var redisValue = await _redisDb.StringGetAsync(cacheKey);
        if (redisValue.HasValue)
        {
            Console.WriteLine("L2 命中");
            var product = JsonSerializer.Deserialize<Product>(redisValue!);

            // 回填本地缓存(L2 命中后同步到 L1,TTL 设短)
            _localCache.Set(cacheKey, product, TimeSpan.FromMinutes(1));
            return product;
        }

        // L1 & L2 均未命中,查询数据库
        Console.WriteLine("穿透到数据库");
        var dbProduct = await _dbContext.Products.FindAsync(productId);

        if (dbProduct != null)
        {
            // 同时写入 L1 和 L2
            _localCache.Set(cacheKey, dbProduct, TimeSpan.FromMinutes(1));
            await _redisDb.StringSetAsync(cacheKey, JsonSerializer.Serialize(dbProduct), TimeSpan.FromMinutes(30));
        }

        return dbProduct;
    }
}

4.5 解决方案四:Redis 高可用集群

从基础设施层面,通过部署 Redis Sentinel(哨兵模式) 或 Redis Cluster(集群模式) 实现高可用,避免单点故障。

// Program.cs 中配置 Redis Sentinel(哨兵模式)
builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
{
    var config = new ConfigurationOptions
    {
        // 配置哨兵节点
        EndPoints =
        {
            { "sentinel-host-1"26379 },
            { "sentinel-host-2"26379 },
            { "sentinel-host-3"26379 }
        },
        ServiceName = "mymaster",       // 主节点名称
        Password = "your-redis-password",
        AbortOnConnectFail = false,     // 连接失败时不抛出异常
        ConnectRetry = 3,               // 重试次数
        ReconnectRetryPolicy = new LinearRetry(1000// 线性重试间隔
    };
    return ConnectionMultiplexer.Connect(config);
});


群贤毕至

访客