当前位置: 首页 > news >正文

Go语言并发模式深度解析

Go语言并发模式深度解析

Go语言以其独特的并发模型而闻名,goroutine和channel是Go并发编程的核心。本文将深入探讨Go语言中的并发模式和最佳实践。

一、Goroutine原理

1.1 Goroutine与线程的区别

func main() { // 创建10万个goroutine for i := 0; i < 100000; i++ { go func(id int) { fmt.Printf("Goroutine %d\n", id) }(i) } }
特性GoroutineOS线程
栈大小初始2KB,可增长通常1MB
创建开销~2KB栈 + 少量结构内存分配 + 内核调度
上下文切换用户态,快速内核态,较慢
数量限制可创建数万通常数百

1.2 Goroutine调度器

// GPM调度模型 type g struct { stack stack stackguard0 uintptr stackguard1 uintptr _panic *_panic _defer *_defer m *m sched gobuf } type m struct { g0 *g curg *g p puintptr nextp puintptr alllink *m schedtick int32 } type p struct { mu mutex id int32 status uint32 m *m runqhead uint32 runqtail uint32 runq [256]guintptr gFree *g }

1.3 调度策略

// work-stealing算法 func schedule() { for { gp := runqget(_g_.m.p.ptr()) if gp == nil { gp = findrunnable() } execute(gp, false) } } func runqget(p *p) *g { for { head := atomic.Load(&p.runqhead) tail := p.runqtail if head == tail { return nil } gp := p.runq[head%uint32(len(p.runq))].ptr() if atomic.Cas(&p.runqhead, head, head+1) { return gp } } }

二、Channel深度解析

2.1 Channel类型

type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex } type waitq struct { first *sudog last *sudog }

2.2 Channel操作

// 发送操作 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = cputicks() } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) return true }

2.3 Channel模式

// 模式1:生产者-消费者 func producer(ch chan<- int) { for i := 0; i < 10; i++ { ch <- i } close(ch) } func consumer(ch <-chan int) { for val := range ch { fmt.Println(val) } } // 模式2:扇出 func fanOut(input <-chan int, n int) []<-chan int { channels := make([]<-chan int, n) for i := 0; i < n; i++ { channels[i] = process(input) } return channels } func process(input <-chan int) <-chan int { out := make(chan int) go func() { for val := range input { out <- val * 2 } close(out) }() return out } // 模式3:扇入 func fanIn(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup wg.Add(len(channels)) for _, ch := range channels { go func(c <-chan int) { defer wg.Done() for val := range c { out <- val } }(ch) } go func() { wg.Wait() close(out) }() return out }

三、同步原语

3.1 Mutex与RWMutex

type SafeCounter struct { mu sync.Mutex value int } func (c *SafeCounter) Inc() { c.mu.Lock() defer c.mu.Unlock() c.value++ } func (c *SafeCounter) Get() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // RWMutex适合读多写少场景 type SharedData struct { mu sync.RWMutex data map[string]string } func (d *SharedData) Read(key string) (string, bool) { d.mu.RLock() defer d.mu.RUnlock() val, ok := d.data[key] return val, ok } func (d *SharedData) Write(key, value string) { d.mu.Lock() defer d.mu.Unlock() d.data[key] = value }

3.2 WaitGroup

func processItems(items []Item) error { var wg sync.WaitGroup errs := make(chan error, len(items)) for _, item := range items { wg.Add(1) go func(i Item) { defer wg.Done() if err := process(i); err != nil { errs <- err } }(item) } go func() { wg.Wait() close(errs) }() for err := range errs { return err } return nil }

3.3 Once

type Config struct { once sync.Once loaded bool data map[string]string } func (c *Config) Load() { c.once.Do(func() { c.data = loadConfigFromFile() c.loaded = true }) }

3.4 Cond

