Go语言的并发模式

张开发
2026/4/17 8:49:28 15 分钟阅读

分享文章

Go语言的并发模式
Go语言的并发模式1. 并发的基础概念1.1 什么是并发并发是指同时处理多个任务的能力在Go语言中并发通过goroutine实现并发可以提高程序的性能和响应速度1.2 Go语言的并发优势轻量级的goroutine基于channel的通信机制简洁的并发原语强大的标准库支持2. 基本并发模式2.1 工作池模式package main import ( fmt sync ) func worker(id int, jobs -chan int, results chan- int) { for j : range jobs { fmt.Printf(Worker %d processing job %d\n, id, j) results - j * 2 } } func main() { const numJobs 5 const numWorkers 3 jobs : make(chan int, numJobs) results : make(chan int, numJobs) // 启动工作协程 for w : 1; w numWorkers; w { go worker(w, jobs, results) } // 发送任务 for j : 1; j numJobs; j { jobs - j } close(jobs) // 收集结果 for a : 1; a numJobs; a { -results } }2.2 扇入扇出模式package main import ( fmt time ) func gen(nums ...int) -chan int { out : make(chan int) go func() { for _, n : range nums { out - n } close(out) }() return out } func sq(in -chan int) -chan int { out : make(chan int) go func() { for n : range in { out - n * n } close(out) }() return out } func merge(cs ...-chan int) -chan int { var wg sync.WaitGroup out : make(chan int) output : func(c -chan int) { for n : range c { out - n } wg.Done() } wg.Add(len(cs)) for _, c : range cs { go output(c) } go func() { wg.Wait() close(out) }() return out } func main() { in : gen(2, 3, 4, 5, 6, 7, 8, 9) // 扇出多个goroutine从同一个channel读取数据 c1 : sq(in) c2 : sq(in) c3 : sq(in) // 扇入一个goroutine从多个channel读取数据 for n : range merge(c1, c2, c3) { fmt.Println(n) } }2.3 管道模式package main import ( fmt ) func main() { // 创建管道 c : make(chan int) // 启动goroutine go func() { for i : 0; i 10; i { c - i } close(c) }() // 读取数据 for v : range c { fmt.Println(v) } }3. 高级并发模式3.1 Context模式package main import ( context fmt time ) func worker(ctx context.Context, id int) { for { select { case -ctx.Done(): fmt.Printf(Worker %d canceled\n, id) return default: fmt.Printf(Worker %d working\n, id) time.Sleep(500 * time.Millisecond) } } } func main() { // 创建上下文 ctx, cancel : context.WithCancel(context.Background()) // 启动工作协程 for i : 1; i 3; i { go worker(ctx, i) } // 等待一段时间后取消 time.Sleep(2 * time.Second) fmt.Println(Canceling workers) cancel() // 等待协程结束 time.Sleep(1 * time.Second) fmt.Println(Done) }3.2 ErrGroup模式package main import ( context fmt net/http golang.org/x/sync/errgroup ) func main() { g, ctx : errgroup.WithContext(context.Background()) // 启动多个HTTP服务器 for i : 0; i 3; i { port : 8080 i g.Go(func() error { server : http.Server{ Addr: fmt.Sprintf(:%d, port), Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, Hello from port %d\n, port) }), } // 使用带取消的上下文 go func() { -ctx.Done() server.Shutdown(context.Background()) }() return server.ListenAndServe() }) } // 等待所有服务器启动或出错 if err : g.Wait(); err ! nil { fmt.Println(Error:, err) } }3.3 互斥锁与读写锁package main import ( fmt sync time ) // 使用互斥锁 type Counter struct { mu sync.Mutex count int } func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.count } func (c *Counter) Get() int { c.mu.Lock() defer c.mu.Unlock() return c.count } // 使用读写锁 type RWCounter struct { mu sync.RWMutex count int } func (c *RWCounter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.count } func (c *RWCounter) Get() int { c.mu.RLock() defer c.mu.RUnlock() return c.count } func main() { // 测试互斥锁 counter : Counter{} var wg sync.WaitGroup for i : 0; i 1000; i { wg.Add(1) go func() { defer wg.Done() counter.Increment() }() } wg.Wait() fmt.Println(Counter value:, counter.Get()) // 测试读写锁 rwCounter : RWCounter{} // 启动多个读协程 for i : 0; i 100; i { wg.Add(1) go func() { defer wg.Done() rwCounter.Get() }() } // 启动写协程 for i : 0; i 10; i { wg.Add(1) go func() { defer wg.Done() rwCounter.Increment() }() } wg.Wait() fmt.Println(RWCounter value:, rwCounter.Get()) }4. 并发安全的数据结构4.1 原子操作package main import ( fmt sync sync/atomic ) func main() { var counter int32 var wg sync.WaitGroup // 启动多个协程增加计数器 for i : 0; i 1000; i { wg.Add(1) go func() { defer wg.Done() atomic.AddInt32(counter, 1) }() } wg.Wait() fmt.Println(Counter:, atomic.LoadInt32(counter)) }4.2 并发安全的Mappackage main import ( fmt sync ) func main() { // 使用sync.Map var m sync.Map var wg sync.WaitGroup // 写入数据 for i : 0; i 100; i { wg.Add(1) go func(i int) { defer wg.Done() m.Store(i, i*2) }(i) } wg.Wait() // 读取数据 m.Range(func(key, value interface{}) bool { fmt.Printf(%v: %v\n, key, value) return true }) }5. 并发模式的最佳实践5.1 设计原则优先使用channel进行通信避免共享状态使用适当的同步原语合理控制goroutine数量5.2 常见问题死锁竞态条件内存泄漏过度并发5.3 调试技巧使用race detector日志记录性能分析代码审查6. 实战案例6.1 并发爬虫package main import ( fmt net/http sync golang.org/x/net/html ) func main() { // 起始URL startURL : https://example.com // 用于存储已访问的URL var visited make(map[string]bool) var mu sync.Mutex // 通道用于传递URL urls : make(chan string) // 启动工作协程 var wg sync.WaitGroup for i : 0; i 5; i { wg.Add(1) go func() { defer wg.Done() for url : range urls { crawl(url, urls, visited, mu) } }() } // 发送起始URL urls - startURL // 等待爬取完成 wg.Wait() close(urls) fmt.Printf(Crawled %d URLs\n, len(visited)) } func crawl(url string, urls chan- string, visited *map[string]bool, mu *sync.Mutex) { // 检查是否已访问 mu.Lock() if (*visited)[url] { mu.Unlock() return } (*visited)[url] true mu.Unlock() fmt.Printf(Crawling %s\n, url) // 获取页面内容 resp, err : http.Get(url) if err ! nil { fmt.Printf(Error fetching %s: %v\n, url, err) return } defer resp.Body.Close() // 解析HTML if resp.Header.Get(Content-Type) ! text/html { return } doc, err : html.Parse(resp.Body) if err ! nil { fmt.Printf(Error parsing %s: %v\n, url, err) return } // 提取链接 extractLinks(doc, url, urls) } func extractLinks(n *html.Node, baseURL string, urls chan- string) { if n.Type html.ElementNode n.Data a { for _, attr : range n.Attr { if attr.Key href { // 处理相对URL // 这里简化处理实际需要使用url包解析 urls - attr.Val break } } } for c : n.FirstChild; c ! nil; c c.NextSibling { extractLinks(c, baseURL, urls) } }6.2 并发文件处理package main import ( fmt io/ioutil os path/filepath sync ) func main() { // 目录路径 dir : ./files // 读取目录中的所有文件 files, err : ioutil.ReadDir(dir) if err ! nil { fmt.Println(Error reading directory:, err) return } // 通道用于传递文件路径 filePaths : make(chan string) // 启动工作协程 var wg sync.WaitGroup for i : 0; i 3; i { wg.Add(1) go func() { defer wg.Done() for path : range filePaths { processFile(path) } }() } // 发送文件路径 for _, file : range files { if !file.IsDir() { filePaths - filepath.Join(dir, file.Name()) } } close(filePaths) wg.Wait() fmt.Println(All files processed) } func processFile(path string) { fmt.Printf(Processing %s\n, path) // 读取文件内容 data, err : ioutil.ReadFile(path) if err ! nil { fmt.Printf(Error reading file %s: %v\n, path, err) return } // 处理文件内容 // 这里简化处理实际可以根据需求进行处理 fmt.Printf(File %s has %d bytes\n, path, len(data)) }7. 性能优化7.1 并发性能考虑goroutine的开销channel的性能锁的竞争内存分配7.2 优化技巧减少goroutine的创建使用缓冲通道避免锁竞争合理使用原子操作8. 总结Go语言的并发模式是其最强大的特性之一通过goroutine和channel我们可以轻松实现高效的并发程序。本文介绍了Go语言中常见的并发模式包括工作池模式、扇入扇出模式、管道模式、Context模式、ErrGroup模式等以及并发安全的数据结构和最佳实践。在实际开发中我们应该根据具体的应用场景选择合适的并发模式并且注意避免常见的并发问题如死锁、竞态条件、内存泄漏等。通过合理的并发设计我们可以充分利用多核处理器的优势提高程序的性能和响应速度。希望本文对你理解和应用Go语言的并发模式有所帮助祝你在Go语言的道路上越走越远

更多文章