跳至主要內容

Redis 快取策略設計:從入門到生產環境

Redis 快取策略設計:從入門到生產環境

Redis 是現代後端架構中最常用的快取工具,但「用 Redis 做快取」和「設計好的快取策略」是兩回事。這篇文章分享我在生產環境中學到的快取設計思路和踩過的坑。

為什麼需要快取?

快取的本質是用空間換時間。資料庫查詢通常需要幾毫秒到幾十毫秒,Redis 讀取通常在 0.1ms 以內。在高流量場景下,快取可以:

  • 減少資料庫負載
  • 降低 API 回應時間
  • 節省計算資源

基本操作

import redis
import json
from typing import Optional, Any

redis_client = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True
)

class CacheService:
    def __init__(self, client: redis.Redis):
        self.client = client
    
    def get(self, key: str) -> Optional[Any]:
        data = self.client.get(key)
        if data is None:
            return None
        return json.loads(data)
    
    def set(self, key: str, value: Any, ttl: int = 3600) -> None:
        self.client.setex(key, ttl, json.dumps(value, default=str))
    
    def delete(self, key: str) -> None:
        self.client.delete(key)
    
    def delete_pattern(self, pattern: str) -> int:
        # 注意:KEYS 命令在生產環境不建議使用,改用 SCAN
        cursor = 0
        deleted = 0
        while True:
            cursor, keys = self.client.scan(cursor, match=pattern, count=100)
            if keys:
                deleted += self.client.delete(*keys)
            if cursor == 0:
                break
        return deleted

快取模式

Cache-Aside(最常用)

async def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"
    
    # 先查快取
    cached = cache.get(cache_key)
    if cached is not None:
        return cached
    
    # 快取未命中,查資料庫
    user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
    
    if user:
        # 存入快取,TTL 1 小時
        cache.set(cache_key, user, ttl=3600)
    
    return user

async def update_user(user_id: int, data: dict):
    # 更新資料庫
    await db.execute("UPDATE users SET ... WHERE id = $1", user_id)
    
    # 刪除快取(讓下次請求重新載入)
    cache.delete(f"user:{user_id}")

Write-Through

寫入時同時更新資料庫和快取:

async def update_user_write_through(user_id: int, data: dict):
    # 同時更新資料庫和快取
    async with db.transaction():
        await db.execute("UPDATE users SET ... WHERE id = $1", user_id)
        updated_user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
    
    # 資料庫成功後更新快取
    cache.set(f"user:{user_id}", updated_user, ttl=3600)
    return updated_user

快取雪崩(Cache Avalanche)

當大量快取同時過期,所有請求打到資料庫:

import random

def set_with_jitter(key: str, value: Any, base_ttl: int):
    # 在基礎 TTL 上加入隨機浮動(±10%)
    jitter = random.randint(-base_ttl // 10, base_ttl // 10)
    ttl = base_ttl + jitter
    cache.set(key, value, ttl=ttl)

# 原本所有商品快取都是 3600s,改用浮動 TTL
for product in products:
    set_with_jitter(f"product:{product['id']}", product, base_ttl=3600)

快取穿透(Cache Penetration)

查詢不存在的資料,每次都穿透到資料庫:

async def get_user_safe(user_id: int) -> Optional[dict]:
    cache_key = f"user:{user_id}"
    cached = cache.get(cache_key)
    
    if cached is not None:
        # 快取了 None 值(用特殊標記)
        return None if cached == '__NULL__' else cached
    
    user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
    
    if user:
        cache.set(cache_key, user, ttl=3600)
    else:
        # 快取空結果,短 TTL 防止資料真的被新增時無法訪問
        cache.set(cache_key, '__NULL__', ttl=60)
    
    return user

更好的方案:Bloom Filter(布隆過濾器)

from bloom_filter2 import BloomFilter

# 初始化布隆過濾器,載入所有有效的 user_id
bloom = BloomFilter(max_elements=1000000, error_rate=0.01)
for user_id in get_all_user_ids():
    bloom.add(str(user_id))

async def get_user_with_bloom(user_id: int):
    # 如果不在 Bloom Filter 中,一定不存在
    if str(user_id) not in bloom:
        return None
    
    # 繼續正常流程
    return await get_user(user_id)

快取擊穿(Cache Breakdown)

熱點資料快取過期瞬間,大量請求同時查資料庫:

import asyncio
from asyncio import Lock

locks = {}

async def get_hot_data(key: str):
    cached = cache.get(key)
    if cached:
        return cached
    
    # 同一個 key 只讓一個請求去查資料庫
    if key not in locks:
        locks[key] = Lock()
    
    async with locks[key]:
        # 拿到鎖後再次檢查快取(可能已被其他請求填充)
        cached = cache.get(key)
        if cached:
            return cached
        
        data = await db.query_hot_data(key)
        cache.set(key, data, ttl=3600)
        return data

Redis 資料結構應用

# Hash:儲存物件欄位(比 JSON 字串更節省記憶體)
redis_client.hset("user:1001", mapping={
    "name": "Alice",
    "email": "alice@example.com",
    "score": "1250"
})
name = redis_client.hget("user:1001", "name")

# Sorted Set:排行榜
redis_client.zadd("leaderboard", {"Alice": 1250, "Bob": 980, "Charlie": 1100})
top_10 = redis_client.zrevrange("leaderboard", 0, 9, withscores=True)

# Set:追蹤唯一訪客
redis_client.sadd("visitors:2026-01-20", user_id)
daily_count = redis_client.scard("visitors:2026-01-20")

# 計數器
redis_client.incr("page_views:home")
redis_client.incrby("api_calls:hour:14", 1)
redis_client.expire("api_calls:hour:14", 3600)

監控快取命中率

# 查看 Redis 統計資訊
redis-cli INFO stats | grep -E "(keyspace_hits|keyspace_misses)"

# 計算命中率
# hit_rate = keyspace_hits / (keyspace_hits + keyspace_misses)

快取命中率低於 80% 通常代表快取策略需要調整。正確的快取設計需要了解你的讀寫比例、資料熱度分佈和容許的資料陳舊程度,沒有一體適用的方案。

分享這篇文章