并发从程序设计的角度,就是希望通过某些机制让计算机可以在一个时间段内,执行多个任务。让一个或多个物理 CPU 在多个程序之间多路复用,提高对计算机资源的利用率。
Go 语言通过编译器运行时(runtime),从语言上支持了并发的特性。Go 语言的并发通过 goroutine 特性完成。goroutine 类似于线程,但是可以根据需要创建多个 goroutine 并发工作**。goroutine 是由 Go 语言的运行时调度完成,而线程是由操作系统调度完成。**
并发模型一般分为:多进程编程、多线程编程、非阻塞/异步 IO 编程以及基于协程的编程。
Go调度器基于GMP模型工作
Go的并发哲学基于CSP(Communicating Sequential Processes,通信顺序进程)模型。
CSP模型的核心思想是:不要通过共享内存来通信,而要通过通信来共享内存
这一原则鼓励通过显式的消息传递(channel)而非共享状态来协调并发执行,从而减少锁和同步原语的使用。
模式 | 特点 | 典型语言/库 |
---|---|---|
多线程共享内存 | 使用锁保护共享状态 | Java, C++ |
Actor模型 | 独立actors通过消息通信 | Erlang, Akka |
CSP模型 | 通过通道协调并发进程 | Go |
goroutine 是协程的 Go 语言实现,它是语言原生支持的,相对于一般由库实现协程的方式,goroutine 更加强大,它的调度一定程度上是由 go 运行时(runtime)管理。
其好处之一是,当某 goroutine 发生阻塞时(例如同步IO操作等),会自动出让 CPU 给其它 goroutine。
goroutine是非常轻量级的,它就是一段代码,一个函数入口,以及在堆上为其分配的一个堆栈(初始大小为4K,会随着程序的执行自动增长删除)。所以它非常廉价,我们可以很轻松的创建上万个 goroutine。
Go 程序从 main 包的 main()
函数开始,在程序启动时,Go 程序就会为 main()
函数创建一个默认的 goroutine。
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
// 启动一个goroutine
go sayHello()
// 防止主goroutine退出太快
time.Sleep(100 * time.Millisecond)
fmt.Println("Main function finished")
}
go sayHello()
启动了一个新的goroutine执行sayHello
函数time.Sleep
让主goroutine等待Go运行时会在以下情况触发goroutine调度:
runtime.Gosched()
主动让出CPUsync
包中的互斥锁时如下,则是主动让出 CPU
package main
import (
"fmt"
"runtime"
)
func main() {
go func() {
for i := 0; i < 3; i++ {
fmt.Println("Goroutine:", i)
}
}()
for i := 0; i < 3; i++ {
// 让出CPU,给其他goroutine执行的机会
runtime.Gosched()
fmt.Println("Main:", i)
}
}
channel是Go语言提供的一种数据结构,它像一个管道,可以在不同的goroutine之间安全地传递数据。channel有以下特点:
从本质上讲,channel是一个数据结构,内部包含一个缓冲区、一个互斥锁以及两个等待队列(发送者队列和接收者队列)。
在并发编程中,我们通常需要解决以下问题:
传统的共享内存并发模型通常使用锁来解决这些问题,但锁机制容易导致复杂性增加、死锁和性能问题。channel提供了一种更简洁、更符合Go语言哲学的方案。
定义一个 channel 时,也需要定义发送到 channel 的值的类型,注意,chan 类型的空值是 nil,必须使用 make 创建 channel
// 声明,chanName:通道变量名称,chanType:通道内的数据类型
var chanName chan chanType
// 声明并定义
var chanName chan chanType := make(chan chanType)
// 声明并定义,使用自动类型推导
chanName := make(chan chanType)
// 例子
var c chan int = make(chan int)
c := make(chan int)
c := make(chan string)
关闭 channel 非常简单,直接使用Go语言内置的 close()
函数即可
close(chName)
关闭channel时应遵循以下原则:
通道创建后,就可以使用通道进行发送和接收操作。
通道的发送使用特殊的操作符<-
,将数据通过通道发送的格式为:
chanName <- value
把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,例如,现在创建一个chan,但是没有接收方:
package main
func main() {
ch := make(chan string)
ch <- "hello"
}
/*
fatal error: all goroutines are asleep - deadlock!
*/
运行时发现所有的 goroutine(包括main)都处于等待 goroutine。也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码。
阻塞模式接收数据时,将接收变量作为<-
操作符的左值,执行该语句时将会阻塞,直到接收到数据并赋值给 value变量。
value := <-chanName
阻塞接收数据后,忽略从通道返回的数据,执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。
<-chanName
使用非阻塞方式从通道接收数据时,语句不会发生阻塞,非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。
value, ok := <-chanName
/*
value:表示接收到的数据。未接收到数据时,value 为通道类型的零值。
ok:表示是否接收到数据,false表示通道已经关闭且为空
*/
通道的数据接收可以借用 for range 语句进行多个元素的接收操作,使用后所在的goroutine将会阻塞循环接收
ch := make(chan int, 3)
go func() {
ch <- 1
ch <- 2
ch <- 3
close(ch) // 必须关闭,否则下面的range循环会死锁
}()
for v := range ch {
fmt.Println(v)
}
所谓的单向 channel 概念,其实只是对 channel 的一种使用限制,比如限制一个通道在这个函数中的读写,因此,单向通道有利于代码接口的严谨性。
// 只写的通道
var chanName chan<- chanType
// 只读的通道
var chanName <-chan chanType
类型转换关系
func send(ch chan<- int) {
ch <- 42 // 只能发送
// <-ch // 编译错误:不能从只发送channel接收
}
func receive(ch <-chan int) {
v := <-ch // 只能接收
// ch <- 42 // 编译错误:不能向只接收channel发送
}
func main() {
ch := make(chan int) // 双向channel
go send(ch) // 可以将双向channel传给只发送channel参数
go receive(ch) // 可以将双向channel传给只接收channel参数
}
Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。
如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
chanName := make(chan chanType)
chanName := make(chan chanType,0) // 显式指定缓冲大小为0
Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同**:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。**
在无缓冲通道的基础上,为通道增加一个有限大小的存储空间形成带缓冲通道。带缓冲通道在发送时无需等待接收方接收即可完成发送过程,并且不会发生阻塞,只有当存储空间满时才会发生阻塞。同理,如果缓冲通道中有数据,接收时将不会发生阻塞,直到通道中没有数据可读时,通道将会再度阻塞。
chanName := make(chan chanType,size)
/*
chanName:通道实例变量名
chanType:通道所内数据类型
size:通道缓冲区大小
*/
select {
case <-ch1:
// 如果从ch1成功接收数据,则执行此分支
case ch2 <- value:
// 如果成功向ch2发送数据,则执行此分支
case x := <-ch3:
// 如果从ch3成功接收数据,则执行此分支,并将接收的值赋给x
default:
// 如果上面的case都没有准备好,则执行此分支(可选)
}
select{}
)会永远阻塞使用channel发送信号通知其他goroutine某个事件已经发生
func worker(done chan struct{}) {
fmt.Println("工作开始...")
time.Sleep(3 * time.Second)
fmt.Println("工作完成")
done <- struct{}{} // 发送完成信号
}
func main() {
done := make(chan struct{})
go worker(done)
<-done // 等待工作完成
fmt.Println("收到完成信号,主程序继续执行")
}
结合select
和time.After
实现超时控制:
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "操作完成"
}()
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(1 * time.Second):
fmt.Println("操作超时")
}
}
使用带缓冲的channel实现工作池,限制并发数量
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("worker %d started job %d\n", id, job)
time.Sleep(time.Second) // 模拟工作耗时
fmt.Printf("worker %d finished job %d\n", id, job)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动3个worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送9个任务
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for a := 1; a <= 9; a++ {
<-results
}
}
使用channel组合多个处理阶段,形成数据流水线:
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// 组合成处理流水线: generator -> square -> consumer(打印)
for n := range square(generator(1, 2, 3, 4, 5)) {
fmt.Println(n)
}
}
上面的goroutine和channel。这些机制主要基于CSP(通信顺序进程)模型,强调"通过通信来共享内存"。然而,在某些情况下,我们需要直接控制对共享资源的访问。这时,Go语言标准库中的sync
包就派上用场了。
Mutex提供了一种互斥机制,确保同一时间只有一个goroutine可以访问共享资源。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mutex sync.Mutex
counter := 0
// 并发更新counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mutex.Lock() // 加锁
defer mutex.Unlock() // 解锁
counter++
}()
}
wg.Wait()
fmt.Println("计数器最终值:", counter) // 输出: 计数器最终值: 1000
}
Lock()
:获取锁。如果锁已被其他goroutine获取,则阻塞直到锁可用Unlock()
:释放锁。应在与Lock()相同的goroutine中调用TryLock()
:(Go 1.18+)尝试获取锁,如果锁不可用则立即返回false而不阻塞defer mutex.Unlock()
确保锁被释放RWMutex允许多个读操作并发执行,但写操作是互斥的。当有写锁时,所有的读操作都会被阻塞。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var rwMutex sync.RWMutex
data := make(map[string]string)
// 写入操作
go func() {
for i := 0; i < 10; i++ {
rwMutex.Lock() // 写锁
key := fmt.Sprintf("key%d", i)
data[key] = fmt.Sprintf("value%d", i)
time.Sleep(100 * time.Millisecond) // 模拟写入耗时
rwMutex.Unlock()
time.Sleep(200 * time.Millisecond) // 给读操作时间
}
}()
// 多个并发读取操作
var wg sync.WaitGroup
for r := 0; r < 5; r++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for i := 0; i < 10; i++ {
rwMutex.RLock() // 读锁
for k, v := range data {
fmt.Printf("读取者 %d: %s = %s\n", id, k, v)
}
rwMutex.RUnlock()
time.Sleep(150 * time.Millisecond)
}
}(r)
}
wg.Wait()
}
Lock()/Unlock()
:获取/释放写锁。与Mutex相同,写操作是互斥的RLock()/RUnlock()
:获取/释放读锁。多个goroutine可以同时持有读锁TryLock()/TryRLock()
:(Go 1.18+)尝试获取写锁/读锁,非阻塞当共享资源的读操作远多于写操作时,RWMutex比Mutex更有效。例如:
WaitGroup用于等待一组goroutine完成执行。它提供了一种简单的方式来协调多个并发操作的完成。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 工作完成时通知WaitGroup
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
// 启动5个worker
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加计数器
go worker(i, &wg)
}
// 等待所有worker完成
wg.Wait()
fmt.Println("All workers completed")
}
Add(delta int)
:增加WaitGroup的计数器值Done()
:减少WaitGroup的计数器值,相当于Add(-1)
Wait()
:阻塞直到计数器变为0Add()
Done()
被调用Once用于确保某个函数只被执行一次,即使在多个goroutine中并发调用也是如此。它通常用于单例模式、延迟初始化或执行只需要一次的设置操作。
在下面例子中,尽管有10个goroutine尝试执行初始化函数,但once.Do()
确保只有一个goroutine能够执行它。
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
done := make(chan bool)
// 尝试在多个goroutine中执行初始化
for i := 0; i < 10; i++ {
go func(id int) {
fmt.Printf("Goroutine %d trying to initialize\n", id)
once.Do(func() {
fmt.Printf("Initialization done by goroutine %d\n", id)
})
done <- true
}(i)
}
// 等待所有goroutine完成
for i := 0; i < 10; i++ {
<-done
}
}
Do()
方法完成调用,对同一个Once实例的后续Do()
调用将不会执行提供的函数Do()
调用的函数中发生panic,Once将认为操作已完成Cond实现了一个条件变量,它是等待或宣布事件发生的goroutine的会合点。它允许goroutine等待某个条件成立,然后在条件成立时得到通知。
条件变量总是与互斥锁关联,并通过锁来保护条件的检查和更新。Cond提供了三个主要方法:
Wait()
:释放关联的锁,等待通知,被唤醒后重新获取锁Signal()
:唤醒一个等待的goroutineBroadcast()
:唤醒所有等待的goroutinepackage main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := false
// 消费者goroutine
for i := 0; i < 3; i++ {
go func(id int) {
mu.Lock()
defer mu.Unlock()
fmt.Printf("Consumer %d is waiting...\n", id)
for !ready { // 使用循环检查条件
cond.Wait() // 等待信号
}
fmt.Printf("Consumer %d received signal\n", id)
}(i)
}
// 给消费者一点时间启动
time.Sleep(time.Second)
// 生产者goroutine
go func() {
mu.Lock()
defer mu.Unlock()
fmt.Println("Producer is ready")
ready = true
fmt.Println("Producer broadcasts signal")
cond.Broadcast() // 通知所有等待的goroutine
}()
// 等待足够时间让所有goroutine完成
time.Sleep(3 * time.Second)
}
在上面的例子中:
ready
条件变为trueWait()
等待Broadcast()
通知所有等待的消费者Signal()
:唤醒单个等待者,适用于任务队列等场景,只需一个工作者处理Broadcast()
:唤醒所有等待者,适用于状态变化等场景,需要所有人都知道sync.Pool
提供了一个可以重复使用临时对象的池,有助于减少垃圾回收压力,特别是在高并发环境下。
Pool的主要特性:
Pool提供了两个主要方法:
Get() interface{}
:从池中获取对象。如果池为空,则调用New函数创建一个新对象Put(x interface{})
:将对象放回池中以供后续重用package main
import (
"bytes"
"fmt"
"sync"
)
func main() {
// 创建一个池,用于复用bytes.Buffer
var bufferPool = sync.Pool{
New: func() interface{} {
fmt.Println("Creating a new buffer")
return new(bytes.Buffer)
},
}
// 获取一个Buffer
buffer1 := bufferPool.Get().(*bytes.Buffer)
buffer1.WriteString("Hello")
fmt.Println("Buffer1:", buffer1.String())
// 清空并放回池中
buffer1.Reset()
bufferPool.Put(buffer1)
// 获取一个Buffer(可能是刚才放回的那个)
buffer2 := bufferPool.Get().(*bytes.Buffer)
buffer2.WriteString("World")
fmt.Println("Buffer2:", buffer2.String())
// 清空并放回池中
buffer2.Reset()
bufferPool.Put(buffer2)
// 同时获取多个Buffer
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 获取Buffer
buf := bufferPool.Get().(*bytes.Buffer)
// 使用Buffer
buf.WriteString(fmt.Sprintf("Goroutine %d", id))
fmt.Printf("Goroutine %d: %s\n", id, buf.String())
// 清空并放回
buf.Reset()
bufferPool.Put(buf)
}(i)
}
wg.Wait()
}
并发模式是在并发环境中解决特定问题的常见结构和方法。就像设计模式帮助我们组织代码一样,并发模式帮助我们组织并发逻辑,使其更易于理解、测试和维护。
好的并发模式应该具备以下特点:
掌握并发模式有以下好处:
生产者-消费者是最基本也是最常用的并发模式之一。它将"生产数据"和"消费数据"的过程解耦,通过channel在两者之间传递数据。
package main
import (
"fmt"
"sync"
"time"
)
func producer(jobs chan<- int) {
defer close(jobs)
for i := 1; i <= 5; i++ {
fmt.Printf("生产任务: %d\n", i)
jobs <- i
time.Sleep(time.Millisecond * 500)
}
}
func consumer(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("消费者 %d 处理任务: %d\n", id, job)
time.Sleep(time.Second) // 模拟处理时间
}
}
func main() {
jobs := make(chan int, 3)
var wg sync.WaitGroup
// 启动一个生产者
go producer(jobs)
// 启动两个消费者
for i := 1; i <= 2; i++ {
wg.Add(1)
go consumer(i, jobs, &wg)
}
wg.Wait()
fmt.Println("所有工作完成")
}
这个例子展示了生产者-消费者模式的基本用法:
现实应用中,我们通常需要处理错误:
package main
import (
"errors"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Err error
}
type Result struct {
JobID int
Value string
Err error
}
func producer(jobs chan<- Job) {
defer close(jobs)
for i := 1; i <= 5; i++ {
var err error
if i == 3 {
err = errors.New("模拟生产错误")
}
jobs <- Job{ID: i, Err: err}
time.Sleep(time.Millisecond * 500)
}
}
func consumer(jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
if job.Err != nil {
results <- Result{JobID: job.ID, Err: job.Err}
continue
}
// 模拟处理
time.Sleep(time.Second)
var err error
if job.ID == 4 {
err = errors.New("模拟处理错误")
}
result := Result{
JobID: job.ID,
Value: fmt.Sprintf("处理结果 %d", job.ID),
Err: err,
}
results <- result
}
}
func main() {
jobs := make(chan Job, 5)
results := make(chan Result, 5)
var wg sync.WaitGroup
// 启动生产者
go producer(jobs)
// 启动消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(jobs, results, &wg)
}
// 收集结果
go func() {
wg.Wait()
close(results)
}()
// 处理结果
for result := range results {
if result.Err != nil {
fmt.Printf("任务 %d 出错: %v\n", result.JobID, result.Err)
continue
}
fmt.Printf("任务 %d 完成: %s\n", result.JobID, result.Value)
}
}
工作池模式是生产者-消费者模式的扩展,它维护一组工作者(goroutine)来处理一系列任务。与简单的生产者-消费者模式相比,工作池模式更强调工作者的管理和任务的分发。
基本工作池的特点:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("工作者 %d 开始处理任务 %d\n", id, job)
time.Sleep(time.Second) // 模拟工作耗时
fmt.Printf("工作者 %d 完成任务 %d\n", id, job)
results <- job * 2
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 启动工作者
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 等待所有工作者完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("结果: %d\n", result)
}
}
在实际应用中,我们可能需要限制并发数量或请求速率:
限流工作池的关键功能:
package main
import (
"fmt"
"sync"
"time"
)
type RateLimiter struct {
interval time.Duration
ticker *time.Ticker
stopCh chan struct{}
}
func NewRateLimiter(rps int) *RateLimiter {
interval := time.Second / time.Duration(rps)
return &RateLimiter{
interval: interval,
ticker: time.NewTicker(interval),
stopCh: make(chan struct{}),
}
}
func (rl *RateLimiter) Allow() bool {
select {
case <-rl.ticker.C:
return true
case <-rl.stopCh:
return false
default:
return false
}
}
func (rl *RateLimiter) Stop() {
rl.ticker.Stop()
close(rl.stopCh)
}
func processRequest(id int, limiter *RateLimiter, wg *sync.WaitGroup) {
defer wg.Done()
start := time.Now()
// 等待限流器允许
for !limiter.Allow() {
time.Sleep(time.Millisecond * 10)
}
// 模拟处理请求
time.Sleep(time.Millisecond * 50)
fmt.Printf("请求 %d 处理完成,等待时间: %v\n", id, time.Since(start))
}
func main() {
// 每秒5个请求
limiter := NewRateLimiter(5)
defer limiter.Stop()
var wg sync.WaitGroup
// 模拟20个并发请求
for i := 1; i <= 20; i++ {
wg.Add(1)
go processRequest(i, limiter, &wg)
}
wg.Wait()
fmt.Println("所有请求处理完成")
}
管道模式将数据处理分成多个阶段,每个阶段通过channel连接起来。这种模式特别适合处理数据流,每个阶段都可以并发执行。
基本管道模式的特点:
package main
import (
"fmt"
)
// 生成器:生成整数
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 平方:计算输入值的平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// 过滤:只保留奇数
func onlyOdd(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 != 0 {
out <- n
}
}
}()
return out
}
func main() {
// 构建管道
nums := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squares := square(nums)
odds := onlyOdd(squares)
// 消费结果
for odd := range odds {
fmt.Println(odd)
}
}
实际应用中,我们需要处理每个管道阶段可能出现的错误:
带错误处理的管道特点:
package main
import (
"errors"
"fmt"
)
type Result struct {
Value int
Err error
}
func generator(nums ...int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for _, n := range nums {
if n < 0 {
out <- Result{Err: errors.New("负数不被允许")}
return
}
out <- Result{Value: n}
}
}()
return out
}
func square(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for res := range in {
if res.Err != nil {
out <- res // 传递错误
continue
}
out <- Result{Value: res.Value * res.Value}
}
}()
return out
}
func main() {
// 构建管道
values := generator(1, 2, 3, -4, 5)
squares := square(values)
// 消费结果
for res := range squares {
if res.Err != nil {
fmt.Printf("错误: %v\n", res.Err)
continue
}
fmt.Println(res.Value)
}
}
管道模式的主要优势:
实际应用场景:
扇出是将任务分配给多个worker并行处理,扇入是将多个结果汇总到一个channel。这种模式适合处理可以并行的任务,然后需要汇总结果的场景。
扇入扇出模式的关键组件:
package main
import (
"fmt"
"sync"
)
// 扇出:将一个输入拆分为多个并行处理
func fanOut(in <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
outputs[i] = processWorker(in, i)
}
return outputs
}
// 扇入:将多个输入合并到一个输出
func fanIn(inputs []<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入启动一个goroutine
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for val := range ch {
out <- val
}
}(input)
}
// 当所有输入处理完毕后关闭输出
go func() {
wg.Wait()
close(out)
}()
return out
}
// 工作者:处理输入并产生输出
func processWorker(in <-chan int, workerID int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
// 模拟处理:计算平方
result := val * val
fmt.Printf("工作者 %d 处理 %d -> %d\n", workerID, val, result)
out <- result
}
}()
return out
}
func main() {
// 创建输入
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
}
}()
// 扇出:分配给3个工作者
workers := fanOut(input, 3)
// 扇入:汇总所有结果
results := fanIn(workers)
// 处理最终结果
var sum int
for res := range results {
sum += res
}
fmt.Printf("所有结果的总和: %d\n", sum)
}
扇入扇出模式在以下场景特别有用:
在并发程序中,我们常常需要处理超时和取消操作。Go的context包和select语句提供了优雅的方式来实现这些功能。
Context的核心功能:
context.WithTimeout
和context.WithDeadline
context.WithCancel
context.WithValue
Done()
channel通知取消事件package main
import (
"context"
"fmt"
"sync"
"time"
)
// 一个可能耗时的操作
func slowOperation(ctx context.Context) (string, error) {
select {
case <-time.After(2 * time.Second):
return "操作完成", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// 使用超时控制
func withTimeout() {
// 创建一个带1秒超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() // 确保取消函数被调用
result, err := slowOperation(ctx)
if err != nil {
fmt.Printf("超时示例出错: %v\n", err)
return
}
fmt.Printf("超时示例结果: %s\n", result)
}
// 使用手动取消
func withCancellation() {
ctx, cancel := context.WithCancel(context.Background())
// 启动一个goroutine在1秒后取消
time.AfterFunc(1*time.Second, cancel)
result, err := slowOperation(ctx)
if err != nil {
fmt.Printf("取消示例出错: %v\n", err)
return
}
fmt.Printf("取消示例结果: %s\n", result)
}
// 使用上下文控制多个goroutine
func workerWithContext(ctx context.Context, id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("工作者 %d 启动\n", id)
select {
case <-time.After(3 * time.Second):
fmt.Printf("工作者 %d 完成工作\n", id)
case <-ctx.Done():
fmt.Printf("工作者 %d 接收到取消信号: %v\n", id, ctx.Err())
}
}
func multipleWorkers() {
// 创建一个2秒后自动取消的上下文
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
var wg sync.WaitGroup
// 启动多个工作者
for i := 1; i <= 3; i++ {
wg.Add(1)
go workerWithContext(ctx, i, &wg)
}
// 等待工作者完成或超时
wg.Wait()
fmt.Println("所有工作者已退出")
}
func main() {
fmt.Println("-- 超时示例 --")
withTimeout()
fmt.Println("\n-- 取消示例 --")
withCancellation()
fmt.Println("\n-- 多工作者示例 --")
multipleWorkers()
}
在网络请求等不稳定操作中,重试是常见需求:
重试模式的关键点:
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
)
// 模拟可能失败的操作
func doOperation(ctx context.Context) (string, error) {
// 模拟随机失败
if rand.Float32() < 0.7 {
return "", errors.New("操作失败")
}
select {
case <-time.After(500 * time.Millisecond):
return "操作成功", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// 带重试的操作
func doOperationWithRetry(maxRetries int, timeout time.Duration) (string, error) {
var lastErr error
for retry := 0; retry < maxRetries; retry++ {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
result, err := doOperation(ctx)
cancel() // 及时释放资源
if err == nil {
return result, nil
}
lastErr = err
fmt.Printf("尝试 %d 失败: %v,重试中...\n", retry+1, err)
time.Sleep(time.Millisecond * 200 * time.Duration(retry+1)) // 退避策略
}
return "", fmt.Errorf("在 %d 次尝试后失败: %w", maxRetries, lastErr)
}
func main() {
rand.Seed(time.Now().UnixNano())
result, err := doOperationWithRetry(5, 800*time.Millisecond)
if err != nil {
fmt.Printf("最终错误: %v\n", err)
return
}
fmt.Printf("最终结果: %s\n", result)
}
这些模式在以下场景尤为重要:
有时我们需要精确控制并发数量或并发操作。这在资源受限的环境中尤为重要。
使用带缓冲的channel实现信号量:
信号量模式的特点:
package main
import (
"fmt"
"sync"
"time"
)
// 信号量实现
type Semaphore struct {
tokens chan struct{}
}
func NewSemaphore(limit int) *Semaphore {
return &Semaphore{
tokens: make(chan struct{}, limit),
}
}
func (s *Semaphore) Acquire() {
s.tokens <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.tokens
}
// 并发执行但限制最大并发数
func processWithLimit(items []int, concurrency int) {
sem := NewSemaphore(concurrency)
var wg sync.WaitGroup
for i, item := range items {
wg.Add(1)
// 获取令牌
sem.Acquire()
go func(id, val int) {
defer wg.Done()
defer sem.Release() // 释放令牌
// 模拟处理
fmt.Printf("处理项目 %d: %d 开始\n", id, val)
time.Sleep(time.Second)
fmt.Printf("处理项目 %d: %d 完成\n", id, val)
}(i, item)
}
wg.Wait()
}
func main() {
items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
fmt.Println("限制最大3个并发:")
processWithLimit(items, 3)
}
有些应用需要在某些阶段并发,而在另一些阶段串行执行:
这种模式适用于:
package main
import (
"fmt"
"sync"
"time"
)
// 串行阶段
func prepare(data []int) []int {
fmt.Println("准备阶段(串行)开始")
result := make([]int, len(data))
for i, val := range data {
result[i] = val * 2
time.Sleep(100 * time.Millisecond) // 模拟处理
}
fmt.Println("准备阶段(串行)完成")
return result
}
// 并行阶段
func process(data []int) []int {
fmt.Println("处理阶段(并行)开始")
result := make([]int, len(data))
var wg sync.WaitGroup
for i, val := range data {
wg.Add(1)
go func(idx, value int) {
defer wg.Done()
// 模拟复杂处理
time.Sleep(500 * time.Millisecond)
result[idx] = value * value
fmt.Printf("处理项目 %d 完成\n", idx)
}(i, val)
}
wg.Wait()
fmt.Println("处理阶段(并行)完成")
return result
}
// 串行阶段
func finalize(data []int) int {
fmt.Println("最终阶段(串行)开始")
sum := 0
for _, val := range data {
sum += val
time.Sleep(100 * time.Millisecond) // 模拟处理
}
fmt.Println("最终阶段(串行)完成")
return sum
}
func main() {
input := []int{1, 2, 3, 4, 5}
// 串行-并行-串行处理流程
prepared := prepare(input) // 串行
processed := process(prepared) // 并行
result := finalize(processed) // 串行
fmt.Printf("最终结果: %d\n", result)
}