Redis 快取策略設計:從入門到生產環境
Redis 快取策略設計:從入門到生產環境
Redis 是現代後端架構中最常用的快取工具,但「用 Redis 做快取」和「設計好的快取策略」是兩回事。這篇文章分享我在生產環境中學到的快取設計思路和踩過的坑。
為什麼需要快取?
快取的本質是用空間換時間。資料庫查詢通常需要幾毫秒到幾十毫秒,Redis 讀取通常在 0.1ms 以內。在高流量場景下,快取可以:
- 減少資料庫負載
- 降低 API 回應時間
- 節省計算資源
基本操作
import redis
import json
from typing import Optional, Any
redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
class CacheService:
def __init__(self, client: redis.Redis):
self.client = client
def get(self, key: str) -> Optional[Any]:
data = self.client.get(key)
if data is None:
return None
return json.loads(data)
def set(self, key: str, value: Any, ttl: int = 3600) -> None:
self.client.setex(key, ttl, json.dumps(value, default=str))
def delete(self, key: str) -> None:
self.client.delete(key)
def delete_pattern(self, pattern: str) -> int:
# 注意:KEYS 命令在生產環境不建議使用,改用 SCAN
cursor = 0
deleted = 0
while True:
cursor, keys = self.client.scan(cursor, match=pattern, count=100)
if keys:
deleted += self.client.delete(*keys)
if cursor == 0:
break
return deleted快取模式
Cache-Aside(最常用)
async def get_user(user_id: int) -> dict:
cache_key = f"user:{user_id}"
# 先查快取
cached = cache.get(cache_key)
if cached is not None:
return cached
# 快取未命中,查資料庫
user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
if user:
# 存入快取,TTL 1 小時
cache.set(cache_key, user, ttl=3600)
return user
async def update_user(user_id: int, data: dict):
# 更新資料庫
await db.execute("UPDATE users SET ... WHERE id = $1", user_id)
# 刪除快取(讓下次請求重新載入)
cache.delete(f"user:{user_id}")Write-Through
寫入時同時更新資料庫和快取:
async def update_user_write_through(user_id: int, data: dict):
# 同時更新資料庫和快取
async with db.transaction():
await db.execute("UPDATE users SET ... WHERE id = $1", user_id)
updated_user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
# 資料庫成功後更新快取
cache.set(f"user:{user_id}", updated_user, ttl=3600)
return updated_user快取雪崩(Cache Avalanche)
當大量快取同時過期,所有請求打到資料庫:
import random
def set_with_jitter(key: str, value: Any, base_ttl: int):
# 在基礎 TTL 上加入隨機浮動(±10%)
jitter = random.randint(-base_ttl // 10, base_ttl // 10)
ttl = base_ttl + jitter
cache.set(key, value, ttl=ttl)
# 原本所有商品快取都是 3600s,改用浮動 TTL
for product in products:
set_with_jitter(f"product:{product['id']}", product, base_ttl=3600)快取穿透(Cache Penetration)
查詢不存在的資料,每次都穿透到資料庫:
async def get_user_safe(user_id: int) -> Optional[dict]:
cache_key = f"user:{user_id}"
cached = cache.get(cache_key)
if cached is not None:
# 快取了 None 值(用特殊標記)
return None if cached == '__NULL__' else cached
user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
if user:
cache.set(cache_key, user, ttl=3600)
else:
# 快取空結果,短 TTL 防止資料真的被新增時無法訪問
cache.set(cache_key, '__NULL__', ttl=60)
return user更好的方案:Bloom Filter(布隆過濾器)
from bloom_filter2 import BloomFilter
# 初始化布隆過濾器,載入所有有效的 user_id
bloom = BloomFilter(max_elements=1000000, error_rate=0.01)
for user_id in get_all_user_ids():
bloom.add(str(user_id))
async def get_user_with_bloom(user_id: int):
# 如果不在 Bloom Filter 中,一定不存在
if str(user_id) not in bloom:
return None
# 繼續正常流程
return await get_user(user_id)快取擊穿(Cache Breakdown)
熱點資料快取過期瞬間,大量請求同時查資料庫:
import asyncio
from asyncio import Lock
locks = {}
async def get_hot_data(key: str):
cached = cache.get(key)
if cached:
return cached
# 同一個 key 只讓一個請求去查資料庫
if key not in locks:
locks[key] = Lock()
async with locks[key]:
# 拿到鎖後再次檢查快取(可能已被其他請求填充)
cached = cache.get(key)
if cached:
return cached
data = await db.query_hot_data(key)
cache.set(key, data, ttl=3600)
return dataRedis 資料結構應用
# Hash:儲存物件欄位(比 JSON 字串更節省記憶體)
redis_client.hset("user:1001", mapping={
"name": "Alice",
"email": "alice@example.com",
"score": "1250"
})
name = redis_client.hget("user:1001", "name")
# Sorted Set:排行榜
redis_client.zadd("leaderboard", {"Alice": 1250, "Bob": 980, "Charlie": 1100})
top_10 = redis_client.zrevrange("leaderboard", 0, 9, withscores=True)
# Set:追蹤唯一訪客
redis_client.sadd("visitors:2026-01-20", user_id)
daily_count = redis_client.scard("visitors:2026-01-20")
# 計數器
redis_client.incr("page_views:home")
redis_client.incrby("api_calls:hour:14", 1)
redis_client.expire("api_calls:hour:14", 3600)監控快取命中率
# 查看 Redis 統計資訊
redis-cli INFO stats | grep -E "(keyspace_hits|keyspace_misses)"
# 計算命中率
# hit_rate = keyspace_hits / (keyspace_hits + keyspace_misses)快取命中率低於 80% 通常代表快取策略需要調整。正確的快取設計需要了解你的讀寫比例、資料熱度分佈和容許的資料陳舊程度,沒有一體適用的方案。
分享這篇文章