缓存
利用 singleflight 和 分布式锁(redis) 防止大流量直接打到数据库
使用 SetNX
cache.go
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"time"
"github.com/go-redis/redis/v8"
"golang.org/x/sync/singleflight"
)
var (
localGroup singleflight.Group
redisClient *redis.Client
)
// simulateExpensiveTask 模拟一个耗时操作,如数据库查询或调用外部 API。
func simulateExpensiveTask(key string) (string, error) {
log.Printf("[Expensive Task] Performing for key: %s\n", key)
time.Sleep(2 * time.Second) // 模拟耗时
return "result-for-" + key, nil
}
// getWithDistributedSingleflight 使用 singleflight 和 Redis 锁协调并发请求。
func getWithDistributedSingleflight(ctx context.Context, key string) (string, error) {
// 1. 使用 localGroup 进行进程内 singleflight。
// 这可以防止单个服务实例上的 "惊群效应"。
val, err, _ := localGroup.Do(key, func() (interface{}, error) {
// --- 这里开始是分布式锁的逻辑 ---
distributedLockKey := "lock:" + key
// 2. 尝试获取分布式锁,带超时。
lock := redisClient.SetNX(ctx, distributedLockKey, "locked", 10*time.Second)
// 3. 检查是否成功获取锁。
if !lock.Val() {
// 如果未能获取锁,说明另一个节点正在处理。
// 此时,我们只需等待并重试,或者直接执行任务(取决于容忍度)。
// 在此简单示例中,我们再次执行任务,但实际上,可以等待并重新检查缓存。
log.Printf("[Distributed Lock] Failed to get lock for key: %s, another instance is working.\n", key)
time.Sleep(3 * time.Second) // 模拟等待
return simulateExpensiveTask(key)
}
defer redisClient.Del(ctx, distributedLockKey) // 确保锁被释放
// 4. 获得锁后,再次检查缓存(可选但推荐)。
// ... 这里可以添加缓存检查逻辑 ...
// 5. 执行耗时操作。
result, err := simulateExpensiveTask(key)
if err != nil {
return nil, err
}
// 6. 将结果存入缓存,以便其他实例使用。
// ... 这里可以添加缓存存储逻辑 ...
return result, nil
})
if err != nil {
return "", err
}
return val.(string), nil
}
// handleRequest 是 HTTP 请求的处理函数。
func handleRequest(w http.ResponseWriter, r *http.Request) {
// 从 URL 查询参数中获取键。
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "Missing 'key' parameter", http.StatusBadRequest)
return
}
// 调用我们的分布式 singleflight 函数。
result, err := getWithDistributedSingleflight(r.Context(), key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 返回结果。
w.Header().Set("Content-Type", "application/json")
response := map[string]string{"result": result}
json.NewEncoder(w).Encode(response)
}
func main() {
// 初始化 Redis 客户端。确保您的 Redis 服务正在运行。
redisClient = redis.NewClient(&redis.Options{
Addr: "192.168.20.162:6379", // 替换为您的 Redis 地址
})
// 检查 Redis 连接。
_, err := redisClient.Ping(context.Background()).Result()
if err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
}
// 注册 HTTP 处理函数。
http.HandleFunc("/fetch", handleRequest)
log.Println("Server started on :8080")
// 启动 HTTP 服务器。
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Server failed to start: %v", err)
}
}
lock.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/singleflight"
)
var (
localGroup singleflight.Group
redisClient *redis.Client
locker *redislock.Client
)
// simulateExpensiveTask 模拟一个耗时操作,如数据库查询。
func simulateExpensiveTask(key string) (string, error) {
log.Printf("[Expensive Task] Performing for key: %s\n", key)
time.Sleep(2 * time.Second) // 模拟耗时
return "result-for-" + key, nil
}
// getWithDistributedSingleflight 使用 singleflight 和 bsm/redislock 协调并发请求。
func getWithDistributedSingleflight(ctx context.Context, key string) (string, error) {
// 1. 首先尝试从 Redis 缓存中获取数据。
cachedVal, err := redisClient.Get(ctx, "cache:"+key).Result()
if err == nil {
log.Printf("[Cache Hit] Serving from cache for key: %s\n", key)
return cachedVal, nil
}
if err != redis.Nil {
return "", fmt.Errorf("failed to get from cache: %w", err)
}
// 2. 如果缓存未命中,使用 localGroup 进行进程内 singleflight。
val, err, _ := localGroup.Do(key, func() (interface{}, error) {
// --- 这里开始是分布式锁和数据加载逻辑 ---
distributedLockKey := "lock:" + key
// 3. 再次检查缓存,防止在等待 localGroup 期间,其他 goroutine 已完成任务并更新了缓存。
cachedVal, err := redisClient.Get(ctx, "cache:"+key).Result()
if err == nil {
log.Printf("[Local Singleflight Hit Cache] Serving from cache for key: %s\n", key)
return cachedVal, nil
}
// 4. 尝试获取分布式锁,带超时。
lock, err := locker.Obtain(ctx, distributedLockKey, 10*time.Second, nil)
if err == redislock.ErrNotObtained {
// 如果未能获取锁,说明另一个节点正在处理。
// 此时可以等待一个短时间,然后重试或再次检查缓存。
log.Printf("[Distributed Lock] Failed to get lock for key: %s, another instance is working. Waiting...\n", key)
time.Sleep(3 * time.Second) // 模拟等待
// 再次检查缓存。
cachedVal, err := redisClient.Get(ctx, "cache:"+key).Result()
if err == nil {
log.Printf("[Distributed Lock Wait Hit Cache] Serving from cache for key: %s\n", key)
return cachedVal, nil
}
return nil, fmt.Errorf("could not obtain lock and cache still empty")
} else if err != nil {
return nil, fmt.Errorf("failed to obtain lock: %w", err)
}
defer lock.Release(ctx) // 确保锁被释放
log.Printf("[Distributed Lock] Obtained lock for key: %s. Performing expensive task.\n", key)
// 5. 获得锁后,执行耗时操作。
result, err := simulateExpensiveTask(key)
if err != nil {
return nil, err
}
// 6. 将结果存入 Redis 缓存,设置过期时间。
cacheTTL := 5 * time.Minute
if err := redisClient.Set(ctx, "cache:"+key, result, cacheTTL).Err(); err != nil {
log.Printf("[Cache] Failed to set cache for key: %s, err: %v\n", key, err)
} else {
log.Printf("[Cache] Set cache for key: %s\n", key)
}
return result, nil
})
if err != nil {
return "", err
}
return val.(string), nil
}
// handleRequest 是 HTTP 请求的处理函数。
func handleRequest(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "Missing 'key' parameter", http.StatusBadRequest)
return
}
result, err := getWithDistributedSingleflight(r.Context(), key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
response := map[string]string{"result": result}
json.NewEncoder(w).Encode(response)
}
func main() {
redisClient = redis.NewClient(&redis.Options{
Addr: "192.168.20.162:6379", // 替换为你的 Redis 地址
})
locker = redislock.New(redisClient)
// 检查 Redis 连接。
_, err := redisClient.Ping(context.Background()).Result()
if err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
}
http.HandleFunc("/fetch", handleRequest)
log.Println("Server started on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Server failed to start: %v", err)
}
}