Go语言高可用设计:容错与降级
Go语言高可用设计:容错与降级
1. 引言
在分布式系统中,硬件故障、网络分区、软件bug等问题不可避免。高可用设计的目标是在部分组件故障时,系统仍能继续提供服务。本文将深入讲解Go语言微服务中实现高可用的核心技术:超时控制、熔断器模式、重试机制、降级策略和限流控制。
2. 超时控制
2.1 为什么需要超时控制
没有超时控制的系统可能因为一个慢查询或网络问题导致所有请求堆积,最终引发雪崩效应。超时控制是防止故障扩散的第一道防线。
2.2 HTTP客户端超时设置
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// basicTimeoutClient returns a client whose only limit is a 10-second cap on
// the whole request.
func basicTimeoutClient() *http.Client {
	return &http.Client{
		Timeout: 10 * time.Second,
	}
}

// customTimeoutClient returns a client with a separate timeout for each phase
// of a request, plus a 10-second overall cap. Redirects are not followed: the
// redirect response itself is handed back to the caller.
func customTimeoutClient() *http.Client {
	dialer := &net.Dialer{
		Timeout:   5 * time.Second, // establishing the TCP connection
		KeepAlive: 30 * time.Second,
	}
	transport := &http.Transport{
		DialContext:           dialer.DialContext,
		ResponseHeaderTimeout: 5 * time.Second, // waiting for response headers
		ExpectContinueTimeout: 1 * time.Second,
		TLSHandshakeTimeout:   5 * time.Second, // TLS handshake
	}
	return &http.Client{
		Transport: transport,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
		Jar:     nil,
		Timeout: 10 * time.Second,
	}
}

// requestWithContext issues a GET bound to ctx and returns the full response
// body. The status code is not inspected; cancelling ctx aborts the request
// and the body read.
func requestWithContext(ctx context.Context, client *http.Client, url string) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("building request for %q: %w", url, err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response from %q: %w", url, err)
	}
	return body, nil
}
2.3 gRPC超时控制
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// grpcTimeoutExample calls MyMethod with a 2-second deadline and reports
// whether the call timed out, failed for another reason, or succeeded.
//
// NOTE(review): pb is the generated stub package for the service; its import
// path is not shown in this snippet and must be added by the reader.
// NOTE(review): grpc.WithInsecure is deprecated in current grpc-go releases;
// confirm the replacement against the grpc-go version in use.
func grpcTimeoutExample() {
	conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewMyServiceClient(conn)

	// The deadline travels with ctx.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	resp, err := client.MyMethod(ctx, &pb.Request{})
	if err != nil {
		if st, ok := status.FromError(err); ok && st.Code() == codes.DeadlineExceeded {
			fmt.Println("RPC超时")
			return
		}
		// Any other failure is still a failure; do not drop it silently.
		log.Printf("MyMethod failed: %v", err)
		return
	}
	log.Printf("MyMethod response: %v", resp)
}

// timeoutPropagationExample derives per-call deadlines from one overall
// 5-second budget: service A gets 3 seconds and service B gets 1 second.
// Both child contexts are also cancelled when the parent expires.
//
// NOTE(review): clientA, clientB and request are assumed to be defined
// elsewhere; they are not part of this snippet.
func timeoutPropagationExample() {
	// Overall budget for the whole operation.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Budget for the call to service A.
	ctxA, cancelA := context.WithTimeout(ctx, 3*time.Second)
	defer cancelA()

	// Budget for the call to service B.
	ctxB, cancelB := context.WithTimeout(ctx, 1*time.Second)
	defer cancelB()

	// Call service B concurrently with ctxB.
	done := make(chan struct{})
	go func() {
		defer close(done)
		clientB.Call(ctxB, request)
	}()

	// Call service A with ctxA.
	clientA.Call(ctxA, request)

	// Wait for the B call before returning; otherwise the deferred cancels
	// would abort it mid-flight and the goroutine would outlive this function.
	<-done
}
2.4 数据库连接超时
package db

import (
	"context"
	"database/sql"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

// mysqlDSNWithTimeout returns a MySQL DSN carrying connect, read and write
// timeouts as DSN parameters.
func mysqlDSNWithTimeout() string {
	return "user:password@tcp(localhost:3306)/dbname?timeout=10s&readTimeout=30s&writeTimeout=30s"
}

// postgresDSNWithTimeout returns a PostgreSQL connection URL with a connect
// timeout parameter.
func postgresDSNWithTimeout() string {
	return "postgres://user:password@localhost:5432/dbname?connect_timeout=10"
}

// queryWithTimeout runs query under a 5-second deadline derived from ctx.
//
// The deadline covers both executing the query and reading the returned rows,
// so the caller must finish iterating and call rows.Close within that window.
// The derived context is deliberately NOT cancelled when this function
// returns: database/sql closes a Rows as soon as its context is cancelled,
// which would hand the caller an unusable result set.
func queryWithTimeout(ctx context.Context, db *sql.DB, query string, args ...interface{}) (*sql.Rows, error) {
	queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)

	rows, err := db.QueryContext(queryCtx, query, args...)
	if err != nil {
		cancel()
		return nil, err
	}

	// Release the context once the deadline passes or the parent is
	// cancelled. The goroutine lives for at most 5 seconds.
	go func() {
		<-queryCtx.Done()
		cancel()
	}()
	return rows, nil
}

// transactionWithTimeout runs fn inside a transaction bounded by a 30-second
// deadline. The transaction is rolled back if fn returns an error or panics
// (the panic is re-raised), and committed otherwise.
func transactionWithTimeout(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
	txCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	tx, err := db.BeginTx(txCtx, nil)
	if err != nil {
		return err
	}

	defer func() {
		if p := recover(); p != nil {
			// Best effort: the panic is the failure being reported.
			_ = tx.Rollback()
			panic(p)
		}
	}()

	if err := fn(tx); err != nil {
		// Best effort: fn's error is the one the caller needs to see.
		_ = tx.Rollback()
		return err
	}
	return tx.Commit()
}
3. 熔断器模式
3.1 熔断器模式概述
熔断器模式灵感来自电力系统中的保险丝,当电路故障时会自动熔断以防止进一步损坏。在分布式系统中,熔断器监控远程调用的失败率,当失败率超过阈值时"熔断",快速返回错误而不是继续调用已经故障的服务。
3.2 sony/gobreaker实现
package circuitbreaker

