Go语言并发模式深度解析
Go语言并发模式深度解析
Go语言以其独特的并发模型而闻名,goroutine和channel是Go并发编程的核心。本文将深入探讨Go语言中的并发模式和最佳实践。
一、Goroutine原理
1.1 Goroutine与线程的区别
func main() { // 创建10万个goroutine for i := 0; i < 100000; i++ { go func(id int) { fmt.Printf("Goroutine %d\n", id) }(i) } }| 特性 | Goroutine | OS线程 |
|---|---|---|
| 栈大小 | 初始2KB,可增长 | 通常1MB |
| 创建开销 | ~2KB栈 + 少量结构 | 内存分配 + 内核调度 |
| 上下文切换 | 用户态,快速 | 内核态,较慢 |
| 数量限制 | 可创建数万 | 通常数百 |
1.2 Goroutine调度器
// GPM调度模型 type g struct { stack stack stackguard0 uintptr stackguard1 uintptr _panic *_panic _defer *_defer m *m sched gobuf } type m struct { g0 *g curg *g p puintptr nextp puintptr alllink *m schedtick int32 } type p struct { mu mutex id int32 status uint32 m *m runqhead uint32 runqtail uint32 runq [256]guintptr gFree *g }1.3 调度策略
// work-stealing算法 func schedule() { for { gp := runqget(_g_.m.p.ptr()) if gp == nil { gp = findrunnable() } execute(gp, false) } } func runqget(p *p) *g { for { head := atomic.Load(&p.runqhead) tail := p.runqtail if head == tail { return nil } gp := p.runq[head%uint32(len(p.runq))].ptr() if atomic.Cas(&p.runqhead, head, head+1) { return gp } } }二、Channel深度解析
2.1 Channel类型
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex } type waitq struct { first *sudog last *sudog }2.2 Channel操作
// 发送操作 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = cputicks() } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) return true }2.3 Channel模式
// 模式1:生产者-消费者 func producer(ch chan<- int) { for i := 0; i < 10; i++ { ch <- i } close(ch) } func consumer(ch <-chan int) { for val := range ch { fmt.Println(val) } } // 模式2:扇出 func fanOut(input <-chan int, n int) []<-chan int { channels := make([]<-chan int, n) for i := 0; i < n; i++ { channels[i] = process(input) } return channels } func process(input <-chan int) <-chan int { out := make(chan int) go func() { for val := range input { out <- val * 2 } close(out) }() return out } // 模式3:扇入 func fanIn(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup wg.Add(len(channels)) for _, ch := range channels { go func(c <-chan int) { defer wg.Done() for val := range c { out <- val } }(ch) } go func() { wg.Wait() close(out) }() return out }三、同步原语
3.1 Mutex与RWMutex
type SafeCounter struct { mu sync.Mutex value int } func (c *SafeCounter) Inc() { c.mu.Lock() defer c.mu.Unlock() c.value++ } func (c *SafeCounter) Get() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // RWMutex适合读多写少场景 type SharedData struct { mu sync.RWMutex data map[string]string } func (d *SharedData) Read(key string) (string, bool) { d.mu.RLock() defer d.mu.RUnlock() val, ok := d.data[key] return val, ok } func (d *SharedData) Write(key, value string) { d.mu.Lock() defer d.mu.Unlock() d.data[key] = value }3.2 WaitGroup
func processItems(items []Item) error { var wg sync.WaitGroup errs := make(chan error, len(items)) for _, item := range items { wg.Add(1) go func(i Item) { defer wg.Done() if err := process(i); err != nil { errs <- err } }(item) } go func() { wg.Wait() close(errs) }() for err := range errs { return err } return nil }3.3 Once
type Config struct { once sync.Once loaded bool data map[string]string } func (c *Config) Load() { c.once.Do(func() { c.data = loadConfigFromFile() c.loaded = true }) }3.4 Cond
type Queue struct { mu sync.Mutex cond *sync.Cond items []interface{} } func NewQueue() *Queue { q := &Queue{ items: make([]interface{}, 0), } q.cond = sync.NewCond(&q.mu) return q } func (q *Queue) Enqueue(item interface{}) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) q.cond.Signal() } func (q *Queue) Dequeue() interface{} { q.mu.Lock() defer q.mu.Unlock() for len(q.items) == 0 { q.cond.Wait() } item := q.items[0] q.items = q.items[1:] return item }四、并发设计模式
4.1 Worker Pool模式
type WorkerPool struct { workers int tasks chan Task wg sync.WaitGroup } func NewWorkerPool(workers int) *WorkerPool { return &WorkerPool{ workers: workers, tasks: make(chan Task, 100), } } func (p *WorkerPool) Start() { for i := 0; i < p.workers; i++ { p.wg.Add(1) go p.worker() } } func (p *WorkerPool) worker() { defer p.wg.Done() for task := range p.tasks { task.Execute() } } func (p *WorkerPool) Submit(task Task) { p.tasks <- task } func (p *WorkerPool) Stop() { close(p.tasks) p.wg.Wait() }4.2 Context模式
func fetchData(ctx context.Context, url string) (string, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close() return io.ReadAll(resp.Body) } func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() data, err := fetchData(ctx, "http://example.com") if err != nil { log.Fatal(err) } fmt.Println(data) }4.3 Pipeline模式
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } func filterEven(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { if n%2 == 0 { out <- n } } close(out) }() return out } func main() { c := gen(1, 2, 3, 4, 5) s := sq(c) f := filterEven(s) for n := range f { fmt.Println(n) } }五、并发安全数据结构
5.1 并发安全Map
type ConcurrentMap struct { mu sync.RWMutex items map[string]interface{} } func NewConcurrentMap() *ConcurrentMap { return &ConcurrentMap{ items: make(map[string]interface{}), } } func (m *ConcurrentMap) Get(key string) (interface{}, bool) { m.mu.RLock() defer m.mu.RUnlock() val, ok := m.items[key] return val, ok } func (m *ConcurrentMap) Set(key string, value interface{}) { m.mu.Lock() defer m.mu.Unlock() m.items[key] = value } func (m *ConcurrentMap) Delete(key string) { m.mu.Lock() defer m.mu.Unlock() delete(m.items, key) }5.2 并发安全队列
type ConcurrentQueue struct { mu sync.Mutex items []interface{} } func NewConcurrentQueue() *ConcurrentQueue { return &ConcurrentQueue{ items: make([]interface{}, 0), } } func (q *ConcurrentQueue) Enqueue(item interface{}) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) } func (q *ConcurrentQueue) Dequeue() (interface{}, bool) { q.mu.Lock() defer q.mu.Unlock() if len(q.items) == 0 { return nil, false } item := q.items[0] q.items = q.items[1:] return item, true } func (q *ConcurrentQueue) Len() int { q.mu.RLock() defer q.mu.RUnlock() return len(q.items) }六、并发最佳实践
6.1 避免共享状态
// 不好的做法:共享状态 var counter int var mu sync.Mutex func increment() { mu.Lock() counter++ mu.Unlock() } // 好的做法:通过channel传递状态 func counterWithChannel(start int) <-chan int { out := make(chan int) go func() { for i := start; ; i++ { out <- i } }() return out }6.2 优雅关闭
func worker(ctx context.Context, jobs <-chan Job) { for { select { case job, ok := <-jobs: if !ok { return } process(job) case <-ctx.Done(): return } } }6.3 错误处理
type Result struct { Value interface{} Err error } func processWithErrorHandling(ctx context.Context, jobs <-chan Job) <-chan Result { results := make(chan Result) go func() { defer close(results) for job := range jobs { select { case <-ctx.Done(): return default: val, err := process(job) results <- Result{Value: val, Err: err} } } }() return results }七、总结
Go语言的并发模型提供了强大而简洁的并发编程能力:
- Goroutine:轻量级执行单元,高效的M:N调度
- Channel:类型安全的通信机制,实现无锁并发
- 同步原语:Mutex、RWMutex、WaitGroup、Once、Cond
- 设计模式:Worker Pool、Pipeline、Fan-Out/Fan-In
- 最佳实践:避免共享状态、优雅关闭、错误处理
掌握这些知识,可以编写出高效、安全的并发Go程序。