type Queue struct { mu sync.Mutex cond *sync.Cond items []interface{} } func NewQueue() *Queue { q := &Queue{ items: make([]interface{}, 0), } q.cond = sync.NewCond(&q.mu) return q } func (q *Queue) Enqueue(item interface{}) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) q.cond.Signal() } func (q *Queue) Dequeue() interface{} { q.mu.Lock() defer q.mu.Unlock() for len(q.items) == 0 { q.cond.Wait() } item := q.items[0] q.items = q.items[1:] return item }

四、并发设计模式

4.1 Worker Pool模式

type WorkerPool struct { workers int tasks chan Task wg sync.WaitGroup } func NewWorkerPool(workers int) *WorkerPool { return &WorkerPool{ workers: workers, tasks: make(chan Task, 100), } } func (p *WorkerPool) Start() { for i := 0; i < p.workers; i++ { p.wg.Add(1) go p.worker() } } func (p *WorkerPool) worker() { defer p.wg.Done() for task := range p.tasks { task.Execute() } } func (p *WorkerPool) Submit(task Task) { p.tasks <- task } func (p *WorkerPool) Stop() { close(p.tasks) p.wg.Wait() }

4.2 Context模式

func fetchData(ctx context.Context, url string) (string, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close() return io.ReadAll(resp.Body) } func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() data, err := fetchData(ctx, "http://example.com") if err != nil { log.Fatal(err) } fmt.Println(data) }

4.3 Pipeline模式

func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } func filterEven(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { if n%2 == 0 { out <- n } } close(out) }() return out } func main() { c := gen(1, 2, 3, 4, 5) s := sq(c) f := filterEven(s) for n := range f { fmt.Println(n) } }

五、并发安全数据结构

5.1 并发安全Map

type ConcurrentMap struct { mu sync.RWMutex items map[string]interface{} } func NewConcurrentMap() *ConcurrentMap { return &ConcurrentMap{ items: make(map[string]interface{}), } } func (m *ConcurrentMap) Get(key string) (interface{}, bool) { m.mu.RLock() defer m.mu.RUnlock() val, ok := m.items[key] return val, ok } func (m *ConcurrentMap) Set(key string, value interface{}) { m.mu.Lock() defer m.mu.Unlock() m.items[key] = value } func (m *ConcurrentMap) Delete(key string) { m.mu.Lock() defer m.mu.Unlock() delete(m.items, key) }

5.2 并发安全队列

type ConcurrentQueue struct { mu sync.Mutex items []interface{} } func NewConcurrentQueue() *ConcurrentQueue { return &ConcurrentQueue{ items: make([]interface{}, 0), } } func (q *ConcurrentQueue) Enqueue(item interface{}) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) } func (q *ConcurrentQueue) Dequeue() (interface{}, bool) { q.mu.Lock() defer q.mu.Unlock() if len(q.items) == 0 { return nil, false } item := q.items[0] q.items = q.items[1:] return item, true } func (q *ConcurrentQueue) Len() int { q.mu.RLock() defer q.mu.RUnlock() return len(q.items) }

六、并发最佳实践

6.1 避免共享状态

// 不好的做法:共享状态 var counter int var mu sync.Mutex func increment() { mu.Lock() counter++ mu.Unlock() } // 好的做法:通过channel传递状态 func counterWithChannel(start int) <-chan int { out := make(chan int) go func() { for i := start; ; i++ { out <- i } }() return out }

6.2 优雅关闭

func worker(ctx context.Context, jobs <-chan Job) { for { select { case job, ok := <-jobs: if !ok { return } process(job) case <-ctx.Done(): return } } }

6.3 错误处理

type Result struct { Value interface{} Err error } func processWithErrorHandling(ctx context.Context, jobs <-chan Job) <-chan Result { results := make(chan Result) go func() { defer close(results) for job := range jobs { select { case <-ctx.Done(): return default: val, err := process(job) results <- Result{Value: val, Err: err} } } }() return results }