import (
	"errors"
	"fmt"
	"time"

	"github.com/sony/gobreaker"
)

// ErrCircuitOpen is returned when the breaker rejects a call without running it.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// CircuitBreakerConfig holds the tunables passed on to gobreaker.
type CircuitBreakerConfig struct {
	Name        string                            // breaker name
	MaxRequests uint32                            // max requests admitted while half-open
	Interval    time.Duration                     // statistics window
	Timeout     time.Duration                     // time spent open before moving to half-open
	ReadyToTrip func(count gobreaker.Counts) bool // decides whether to trip
}

// defaultConfig builds a configuration that trips once at least 5 requests
// were seen in the window and 60% or more of them failed.
func defaultConfig(name string) *CircuitBreakerConfig {
	tripOnFailureRatio := func(count gobreaker.Counts) bool {
		if count.Requests < 5 {
			return false
		}
		ratio := float64(count.TotalFailures) / float64(count.Requests)
		return ratio >= 0.6
	}

	cfg := new(CircuitBreakerConfig)
	cfg.Name = name
	cfg.MaxRequests = 3             // admit at most 3 probes while half-open
	cfg.Interval = 10 * time.Second // statistics window
	cfg.Timeout = 30 * time.Second  // retry half-open after 30s open
	cfg.ReadyToTrip = tripOnFailureRatio
	return cfg
}

// NewCircuitBreaker creates a gobreaker breaker from config and prints every
// state transition to standard output.
func NewCircuitBreaker(config *CircuitBreakerConfig) *gobreaker.CircuitBreaker {
	logTransition := func(name string, from gobreaker.State, to gobreaker.State) {
		fmt.Printf("CircuitBreaker [%s] state changed from %s to %s\n", name, from, to)
	}

	var settings gobreaker.Settings
	settings.Name = config.Name
	settings.MaxRequests = config.MaxRequests
	settings.Interval = config.Interval
	settings.Timeout = config.Timeout
	settings.ReadyToTrip = config.ReadyToTrip
	settings.StateChanged = logTransition
	return gobreaker.NewCircuitBreaker(settings)
}

// CallWithCircuitBreaker runs fn through cb. Both breaker rejections (open,
// or too many half-open requests) are reported as ErrCircuitOpen; any other
// error is returned unchanged.
func CallWithCircuitBreaker(cb *gobreaker.CircuitBreaker, fn func() (interface{}, error)) (interface{}, error) {
	result, err := cb.Execute(fn)
	switch {
	case err == nil:
		return result, nil
	case errors.Is(err, gobreaker.ErrOpenState), errors.Is(err, gobreaker.ErrTooManyRequests):
		return nil, ErrCircuitOpen
	default:
		return nil, err
	}
}
3.3 熔断器在HTTP客户端中的应用
package client

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/sony/gobreaker"
)

// HTTPClient is a JSON HTTP client whose calls are guarded by a circuit breaker.
type HTTPClient struct {
	client  *http.Client
	cb      *gobreaker.CircuitBreaker
	baseURL string
}

// Response is a fully-read HTTP response.
type Response struct {
	StatusCode int
	Body       []byte
	Headers    http.Header
}

// NewHTTPClient returns a client for baseURL with a 10-second request timeout.
// Its breaker trips once at least 10 requests were seen in the 10-second
// window and half or more of them failed.
func NewHTTPClient(baseURL string) *HTTPClient {
	cbSettings := gobreaker.Settings{
		Name:        "http-client",
		MaxRequests: 3,
		Interval:    10 * time.Second,
		Timeout:     30 * time.Second,
		ReadyToTrip: func(count gobreaker.Counts) bool {
			if count.Requests < 10 {
				return false
			}
			failureRatio := float64(count.TotalFailures) / float64(count.Requests)
			return failureRatio >= 0.5
		},
	}

	return &HTTPClient{
		client: &http.Client{
			Timeout: 10 * time.Second,
		},
		cb:      gobreaker.NewCircuitBreaker(cbSettings),
		baseURL: baseURL,
	}
}

// Get issues a GET request for path relative to the base URL.
func (c *HTTPClient) Get(path string) (*Response, error) {
	return c.do(http.MethodGet, path, nil, nil)
}

// Post issues a POST request for path with body encoded as JSON.
func (c *HTTPClient) Post(path string, body interface{}) (*Response, error) {
	return c.do(http.MethodPost, path, body, nil)
}

// do builds the request and runs the whole exchange, including reading the
// body, inside the circuit breaker.
//
// Transport errors, body-read errors and any status >= 400 count as breaker
// failures and are returned as errors. A breaker rejection is wrapped as
// "service unavailable" and still matches gobreaker.ErrOpenState or
// gobreaker.ErrTooManyRequests through errors.Is.
func (c *HTTPClient) do(method, path string, reqBody interface{}, headers map[string]string) (*Response, error) {
	var payload []byte
	if reqBody != nil {
		encoded, err := json.Marshal(reqBody)
		if err != nil {
			return nil, fmt.Errorf("encoding request body: %w", err)
		}
		payload = encoded
	}

	req, err := http.NewRequest(method, c.baseURL+path, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/json")
	for k, v := range headers {
		req.Header.Set(k, v)
	}

	result, err := c.cb.Execute(func() (interface{}, error) {
		resp, err := c.client.Do(req)
		if err != nil {
			return nil, err
		}
		// Close on every path, including the error-status one, so the
		// connection is not leaked.
		defer resp.Body.Close()

		// 4xx/5xx responses count as failures.
		if resp.StatusCode >= 400 {
			return nil, fmt.Errorf("HTTP error: status %d", resp.StatusCode)
		}

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, fmt.Errorf("reading response body: %w", err)
		}
		return &Response{
			StatusCode: resp.StatusCode,
			Body:       body,
			Headers:    resp.Header,
		}, nil
	})
	if err != nil {
		if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
			return nil, fmt.Errorf("service unavailable: %w", err)
		}
		return nil, err
	}
	return result.(*Response), nil
}
3.4 自定义熔断器实现
package mybreaker

