缓存

利用 singleflight 和 分布式锁(redis) 防止大流量直接打到数据库

使用 SetNX

cache.go
package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/go-redis/redis/v8"
    "golang.org/x/sync/singleflight"
)

var (
    localGroup  singleflight.Group
    redisClient *redis.Client
)

// simulateExpensiveTask 模拟一个耗时操作,如数据库查询或调用外部 API。
func simulateExpensiveTask(key string) (string, error) {
    log.Printf("[Expensive Task] Performing for key: %s\n", key)
    time.Sleep(2 * time.Second) // 模拟耗时
    return "result-for-" + key, nil
}

// getWithDistributedSingleflight 使用 singleflight 和 Redis 锁协调并发请求。
func getWithDistributedSingleflight(ctx context.Context, key string) (string, error) {
    // 1. 使用 localGroup 进行进程内 singleflight。
    // 这可以防止单个服务实例上的 "惊群效应"。
    val, err, _ := localGroup.Do(key, func() (interface{}, error) {
        // --- 这里开始是分布式锁的逻辑 ---
        distributedLockKey := "lock:" + key
        // 2. 尝试获取分布式锁,带超时。
        lock := redisClient.SetNX(ctx, distributedLockKey, "locked", 10*time.Second)

        // 3. 检查是否成功获取锁。
        if !lock.Val() {
            // 如果未能获取锁,说明另一个节点正在处理。
            // 此时,我们只需等待并重试,或者直接执行任务(取决于容忍度)。
            // 在此简单示例中,我们再次执行任务,但实际上,可以等待并重新检查缓存。
            log.Printf("[Distributed Lock] Failed to get lock for key: %s, another instance is working.\n", key)
            time.Sleep(3 * time.Second) // 模拟等待
            return simulateExpensiveTask(key)
        }
        defer redisClient.Del(ctx, distributedLockKey) // 确保锁被释放

        // 4. 获得锁后,再次检查缓存(可选但推荐)。
        // ... 这里可以添加缓存检查逻辑 ...

        // 5. 执行耗时操作。
        result, err := simulateExpensiveTask(key)
        if err != nil {
            return nil, err
        }

        // 6. 将结果存入缓存,以便其他实例使用。
        // ... 这里可以添加缓存存储逻辑 ...
        return result, nil
    })

    if err != nil {
        return "", err
    }
    return val.(string), nil
}

// handleRequest 是 HTTP 请求的处理函数。
func handleRequest(w http.ResponseWriter, r *http.Request) {
    // 从 URL 查询参数中获取键。
    key := r.URL.Query().Get("key")
    if key == "" {
        http.Error(w, "Missing 'key' parameter", http.StatusBadRequest)
        return
    }

    // 调用我们的分布式 singleflight 函数。
    result, err := getWithDistributedSingleflight(r.Context(), key)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // 返回结果。
    w.Header().Set("Content-Type", "application/json")
    response := map[string]string{"result": result}
    json.NewEncoder(w).Encode(response)
}

func main() {
    // 初始化 Redis 客户端。确保您的 Redis 服务正在运行。
    redisClient = redis.NewClient(&redis.Options{
        Addr: "192.168.20.162:6379", // 替换为您的 Redis 地址
    })
    // 检查 Redis 连接。
    _, err := redisClient.Ping(context.Background()).Result()
    if err != nil {
        log.Fatalf("Could not connect to Redis: %v", err)
    }

    // 注册 HTTP 处理函数。
    http.HandleFunc("/fetch", handleRequest)

    log.Println("Server started on :8080")
    // 启动 HTTP 服务器。
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("Server failed to start: %v", err)
    }
}

使用 bsm/redislock

lock.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/bsm/redislock"
    "github.com/redis/go-redis/v9"
    "golang.org/x/sync/singleflight"
)

var (
    localGroup  singleflight.Group
    redisClient *redis.Client
    locker      *redislock.Client
)

// simulateExpensiveTask 模拟一个耗时操作,如数据库查询。
func simulateExpensiveTask(key string) (string, error) {
    log.Printf("[Expensive Task] Performing for key: %s\n", key)
    time.Sleep(2 * time.Second) // 模拟耗时
    return "result-for-" + key, nil
}

