Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:生产级 Controller 实践 —— 并发安全、资源清理与高可用设计
当我们把 Controller 部署到生产环境时,会遇到很多开发环境不会出现的问题:多线程并发安全、内存泄漏、资源清理、高可用部署……这些问题如果不在设计阶段就考虑清楚,等上线后再修复代价会非常大。
这一篇文章,我们来系统地学习生产级 Controller 的最佳实践。
Kubernetes 生产实践 高可用 资源泄漏 v1.36.1 🔓 学习重点提示 — 建议先通读全文,再重点回顾标注内容
★ 重点掌握(必须)
• 并发安全:为什么 informer.eventHandlers 的注册必须在锁保护下
• 资源清理:Finalizer 如何保证删除安全
• 优雅关闭:WaitGroup 和 channel 的正确使用
☆ 次重点(了解即可)
• 监控指标设计
一、并发安全:Controller 编程的核心挑战
Controller 天然是多线程并发执行的:Informer 的事件处理在 goroutine 中运行,Worker 处理任务也在多个 goroutine 中运行。如果不处理好并发安全,就会出现数据竞争、状态不一致等问题。
1. WorkQueue 的并发安全保证
WorkQueue 本身是并发安全的,Get() 返回同一个 key 时,只有一个 goroutine 能获取到:
// WorkQueue 的并发安全由其内部锁保证 // 同一时间只有一个 worker 能 Get 到同一个 key key1, _ := queue.Get() // goroutine A 获取了 "deploy-1" key2, _ := queue.Get() // goroutine B 获取了 "deploy-1"(如果 A 没有 Done) // 实际上,A 调用 Get() 后,key1 就从队列中移除了 // B 只能获取到其他 key,或者等待
2. Reconcile 的幂等性设计
WorkQueue 保证同一时间只有一个 worker 处理同一个 key,但不同时间可能有多个 worker 处理同一个 key(因为失败重试)。所以 Reconcile 必须设计成幂等的:
// 幂等的 Reconcile 示例:确保 Pod 带有特定标签
func (c *Controller) reconcile(ctx context.Context, deployment *appsv1.Deployment) error {// 获取当前 Pod 列表(从 Lister,可能不是最新)pods, err := c.podLister.Pods(deployment.Namespace).List(labels.Everything())if err != nil {return err}// 筛选属于这个 Deployment 的 PodownedPods := filterOwnedPods(deployment, pods)// 比较期望状态和实际状态for _, pod := range ownedPods {if pod.Labels["app"] != deployment.Labels["app"] {// 更新 Pod(使用 Patch 而不是 Replace,避免冲突)patch, _ := json.Marshal([]map[string]interface{}{{"op": "add","path": "/metadata/labels/app","value": deployment.Labels["app"],},})_, err := c.kubeclientset.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.JSONPatchType, patch, metav1.PatchOptions{},)if err != nil {return err}}}return nil // 幂等:重复执行结果一致
}
3. 避免读取过期数据
Lister 返回的是本地缓存的数据,可能不是最新状态。当需要最新状态时,应该直接查询 APIServer:
// 当需要最新状态时,直接从 APIServer 获取
func (c *Controller) reconcile(ctx context.Context, deployment *appsv1.Deployment) error {// Lister 可能有过期数据pods, err := c.podLister.Pods(deployment.Namespace).List(...)// 某些情况下需要最新状态latestDeployment, err := c.kubeclientset.AppsV1().Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})if err != nil {return err}// 检查是否有其他 Controller 修改过这个资源if latestDeployment.ResourceVersion != deployment.ResourceVersion {klog.V(4).Infof("Deployment %s/%s has been modified, will retry",deployment.Namespace, deployment.Name)return fmt.Errorf("deployment was modified")}// ... 继续处理 ...
}
二、资源清理:Finalizer 的正确使用
当 Controller 管理的资源被删除时,我们需要确保清理逻辑能够执行。Finalizer(最终一致性机制)就是来解决这个问题的。
什么是 Finalizer?
Finalizer 是 Kubernetes 的一种机制,它告诉 APIServer:在删除对象之前,必须先移除某个 finalizer。这个机制常用于:
- 清理外部资源(如云存储卷、外部数据库)
- 等待相关资源删除完成(如等待 Pod 删除)
- 完成数据同步或备份
Finalizer 的工作流程
┌──────────────────────────────────────────────────────────────────────────┐ │ Finalizer 工作流程 │ ├──────────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 创建资源时,添加 finalizer │ │ metadata.finalizers = ["example.com/cleanup"] │ │ │ │ 2. 用户发起删除请求 │ │ kubectl delete foo my-foo │ │ │ │ 3. APIServer 设置 deletionTimestamp,但不立即删除 │ │ metadata.deletionTimestamp = "2024-01-01T12:00:00Z" │ │ → Controller 收到 Update/Delete 事件 │ │ │ │ 4. Controller 执行清理逻辑 │ │ - 清理关联资源 │ │ - 清理外部资源 │ │ - 完成必要的清理工作 │ │ │ │ 5. Controller 移除 finalizer │ │ PATCH metadata.finalizers = [] │ │ │ │ 6. APIServer 完成删除 │ │ ✓ 资源被真正删除 │ │ │ └──────────────────────────────────────────────────────────────────────────┘
Finalizer 实现示例
const finalizerName = "example.com/cleanup"func (c *Controller) reconcileFoo(ctx context.Context, foo *examplev1.Foo) error {// 检查是否需要添加 finalizerif !hasFinalizer(foo, finalizerName) {foo = foo.DeepCopy()foo.Finalizers = append(foo.Finalizers, finalizerName)_, err := c.exampleClient.ExampleV1().Foos(foo.Namespace).Update(ctx, foo, metav1.UpdateOptions{})return err}// 检查是否正在被删除if foo.DeletionTimestamp != nil {// 执行清理逻辑if err := c.cleanupResources(ctx, foo); err != nil {return err}// 移除 finalizer,允许资源被删除foo = foo.DeepCopy()foo.Finalizers = removeFinalizer(foo.Finalizers, finalizerName)_, err := c.exampleClient.ExampleV1().Foos(foo.Namespace).Update(ctx, foo, metav1.UpdateOptions{})return err}// 正常处理逻辑// ...return nil
}func hasFinalizer(obj metav1.Object, finalizer string) bool {for _, f := range obj.GetFinalizers() {if f == finalizer {return true}}return false
}func removeFinalizer(finalizers []string, finalizer string) []string {var result []stringfor _, f := range finalizers {if f != finalizer {result = append(result, f)}}return result
}
⚠️ 警告
如果 Controller 因为 bug 无法移除 finalizer,资源会永远无法删除。这时需要手动介入:kubectl edit foo my-foo 并移除 finalizers 数组。
三、优雅关闭:Controller 的生命周期管理
当 Controller 收到 SIGTERM(终止信号)时,应该优雅地关闭,而不是立即退出。
优雅关闭的步骤
- 1停止接收新任务:不再接受新的 Reconcile 请求
- 2等待正在处理的任务完成:给 worker 一段时间处理完当前任务
- 3关闭 Informer:停止 Watch,释放连接
- 4释放锁(如使用了 Leader 选举):让其他副本可以接管
优雅关闭实现
func (c *Controller) Run(ctx context.Context, workers int) error {// 等待缓存同步if !cache.WaitForCacheSync(ctx.Done(), c.deploymentSynced, c.podSynced) {return fmt.Errorf("failed to wait for caches to sync")}// 启动 workersfor i := 0; i < workers; i++ {c.workerGroup.Go(func() error {return c.runWorker(ctx, i)})}// 等待 context 取消<-ctx.Done()// Context 取消后,开始优雅关闭klog.Info("Shutdown signal received, gracefully stopping...")// 通知所有 worker 停止c.workerGroup.Wait()// 关闭 Informer Factoryc.factory.Shutdown()klog.Info("Shutdown complete")return nil
}func (c *Controller) runWorker(ctx context.Context, id int) error {for {select {case <-ctx.Done():return nil // Context 取消,退出default:if !c.processNextWorkItem(ctx) {return nil}}}
}
四、内存泄漏:Informer 的陷阱
Informer 是最容易导致内存泄漏的地方。常见的原因有:
1. 没有调用 Shutdown
// 错误示例:没有关闭 Informer
func main() {factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)factory.Start(ctx.Done())// ... 使用 factory ...// 程序退出时没有调用 Shutdown// goroutine 继续运行,持有的内存无法释放
}// 正确示例
func main() {factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)ctx, cancel := context.WithCancel(context.Background())defer cancel()factory.Start(ctx.Done())// ... 使用 factory ...// 程序退出前调用 Shutdownfactory.Shutdown()
}
2. 注册了 Handler 但没有清理
// 错误示例:在 Controller 运行时注册了新的 Handler
func (c *Controller) updateHandler(newHandler cache.ResourceEventHandler) {// 每次调用都会添加一个新的 Handler!c.informer.AddEventHandler(newHandler)// 旧 Handler 永远不会被移除
}// 正确做法:Handler 只注册一次,使用 channel 或其他机制传递事件
func (c *Controller) Run() {// 只注册一次 Handlerc.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {c.eventCh
3. Indexer 持有过期对象
Indexer 会缓存所有见过的对象。如果对象被大量创建和删除,Indexer 会持续积累内存。使用带过期时间的索引可以缓解这个问题:
// 使用带 TTL 的 ThreadSafeStore
indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,"byUID": func(obj interface{}) ([]string, error) {meta, ok := obj.(metav1.Object)if !ok {return nil, fmt.Errorf("object is not metav1.Object")}return []string{string(meta.GetUID())}, nil},
}// 或者定期清理不需要的索引
func (c *Controller) cleanupIndexer() {items := c.indexer.List()for _, item := range items {meta, _ := item.(metav1.Object)// 删除过期的对象if time.Since(meta.GetDeletionTimestamp().Time) > 5*time.Minute {c.indexer.Delete(item)}}
}
五、高可用部署:多副本 + Leader 选举
生产环境的 Controller 通常需要部署多个副本以保证高可用。
Deployment + Leader 选举模式
┌──────────────────────────────────────────────────────────────────────────┐ │ 高可用部署架构 │ ├──────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Deployment (replicas: 3) │ │ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ Pod-1 │ │ Pod-2 │ │ Pod-3 │ │ │ │ │ │ (Leader) │ │ (Standby) │ │ (Standby) │ │ │ │ │ └──────┬──────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────────┐ │ │ │ │ │ Lease Lock │ 只允许一个 Pod 处理事件 │ │ │ │ │(in cluster)│ │ │ │ │ └─────────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────────────┘
Deployment 配置
apiVersion: apps/v1 kind: Deployment metadata:name: example-operatornamespace: operators spec:replicas: 3selector:matchLabels:app: example-operatortemplate:metadata:labels:app: example-operatorspec:serviceAccountName: example-operatorcontainers:- name: operatorimage: example/operator:v1.0.0args:- --leader-elect=true- --leader-election-namespace=operators# Pod 必须配置优雅终止宽限期terminationGracePeriodSeconds: 30
六、监控指标:生产级 Controller 的必备功能
生产环境的 Controller 应该暴露 metrics,方便监控和告警。
import ("github.com/prometheus/client_golang/prometheus""github.com/prometheus/client_golang/prometheus/promauto"
)var (reconcileTotal = promauto.NewCounterVec(prometheus.CounterOpts{Name: "controller_reconcile_total",Help: "Total number of reconcile attempts",},[]string{"result"}, // result: success, error)reconcileDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{Name: "controller_reconcile_duration_seconds",Help: "Duration of reconcile operations",Buckets: prometheus.DefBuckets,},[]string{"result"},)workqueueDepth = promauto.NewGauge(prometheus.GaugeOpts{Name: "controller_workqueue_depth",Help: "Current depth of the workqueue",},)
)func (c *Controller) reconcile(ctx context.Context, key string) error {start := time.Now()defer func() {duration := time.Since(start).Seconds()reconcileDuration.WithLabelValues("success").Observe(duration)reconcileTotal.WithLabelValues("success").Inc()}()err := c.doReconcile(ctx, key)if err != nil {reconcileDuration.WithLabelValues("error").Observe(time.Since(start).Seconds())reconcileTotal.WithLabelValues("error").Inc()return err}return nil
}
🌟 实用技巧
建议暴露以下 metrics:reconcile 总次数和成功率、reconcile 耗时分布、workqueue 深度、Leader 选举切换次数。
七、总结
这一节是 client-go 系列的最后一篇,我们总结了生产级 Controller 的最佳实践:
- 并发安全:Reconcile 必须是幂等的,避免读取过期数据
- Finalizer:确保清理逻辑能够执行,避免资源泄漏
- 优雅关闭:正确处理 SIGTERM,给 worker 时间完成当前任务
- 内存泄漏:正确关闭 Informer,避免注册重复 Handler
- 高可用:多副本部署 + Leader 选举
- 监控指标:暴露 metrics,方便监控和告警
至此,client-go 系列的全部文章已经完成。从 Informer 源码、DeltaFIFO、Indexer、WorkQueue,到 SharedInformerFactory、Controller 开发模式、Leader 选举、限速器、Watch 机制、DynamicClient,再到调试工具和生产实践,希望这一系列文章能帮助您真正掌握 Kubernetes 客户端编程。
Kubernetes 编程 / Operator 专题【左扬精讲】—— 生产级 Controller 实践 · 来源:Kubernetes v1.36.1 client-go 源码分析