import (
	"errors"
	"sync"
	"time"
)

// State is the circuit breaker state.
type State int

const (
	StateClosed   State = iota // closed: calls pass through
	StateOpen                  // open: calls fail fast
	StateHalfOpen              // half-open: a limited number of probe calls pass
)

var (
	// ErrTooManyRequests is declared for API parity with gobreaker. Execute
	// keeps returning ErrOpenState for every rejection so existing callers
	// that match on ErrOpenState continue to work.
	ErrTooManyRequests = errors.New("too many requests")
	// ErrOpenState is returned by Execute when the call is rejected.
	ErrOpenState = errors.New("circuit breaker is open")
)

// errPanicked records a panicking call as a failure.
var errPanicked = errors.New("call panicked")

// Counts holds the call statistics for the current window.
type Counts struct {
	Requests  uint64 // total calls
	Successes uint64 // successful calls
	Failures  uint64 // failed calls
	Timeouts  uint64 // timed-out calls (not populated by this implementation)
}

// CircuitBreaker is a minimal three-state circuit breaker.
type CircuitBreaker struct {
	name        string
	maxRequests uint32        // probes admitted while half-open
	interval    time.Duration // closed-state statistics window; <= 0 disables resets
	timeout     time.Duration // time spent open before probing
	readyToTrip func(Counts) bool

	halfOpenReqs uint32

	mu              sync.RWMutex
	state           State
	counts          Counts
	lastStateChange time.Time
	openedTime      time.Time
	windowStart     time.Time // start of the current closed-state window
}

// New creates a closed breaker. A nil readyToTrip defaults to tripping after
// more than 5 failures in the current window.
func New(name string, maxRequests uint32, interval, timeout time.Duration, readyToTrip func(Counts) bool) *CircuitBreaker {
	if readyToTrip == nil {
		readyToTrip = func(c Counts) bool { return c.Failures > 5 }
	}
	now := time.Now()
	return &CircuitBreaker{
		name:            name,
		maxRequests:     maxRequests,
		interval:        interval,
		timeout:         timeout,
		readyToTrip:     readyToTrip,
		lastStateChange: now,
		windowStart:     now,
	}
}

// Execute runs fn if the breaker admits the call and records its outcome.
// It returns ErrOpenState without calling fn when the call is rejected;
// otherwise it returns fn's error. A panic in fn is recorded as a failure and
// re-raised, so a half-open probe slot is never left dangling.
func (cb *CircuitBreaker) Execute(fn func() error) error {
	if !cb.allowRequest() {
		return ErrOpenState
	}

	defer func() {
		if p := recover(); p != nil {
			cb.recordResult(errPanicked)
			panic(p)
		}
	}()

	err := fn()
	cb.recordResult(err)
	return err
}

// allowRequest reports whether a call may proceed, moving the breaker from
// open to half-open once the open timeout has elapsed.
func (cb *CircuitBreaker) allowRequest() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateClosed:
		return true
	case StateOpen:
		if time.Since(cb.openedTime) <= cb.timeout {
			return false
		}
		cb.toStateHalfOpen()
		// The call that triggers the transition is itself the first probe
		// and must count against maxRequests.
		cb.halfOpenReqs = 1
		return true
	case StateHalfOpen:
		if cb.halfOpenReqs >= cb.maxRequests {
			return false
		}
		cb.halfOpenReqs++
		return true
	}
	return false
}

// recordResult updates the statistics with the outcome of one call and
// applies the resulting state transition.
func (cb *CircuitBreaker) recordResult(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateOpen:
		// This call was admitted before the breaker tripped. Counting it
		// would re-trip the breaker and push openedTime forward.
		return
	case StateClosed:
		// Start a fresh window once the interval has elapsed so that old
		// failures do not accumulate forever.
		if cb.interval > 0 && time.Since(cb.windowStart) >= cb.interval {
			cb.counts = Counts{}
			cb.windowStart = time.Now()
		}
	}

	cb.counts.Requests++
	if err != nil {
		cb.counts.Failures++
	} else {
		cb.counts.Successes++
	}

	switch {
	case cb.readyToTrip(cb.counts):
		cb.toStateOpen()
	case cb.state != StateHalfOpen:
		// Closed and healthy: nothing to do.
	case err == nil:
		// A successful probe closes the breaker.
		cb.toStateClosed()
	default:
		// A failed probe re-opens it.
		cb.toStateOpen()
	}
}

// toStateOpen trips the breaker. The caller must hold cb.mu.
func (cb *CircuitBreaker) toStateOpen() {
	now := time.Now()
	cb.state = StateOpen
	cb.openedTime = now
	cb.lastStateChange = now
	cb.halfOpenReqs = 0
}

// toStateHalfOpen starts probing with fresh statistics. The caller must hold cb.mu.
func (cb *CircuitBreaker) toStateHalfOpen() {
	cb.state = StateHalfOpen
	cb.lastStateChange = time.Now()
	cb.halfOpenReqs = 0
	cb.counts = Counts{}
}

// toStateClosed resumes normal operation with fresh statistics. The caller
// must hold cb.mu.
func (cb *CircuitBreaker) toStateClosed() {
	now := time.Now()
	cb.state = StateClosed
	cb.lastStateChange = now
	cb.windowStart = now
	cb.counts = Counts{}
}
4. 重试机制
4.1 重试策略设计
package retry

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"time"
)