// getWithDistributedSingleflight 使用 singleflight 和 bsm/redislock 协调并发请求。
func getWithDistributedSingleflight(ctx context.Context, key string) (string, error) {
    // 1. 首先尝试从 Redis 缓存中获取数据。
    cachedVal, err := redisClient.Get(ctx, "cache:"+key).Result()
    if err == nil {
        log.Printf("[Cache Hit] Serving from cache for key: %s\n", key)
        return cachedVal, nil
    }
    if err != redis.Nil {
        return "", fmt.Errorf("failed to get from cache: %w", err)
    }

    // 2. 如果缓存未命中,使用 localGroup 进行进程内 singleflight。
    val, err, _ := localGroup.Do(key, func() (interface{}, error) {
        // --- 这里开始是分布式锁和数据加载逻辑 ---
        distributedLockKey := "lock:" + key

        // 3. 再次检查缓存,防止在等待 localGroup 期间,其他 goroutine 已完成任务并更新了缓存。
        cachedVal, err := redisClient.Get(ctx, "cache:"+key).Result()
        if err == nil {
            log.Printf("[Local Singleflight Hit Cache] Serving from cache for key: %s\n", key)
            return cachedVal, nil
        }

        // 4. 尝试获取分布式锁,带超时。
        lock, err := locker.Obtain(ctx, distributedLockKey, 10*time.Second, nil)
        if err == redislock.ErrNotObtained {
            // 如果未能获取锁,说明另一个节点正在处理。
            // 此时可以等待一个短时间,然后重试或再次检查缓存。
            log.Printf("[Distributed Lock] Failed to get lock for key: %s, another instance is working. Waiting...\n", key)
            time.Sleep(3 * time.Second) // 模拟等待
            // 再次检查缓存。
            cachedVal, err := redisClient.Get(ctx, "cache:"+key).Result()
            if err == nil {
                log.Printf("[Distributed Lock Wait Hit Cache] Serving from cache for key: %s\n", key)
                return cachedVal, nil
            }
            return nil, fmt.Errorf("could not obtain lock and cache still empty")
        } else if err != nil {
            return nil, fmt.Errorf("failed to obtain lock: %w", err)
        }
        defer lock.Release(ctx) // 确保锁被释放

        log.Printf("[Distributed Lock] Obtained lock for key: %s. Performing expensive task.\n", key)

        // 5. 获得锁后,执行耗时操作。
        result, err := simulateExpensiveTask(key)
        if err != nil {
            return nil, err
        }

        // 6. 将结果存入 Redis 缓存,设置过期时间。
        cacheTTL := 5 * time.Minute
        if err := redisClient.Set(ctx, "cache:"+key, result, cacheTTL).Err(); err != nil {
            log.Printf("[Cache] Failed to set cache for key: %s, err: %v\n", key, err)
        } else {
            log.Printf("[Cache] Set cache for key: %s\n", key)
        }

        return result, nil
    })

    if err != nil {
        return "", err
    }
    return val.(string), nil
}

// handleRequest 是 HTTP 请求的处理函数。
func handleRequest(w http.ResponseWriter, r *http.Request) {
    key := r.URL.Query().Get("key")
    if key == "" {
        http.Error(w, "Missing 'key' parameter", http.StatusBadRequest)
        return
    }

    result, err := getWithDistributedSingleflight(r.Context(), key)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    response := map[string]string{"result": result}
    json.NewEncoder(w).Encode(response)
}

func main() {
    redisClient = redis.NewClient(&redis.Options{
        Addr: "192.168.20.162:6379", // 替换为你的 Redis 地址
    })

    locker = redislock.New(redisClient)

    // 检查 Redis 连接。
    _, err := redisClient.Ping(context.Background()).Result()
    if err != nil {
        log.Fatalf("Could not connect to Redis: %v", err)
    }

    http.HandleFunc("/fetch", handleRequest)

    log.Println("Server started on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("Server failed to start: %v", err)
    }
}