七、总结

Go语言的并发模型提供了强大而简洁的并发编程能力:

  1. Goroutine:轻量级执行单元,高效的M:N调度
  2. Channel:类型安全的通信机制,实现无锁并发
  3. 同步原语:Mutex、RWMutex、WaitGroup、Once、Cond
  4. 设计模式:Worker Pool、Pipeline、Fan-Out/Fan-In
  5. 最佳实践:避免共享状态、优雅关闭、错误处理

掌握这些知识,可以编写出高效、安全的并发Go程序。

http://www.rkmt.cn/news/1438622.html

相关文章:

  • 别再只用Aircrack-ng了!用Kali Linux的Kismet做WiFi网络扫描,可视化界面更友好
  • 别再折腾环境了!手把手教你用Vivado 2018.3和Modelsim 22.04搞定联合仿真(附库编译避坑指南)
  • 神经网络与深度学习第四周学习笔记(3/4)
  • 保姆级教程:在Deepin V23上配置xrdp+x11vnc,实现Windows远程桌面稳定连接
  • 从0到1吃透Pandas!Python数据分析零基础实战教程
  • 从‘刻舟求剑’到‘乒乓切换’:图解STM32H7中DMA双缓存与Cache的协同工作
  • 2026年评价高的庐阳区窗帘/合肥窗帘/包河区窗帘/新站区窗帘长期合作厂家推荐 - 品牌宣传支持者
  • 广度优先搜索 (BFS)
  • 2026年质量好的共挤膜气泡膜卷/彩色气泡膜卷可靠供应商推荐 - 行业平台推荐
  • 2026年比较好的梁山水处理乳品设备/梁山乳品设备/离心机乳品设备/均质机乳品设备精选推荐公司 - 行业平台推荐
  • 别再只用Aircrack了!横向评测Kismet与airodump-ng:无线网络扫描工具到底怎么选?
  • 用STM32F103和继电器DIY智能家居:低成本改造台灯与风扇的保姆级教程
  • 构建个人增强系统:从可穿戴设备到生物反馈的实践指南
  • CRAFT框架:大模型驱动的多机器人协同训练技术解析
  • 2026年知名的浙江机房建设方案/机房建设施工方案榜单优选公司 - 行业平台推荐
  • 2026年口碑好的挂布台车/多功能台车/浙江隧道台车高口碑品牌推荐 - 品牌宣传支持者
  • 【Gemini安全红皮书首发】:基于MITRE ATTCK框架的5类攻击面测绘+自动化检测脚本(限前500名开发者领取)
  • 2026年口碑好的硅岩净化板/净化板/岩棉净化板推荐品牌厂家 - 行业平台推荐
  • 基于Azure AI Studio与RAG架构构建私有数据AI助手实战指南
  • 2026年质量好的胡辣汤/逍遥镇胡辣汤/羊肉胡辣汤/面筋胡辣汤加盟热门榜 - 行业平台推荐
  • 深度学习花卉识别笔记
  • 2026年知名的均质机乳品设备/离心机乳品设备主流厂家对比评测 - 品牌宣传支持者
  • 量子密钥分发安全挑战与混合QLSTM防御方案
  • 2026年热门的安防监控弱电工程/园区门禁弱电工程/楼宇安防弱电工程专业公司推荐 - 行业平台推荐
  • DS390芯片4K SRAM配置与栈优化实战
  • Cobalt Strike上线后的实战操作指南:Beacon操控、权限提升与内网横向移动
  • 从特斯拉Optimus看具身智能:人形机器人的技术架构与工程挑战
  • 零基础入门NLP:绕过数学深坑,从实践到应用的完整指南
  • 别再逐行读文件了!Shell脚本处理文本,试试mapfile/readarray这5个高效场景
  • 不想让50G Mod塞爆C盘?手把手教你逆向修改《欧卡2》默认Mod路径(附Patch工具)