var (
	// ErrMaxRetriesExceeded wraps the failure returned once every attempt has failed.
	ErrMaxRetriesExceeded = errors.New("maximum retries exceeded")
	// ErrContextCanceled is returned when ctx ends before or between attempts.
	ErrContextCanceled = errors.New("context canceled")
)

// Config controls the retry loop.
type Config struct {
	MaxAttempts    int              // total attempts, including the first; values < 1 mean 1
	InitialBackoff time.Duration    // wait before the second attempt
	MaxBackoff     time.Duration    // upper bound on any single wait
	BackoffFactor  float64          // multiplier applied per attempt
	Jitter         bool             // randomize each wait by a factor in [0.5, 1.5)
	Retryable      func(error) bool // reports whether an error is worth retrying; nil retries everything
}

// DefaultConfig returns 3 attempts with exponential backoff from 100ms, capped
// at 5s, with jitter. Its Retryable treats every non-nil error as retryable;
// callers with non-retryable business errors should supply their own.
func DefaultConfig() *Config {
	return &Config{
		MaxAttempts:    3,
		InitialBackoff: 100 * time.Millisecond,
		MaxBackoff:     5 * time.Second,
		BackoffFactor:  2.0,
		Jitter:         true,
		Retryable: func(err error) bool {
			return err != nil
		},
	}
}

// calculateBackoff returns the wait after the given 1-based attempt:
// InitialBackoff * BackoffFactor^(attempt-1), optionally jittered, and never
// more than MaxBackoff.
func (c *Config) calculateBackoff(attempt int) time.Duration {
	backoff := float64(c.InitialBackoff) * math.Pow(c.BackoffFactor, float64(attempt-1))

	if c.Jitter {
		backoff *= 0.5 + rand.Float64()
	}

	// Clamp after jitter so MaxBackoff is a hard ceiling. This also absorbs
	// the +Inf that math.Pow produces for very large attempts.
	if limit := float64(c.MaxBackoff); backoff > limit {
		backoff = limit
	}
	return time.Duration(backoff)
}

// Do calls fn until it succeeds, returns a non-retryable error, the attempts
// are exhausted, or ctx ends.
//
// It returns nil on success, fn's own error when config.Retryable rejects it,
// ErrContextCanceled when ctx ends, and an error wrapping
// ErrMaxRetriesExceeded (with the last failure in its message) when all
// attempts fail. A nil config means DefaultConfig.
func Do(ctx context.Context, config *Config, fn func() error) error {
	if config == nil {
		config = DefaultConfig()
	}
	attempts := config.MaxAttempts
	if attempts < 1 {
		attempts = 1
	}

	var lastErr error
	for attempt := 1; attempt <= attempts; attempt++ {
		if ctx.Err() != nil {
			return ErrContextCanceled
		}

		err := fn()
		if err == nil {
			return nil
		}
		lastErr = err

		if config.Retryable != nil && !config.Retryable(err) {
			return err
		}

		// No point in waiting after the final attempt.
		if attempt == attempts {
			break
		}

		// Sleep for the backoff, waking early only if ctx ends. The timer
		// firing means "go on to the next attempt", not "give up".
		timer := time.NewTimer(config.calculateBackoff(attempt))
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ErrContextCanceled
		}
	}

	return fmt.Errorf("%w: %v", ErrMaxRetriesExceeded, lastErr)
}

// HTTPClient is an HTTP client that retries failed GET requests.
type HTTPClient struct {
	client *http.Client
	retry  *Config
}

// GetWithRetry issues a GET bound to ctx, retrying transport errors and 5xx
// responses according to the client's retry config. On success the caller
// owns and must close the response body; on failure the response is nil.
func (c *HTTPClient) GetWithRetry(ctx context.Context, url string) (*http.Response, error) {
	var resp *http.Response

	err := Do(ctx, c.retry, func() error {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return err
		}

		r, err := c.client.Do(req)
		if err != nil {
			return err
		}

		// Retry on 5xx. The body is closed here so that a failed attempt
		// does not leak its connection.
		if r.StatusCode >= 500 {
			r.Body.Close()
			return fmt.Errorf("server error: %d", r.StatusCode)
		}

		resp = r
		return nil
	})
	if err != nil {
		return nil, err
	}
	return resp, nil
}
4.2 gRPC重试拦截器
package grpc_retry

import (
	"context"
	"math"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryableCodes lists the status codes that are retried.
var (
	retryableCodes = []codes.Code{
		codes.Unavailable,
		codes.ResourceExhausted,
		codes.DeadlineExceeded,
	}
)

// RetryOption configures the retry interceptor.
type RetryOption struct {
	MaxAttempts       int // total attempts, including the first; values < 1 mean 1
	InitialBackoff    time.Duration
	MaxBackoff        time.Duration
	BackoffMultiplier float64
}

// defaultRetryOption returns 3 attempts with exponential backoff from 100ms,
// doubling each time and capped at 5s.
func defaultRetryOption() *RetryOption {
	return &RetryOption{
		MaxAttempts:       3,
		InitialBackoff:    100 * time.Millisecond,
		MaxBackoff:        5 * time.Second,
		BackoffMultiplier: 2.0,
	}
}

// retryBackoff returns the wait before the given retry (1 = first retry):
// InitialBackoff * BackoffMultiplier^(retry-1), capped at MaxBackoff. The
// product is computed in floating point so fractional multipliers such as
// 1.5 are not truncated.
func retryBackoff(option *RetryOption, retry int) time.Duration {
	wait := float64(option.InitialBackoff) * math.Pow(option.BackoffMultiplier, float64(retry-1))
	if wait > float64(option.MaxBackoff) {
		return option.MaxBackoff
	}
	return time.Duration(wait)
}

// UnaryClientInterceptor returns an interceptor that retries unary calls
// failing with one of retryableCodes. A nil option uses defaultRetryOption.
//
// The interceptor returns nil on the first success, the error itself for a
// non-status or non-retryable error, ctx.Err() if ctx ends while backing off,
// and the last error once the attempts are exhausted.
func UnaryClientInterceptor(option *RetryOption) grpc.UnaryClientInterceptor {
	if option == nil {
		option = defaultRetryOption()
	}

	// Always make at least one attempt; otherwise a zero MaxAttempts would
	// report success without ever invoking the RPC.
	attempts := option.MaxAttempts
	if attempts < 1 {
		attempts = 1
	}

	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		var lastErr error

		for attempt := 0; attempt < attempts; attempt++ {
			if attempt > 0 {
				timer := time.NewTimer(retryBackoff(option, attempt))
				select {
				case <-timer.C:
				case <-ctx.Done():
					timer.Stop()
					return ctx.Err()
				}
			}

			err := invoker(ctx, method, req, reply, cc, opts...)
			if err == nil {
				return nil
			}
			lastErr = err

			// Only gRPC status errors with a retryable code are retried.
			st, ok := status.FromError(err)
			if !ok || !isRetryable(st.Code()) {
				return err
			}
		}

		return lastErr
	}
}

// isRetryable reports whether code is listed in retryableCodes.
func isRetryable(code codes.Code) bool {
	for _, c := range retryableCodes {
		if c == code {
			return true
		}
	}
	return false
}
5. 降级策略
5.1 降级策略概述
降级策略是在服务不可用时,提供一个备选方案以保证核心功能可用。降级不是放弃,而是"降格"服务。
5.2 降级实现
package fallback

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// FallbackManager serves degraded product data from a dedicated Redis keyspace.
type FallbackManager struct {
	redis *redis.Client
}

// NewFallbackManager returns a manager backed by the given Redis client.
func NewFallbackManager(redis *redis.Client) *FallbackManager {
	return &FallbackManager{redis: redis}
}

// FallbackFunc is the signature of a fallback function.
type FallbackFunc func() (interface{}, error)

// ProductService reads products from a remote service, with a Redis cache in
// front of it and a fallback path behind it.
//
// NOTE(review): RemoteService is assumed to be defined elsewhere; it is not
// part of this snippet.
type ProductService struct {
	remote   *RemoteService // remote service
	fallback *FallbackManager
	cache    *redis.Client
}

// Product is the product payload stored in the cache.
type Product struct {
	ID    uint    `json:"id"`
	Name  string  `json:"name"`
	Price float64 `json:"price"`
}

// GetProduct returns the product from the cache, then from the remote
// service, and finally from the fallback path if the remote call fails.
func (s *ProductService) GetProduct(ctx context.Context, id uint) (*Product, error) {
	// 1. Try the cache. A miss or an undecodable entry falls through.
	cacheKey := fmt.Sprintf("product:%d", id)
	if cached, err := s.cache.Get(ctx, cacheKey).Bytes(); err == nil {
		var product Product
		if json.Unmarshal(cached, &product) == nil {
			return &product, nil
		}
	}

	// 2. Try the remote service.
	product, err := s.remote.GetProduct(ctx, id)
	if err == nil {
		// Refresh the cache only when encoding succeeded. Caching is best
		// effort, so a failed write must not fail the request.
		if data, err := json.Marshal(product); err == nil {
			_ = s.cache.Set(ctx, cacheKey, data, 5*time.Minute).Err()
		}
		return product, nil
	}

	// 3. Degrade to the fallback cache or the default product.
	return s.fallback.GetProductFallback(ctx, id)
}

// GetProductFallback returns the product stored under the fallback key, or a
// placeholder product (zero price) when none is available. It never returns
// an error.
func (m *FallbackManager) GetProductFallback(ctx context.Context, id uint) (*Product, error) {
	fallbackKey := fmt.Sprintf("product:fallback:%d", id)
	if cached, err := m.redis.Get(ctx, fallbackKey).Bytes(); err == nil {
		var product Product
		if json.Unmarshal(cached, &product) == nil {
			return &product, nil
		}
	}

	// Default product.
	return &Product{
		ID:    id,
		Name:  "商品已下架",
		Price: 0,
	}, nil
}

// SetFallbackCache stores product under the fallback key with a 24-hour
// expiry, longer than the 5-minute primary cache.
func (m *FallbackManager) SetFallbackCache(ctx context.Context, id uint, product *Product) error {
	fallbackKey := fmt.Sprintf("product:fallback:%d", id)
	data, err := json.Marshal(product)
	if err != nil {
		return err
	}
	return m.redis.Set(ctx, fallbackKey, data, 24*time.Hour).Err()
}
5.3 开关降级
package feature

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

// FeatureToggle is a Redis-backed feature switch with a local cache that is
// refreshed every 30 seconds.
type FeatureToggle struct {
	redis *redis.Client
	local map[string]bool
	mu    sync.RWMutex

	writes   uint64 // bumped on every SetEnabled; guarded by mu
	stop     chan struct{}
	stopOnce sync.Once
}

// NewFeatureToggle creates a toggle and starts its background sync loop.
// Call Close to stop the loop.
func NewFeatureToggle(redis *redis.Client) *FeatureToggle {
	ft := &FeatureToggle{
		redis: redis,
		local: make(map[string]bool),
		stop:  make(chan struct{}),
	}

	go ft.syncLoop()

	return ft
}

// Close stops the background sync loop. It is safe to call more than once.
func (ft *FeatureToggle) Close() {
	ft.stopOnce.Do(func() { close(ft.stop) })
}

// IsEnabled reports whether the named feature is on. The first lookup of a
// name reads Redis and caches the answer; later lookups are served locally.
func (ft *FeatureToggle) IsEnabled(ctx context.Context, name string) bool {
	ft.mu.RLock()
	enabled, ok := ft.local[name]
	ft.mu.RUnlock()

	if !ok {
		enabled = ft.getFromRedis(ctx, name)
		ft.mu.Lock()
		ft.local[name] = enabled
		ft.mu.Unlock()
	}

	return enabled
}

// lookup reads the flag from Redis. A missing key is reported as
// (false, nil); any other failure is returned as an error.
func (ft *FeatureToggle) lookup(ctx context.Context, name string) (bool, error) {
	key := fmt.Sprintf("feature:%s", name)
	enabled, err := ft.redis.Get(ctx, key).Bool()
	if err == redis.Nil {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return enabled, nil
}

// getFromRedis reads the flag from Redis, defaulting to off on any error.
func (ft *FeatureToggle) getFromRedis(ctx context.Context, name string) bool {
	enabled, err := ft.lookup(ctx, name)
	if err != nil {
		return false // off by default
	}
	return enabled
}

// SetEnabled writes the flag to Redis with no expiry and then updates the
// local cache.
func (ft *FeatureToggle) SetEnabled(ctx context.Context, name string, enabled bool) error {
	key := fmt.Sprintf("feature:%s", name)
	if err := ft.redis.Set(ctx, key, enabled, 0).Err(); err != nil {
		return err
	}

	ft.mu.Lock()
	ft.local[name] = enabled
	ft.writes++
	ft.mu.Unlock()

	return nil
}

// syncLoop refreshes the cached flags every 30 seconds until Close is called.
func (ft *FeatureToggle) syncLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ft.stop:
			return
		case <-ticker.C:
			ft.syncOnce()
		}
	}
}

// syncOnce re-reads every cached flag from Redis.
//
// Redis is queried without holding the lock, so IsEnabled is never blocked on
// network I/O. A flag whose read fails keeps its last known value rather than
// being forced off, so a Redis outage does not silently disable every feature.
func (ft *FeatureToggle) syncOnce() {
	ft.mu.RLock()
	names := make([]string, 0, len(ft.local))
	for name := range ft.local {
		names = append(names, name)
	}
	writesBefore := ft.writes
	ft.mu.RUnlock()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	fresh := make(map[string]bool, len(names))
	for _, name := range names {
		enabled, err := ft.lookup(ctx, name)
		if err != nil {
			continue
		}
		fresh[name] = enabled
	}

	ft.mu.Lock()
	defer ft.mu.Unlock()
	// A SetEnabled raced with the reads above, so some fetched values may be
	// stale. Skip this round; the next tick will pick up the new state.
	if ft.writes != writesBefore {
		return
	}
	for name, enabled := range fresh {
		ft.local[name] = enabled
	}
}

// GetProductWithToggle picks the product lookup path according to the
// "use_new_recommendation" switch, degrading to the old path when it is off.
//
// NOTE(review): ProductService, its toggle field, getProductNew and
// getProductOld are assumed to be defined elsewhere; they are not part of
// this snippet.
func (s *ProductService) GetProductWithToggle(ctx context.Context, id uint) (*Product, error) {
	if s.toggle.IsEnabled(ctx, "use_new_recommendation") {
		// New recommendation path.
		return s.getProductNew(ctx, id)
	}

	// Degrade to the old path.
	return s.getProductOld(ctx, id)
}
6. 限流控制
6.1 限流算法实现
package ratelimit

import (
	"context"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// TokenBucketLimiter is a thin wrapper around rate.Limiter.
type TokenBucketLimiter struct {
	limiter *rate.Limiter
	key     string
}

// NewTokenBucketLimiter creates a limiter that refills qps tokens per second
// and allows bursts of up to burst tokens.
func NewTokenBucketLimiter(qps float64, burst int) *TokenBucketLimiter {
	return &TokenBucketLimiter{
		// rate.Limit is events per second, so qps converts directly.
		limiter: rate.NewLimiter(rate.Limit(qps), burst),
	}
}

// Allow reports whether one event may happen now, consuming a token if so.
func (l *TokenBucketLimiter) Allow() bool {
	return l.limiter.Allow()
}

// AllowN reports whether n events may happen at time now.
func (l *TokenBucketLimiter) AllowN(now time.Time, n int) bool {
	return l.limiter.AllowN(now, n)
}

// Wait blocks until one token is available or ctx ends.
func (l *TokenBucketLimiter) Wait(ctx context.Context) error {
	return l.limiter.Wait(ctx)
}

// WaitN blocks until n tokens are available or ctx ends.
func (l *TokenBucketLimiter) WaitN(ctx context.Context, n int) error {
	return l.limiter.WaitN(ctx, n)
}

// MultiTenantLimiter keeps one rate limiter per tenant, created on first use
// with the default QPS and burst.
type MultiTenantLimiter struct {
	limiters     map[string]*rate.Limiter
	mu           sync.RWMutex
	defaultQPS   float64
	defaultBurst int
}

// NewMultiTenantLimiter creates an empty manager with the given defaults.
func NewMultiTenantLimiter(defaultQPS float64, defaultBurst int) *MultiTenantLimiter {
	return &MultiTenantLimiter{
		limiters:     make(map[string]*rate.Limiter),
		defaultQPS:   defaultQPS,
		defaultBurst: defaultBurst,
	}
}

// GetLimiter returns the tenant's limiter, creating it with the defaults if
// it does not exist yet.
func (m *MultiTenantLimiter) GetLimiter(tenantID string) *rate.Limiter {
	m.mu.RLock()
	limiter, ok := m.limiters[tenantID]
	m.mu.RUnlock()

	if ok {
		return limiter
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Re-check under the write lock: another goroutine may have created the
	// limiter between the two lock acquisitions.
	if limiter, ok := m.limiters[tenantID]; ok {
		return limiter
	}

	limiter = rate.NewLimiter(rate.Limit(m.defaultQPS), m.defaultBurst)
	m.limiters[tenantID] = limiter
	return limiter
}

// SetLimiter installs a fresh limiter for the tenant, replacing any existing
// one and discarding its accumulated state.
func (m *MultiTenantLimiter) SetLimiter(tenantID string, qps float64, burst int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.limiters[tenantID] = rate.NewLimiter(rate.Limit(qps), burst)
}
6.2 Gin中间件限流
package middleware

import (
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"golang.org/x/time/rate"
)

// ipEntry pairs a limiter with the last time its IP was seen.
type ipEntry struct {
	limiter  *rate.Limiter
	lastSeen time.Time
}

// IPRateLimiter rate-limits requests per client IP.
//
// Client IPs are attacker-controlled keys, so idle entries are evicted to
// keep the map from growing without bound.
type IPRateLimiter struct {
	limiters map[string]*ipEntry
	mu       sync.RWMutex
	qps      float64
	burst    int

	idleTTL   time.Duration // entries idle this long are dropped; 0 disables eviction
	lastSweep time.Time
}

// NewIPRateLimiter creates a limiter that allows qps requests per second per
// IP, with bursts of up to burst.
func NewIPRateLimiter(qps float64, burst int) *IPRateLimiter {
	// After burst/qps seconds of silence a bucket is full again, so it is
	// indistinguishable from a new one and can be dropped without loosening
	// the limit. One minute is used as a floor to avoid churn.
	var idleTTL time.Duration
	if qps > 0 {
		idleTTL = time.Duration(float64(burst) / qps * float64(time.Second))
		if idleTTL < time.Minute {
			idleTTL = time.Minute
		}
	}

	return &IPRateLimiter{
		limiters:  make(map[string]*ipEntry),
		qps:       qps,
		burst:     burst,
		idleTTL:   idleTTL,
		lastSweep: time.Now(),
	}
}

// getLimiter returns the limiter for ip, creating it on first use.
func (m *IPRateLimiter) getLimiter(ip string) *rate.Limiter {
	now := time.Now()

	m.mu.Lock()
	defer m.mu.Unlock()

	if entry, ok := m.limiters[ip]; ok {
		entry.lastSeen = now
		return entry.limiter
	}

	m.evictIdle(now)

	entry := &ipEntry{
		limiter:  rate.NewLimiter(rate.Limit(m.qps), m.burst),
		lastSeen: now,
	}
	m.limiters[ip] = entry
	return entry.limiter
}

// evictIdle removes entries not seen for idleTTL. It runs at most once per
// idleTTL, so the sweep cost is amortized. The caller must hold m.mu.
func (m *IPRateLimiter) evictIdle(now time.Time) {
	if m.idleTTL <= 0 || now.Sub(m.lastSweep) < m.idleTTL {
		return
	}
	m.lastSweep = now

	for ip, entry := range m.limiters {
		if now.Sub(entry.lastSeen) >= m.idleTTL {
			delete(m.limiters, ip)
		}
	}
}

// Middleware returns a Gin handler that answers 429 once the client IP
// exceeds its limit.
func (m *IPRateLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		ip := c.ClientIP()
		limiter := m.getLimiter(ip)

		if !limiter.Allow() {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
				"error":       "rate limit exceeded",
				"retry_after": "1s",
			})
			return
		}

		c.Next()
	}
}

// TokenRateLimiter returns a Gin handler that applies the given shared
// limiter and answers 429 once it is exhausted.
func TokenRateLimiter(limiter *rate.Limiter) gin.HandlerFunc {
	return func(c *gin.Context) {
		if !limiter.Allow() {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
				"error": "rate limit exceeded",
			})
			return
		}
		c.Next()
	}
}

// GlobalLimiter is the process-wide limiter: 1000 requests per second with
// bursts of up to 5000.
var GlobalLimiter = rate.NewLimiter(rate.Limit(1000), 5000)

// GlobalRateLimitMiddleware returns a Gin handler that enforces GlobalLimiter.
func GlobalRateLimitMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		if !GlobalLimiter.Allow() {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
				"error": "global rate limit exceeded",
			})
			return
		}
		c.Next()
	}
}

// setupRouter wires the global, per-IP and per-route limiters into a router.
//
// NOTE(review): loginHandler and usersHandler are assumed to be defined
// elsewhere; they are not part of this snippet.
func setupRouter() *gin.Engine {
	r := gin.Default()

	// Global limit.
	r.Use(GlobalRateLimitMiddleware())

	// Per-IP limit: 10 requests per second, bursts of 30.
	ipLimiter := NewIPRateLimiter(10, 30)
	r.Use(ipLimiter.Middleware())

	api := r.Group("/api")
	{
		// Stricter limit on login: 1 request per second, bursts of 5.
		loginLimiter := rate.NewLimiter(rate.Limit(1), 5)
		api.POST("/login", TokenRateLimiter(loginLimiter), loginHandler)

		// Regular endpoint.
		api.GET("/users", usersHandler)
	}

	return r
}
6.3 Redis分布式限流
package ratelimit

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

// RedisSlidingWindowLimiter is a sliding-window limiter backed by a Redis
// sorted set.
type RedisSlidingWindowLimiter struct {
	redis  *redis.Client
	key    string
	rate   int           // requests allowed per window
	window time.Duration // window size
}

// NewRedisSlidingWindowLimiter creates a limiter that allows rate requests
// per window under key.
func NewRedisSlidingWindowLimiter(redis *redis.Client, key string, rate int, window time.Duration) *RedisSlidingWindowLimiter {
	return &RedisSlidingWindowLimiter{
		redis:  redis,
		key:    key,
		rate:   rate,
		window: window,
	}
}

// slidingWindowScript trims expired entries, checks the count and records
// the request in one atomic step. A rejected request is NOT recorded, so a
// client that keeps retrying while limited does not extend its own block.
//
// KEYS[1] = sorted-set key
// ARGV[1] = current time in ms, ARGV[2] = window in ms,
// ARGV[3] = limit, ARGV[4] = unique member for this request
const slidingWindowScript = `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local member = ARGV[4]

redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)

if redis.call('ZCARD', key) >= limit then
    return 0
end

redis.call('ZADD', key, now, member)
redis.call('PEXPIRE', key, window)
return 1
`

// IsAllowed reports whether the request fits in the current window and, if
// so, records it.
func (l *RedisSlidingWindowLimiter) IsAllowed(ctx context.Context) (bool, error) {
	now := time.Now()

	// The member must be unique per request. Using the bare millisecond
	// timestamp would let requests in the same millisecond overwrite each
	// other and be under-counted.
	member := fmt.Sprintf("%d-%d", now.UnixNano(), rand.Int63())

	allowed, err := l.redis.Eval(ctx, slidingWindowScript, []string{l.key},
		now.UnixMilli(), l.window.Milliseconds(), l.rate, member).Int()
	if err != nil {
		return false, err
	}
	return allowed == 1, nil
}

// RedisTokenBucketLimiter is a token-bucket limiter backed by a Redis hash.
type RedisTokenBucketLimiter struct {
	redis    *redis.Client
	key      string
	capacity int     // bucket capacity
	rate     float64 // tokens added per second
}

// NewRedisTokenBucketLimiter creates a bucket of the given capacity, refilled
// at rate tokens per second, under key.
func NewRedisTokenBucketLimiter(redis *redis.Client, key string, capacity int, rate float64) *RedisTokenBucketLimiter {
	return &RedisTokenBucketLimiter{
		redis:    redis,
		key:      key,
		capacity: capacity,
		rate:     rate,
	}
}

// tokenBucketScript refills and consumes atomically.
//
// KEYS[1] = hash key
// ARGV[1] = capacity, ARGV[2] = tokens per second, ARGV[3] = current time in ms
const tokenBucketScript = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- Current token count and last update time.
local data = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(data[1]) or capacity
local last_update = tonumber(data[2]) or now

-- Refill. Clock skew between callers can make elapsed negative; never
-- let that remove tokens.
local elapsed = math.max(0, now - last_update)
tokens = math.min(capacity, tokens + elapsed * rate / 1000)

-- Consume one token if available.
local allowed = 0
if tokens >= 1 then
    tokens = tokens - 1
    allowed = 1
end

-- Keep the key at least as long as a full refill takes; an earlier
-- expiry would hand out a full bucket too soon.
local ttl = 60
if rate > 0 then
    ttl = math.max(ttl, math.ceil(capacity / rate) + 1)
end

redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, ttl)

return allowed
`

// Allow reports whether a token was available and consumes it if so.
func (l *RedisTokenBucketLimiter) Allow(ctx context.Context) (bool, error) {
	now := time.Now().UnixMilli()

	result, err := l.redis.Eval(ctx, tokenBucketScript, []string{l.key}, l.capacity, l.rate, now).Int()
	if err != nil {
		return false, err
	}

	return result == 1, nil
}
7. 综合示例:微服务容错框架
package faulttolerance

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/sony/gobreaker"
	"golang.org/x/time/rate"
)

// ServiceClient combines rate limiting, circuit breaking, retries and
// fallback around one downstream service.
//
// NOTE(review): HTTPClient, NewHTTPClient, Response and the retry package
// come from the earlier sections of this article; their import paths are
// project-specific and are not shown here.
type ServiceClient struct {
	httpClient  *HTTPClient
	breaker     *gobreaker.CircuitBreaker
	rateLimiter *rate.Limiter
	retryConfig *retry.Config
}

// NewServiceClient wires the client with a 100 rps limiter (burst 200), a
// breaker using gobreaker's default trip condition, and the default retry
// configuration. redisClient is currently unused.
func NewServiceClient(redisClient *redis.Client) *ServiceClient {
	return &ServiceClient{
		httpClient: NewHTTPClient("http://service:8080"),
		breaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{
			Name:        "service-client",
			MaxRequests: 3,
			Interval:    10 * time.Second,
			Timeout:     30 * time.Second,
		}),
		rateLimiter: rate.NewLimiter(100, 200),
		retryConfig: retry.DefaultConfig(),
	}
}

// CallWithFaultTolerance performs a GET on path behind the full protection
// stack: it waits for a rate-limit token, then runs the retried request
// inside the circuit breaker. When the breaker rejects the call it returns
// the fallback response; any other failure is returned as an error.
func (c *ServiceClient) CallWithFaultTolerance(ctx context.Context, path string) (*Response, error) {
	// 1. Rate limiting: block until a token is available or ctx ends.
	if err := c.rateLimiter.Wait(ctx); err != nil {
		return nil, fmt.Errorf("rate limited: %w", err)
	}

	// 2. Circuit breaker around 3. retries. The response of the successful
	// attempt is carried out of the closures so that the request is not
	// issued a second time outside the breaker.
	result, err := c.breaker.Execute(func() (interface{}, error) {
		var resp *Response
		err := retry.Do(ctx, c.retryConfig, func() error {
			r, err := c.httpClient.Get(path)
			if err != nil {
				return err
			}
			// Treat server errors as retryable failures.
			if r.StatusCode >= 500 {
				return fmt.Errorf("server error: %d", r.StatusCode)
			}
			resp = r
			return nil
		})
		if err != nil {
			return nil, err
		}
		return resp, nil
	})
	if err != nil {
		// Breaker rejected the call (open, or half-open and saturated):
		// serve degraded data.
		if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
			return c.getFallbackResponse(path)
		}
		return nil, err
	}

	return result.(*Response), nil
}

// getFallbackResponse returns a static 200 response carrying fallback data.
func (c *ServiceClient) getFallbackResponse(path string) (*Response, error) {
	return &Response{
		StatusCode: 200,
		Body:       []byte(`{"data": "fallback data"}`),
	}, nil
}
8. 总结
高可用设计是分布式系统稳定运行的保障,需要多层次、多维度的容错机制配合:
- 超时控制:设置合理的超时时间,避免请求堆积和资源耗尽
- 熔断器模式:快速失败,防止故障扩散,保护下游服务
- 重试机制:在可恢复的错误场景下自动重试,提高成功率
- 降级策略:在服务不可用时提供备选方案,保证核心功能
- 限流控制:保护系统不被突发流量冲垮,维护服务质量
这些机制需要根据业务特点进行调优,参数设置不当可能导致系统性能下降或容错效果不佳。建议在生产环境中持续监控这些机制的表现,并根据实际情况调整参数。
