5、并发编程

并发编程概念

并发从程序设计的角度,就是希望通过某些机制让计算机可以在一个时间段内,执行多个任务。让一个或多个物理 CPU 在多个程序之间多路复用,提高对计算机资源的利用率。

Go 语言通过编译器运行时(runtime),从语言上支持了并发的特性。Go 语言的并发通过 goroutine 特性完成。goroutine 类似于线程,但是可以根据需要创建多个 goroutine 并发工作**。goroutine 是由 Go 语言的运行时调度完成,而线程是由操作系统调度完成。**

并发编程实现模型

并发模型一般分为:多进程编程、多线程编程、非阻塞/异步 IO 编程以及基于协程的编程。

常见概念

goroutine与线程的区别

GMP模型

Go调度器基于GMP模型工作

image-20250515175138342

CSP 模型

Go的并发哲学基于CSP(Communicating Sequential Processes,通信顺序进程)模型。

CSP原则

CSP模型的核心思想是:不要通过共享内存来通信,而要通过通信来共享内存

这一原则鼓励通过显式的消息传递(channel)而非共享状态来协调并发执行,从而减少锁和同步原语的使用。

并发模式对比

模式 特点 典型语言/库
多线程共享内存 使用锁保护共享状态 Java, C++
Actor模型 独立actors通过消息通信 Erlang, Akka
CSP模型 通过通道协调并发进程 Go

goroutine

goroutine 是协程的 Go 语言实现,它是语言原生支持的,相对于一般由库实现协程的方式,goroutine 更加强大,它的调度一定程度上是由 go 运行时(runtime)管理。

其好处之一是,当某 goroutine 发生阻塞时(例如同步IO操作等),会自动出让 CPU 给其它 goroutine。

goroutine是非常轻量级的,它就是一段代码,一个函数入口,以及在堆上为其分配的一个堆栈(初始大小为4K,会随着程序的执行自动增长删除)。所以它非常廉价,我们可以很轻松的创建上万个 goroutine。

Go 程序从 main 包的 main() 函数开始,在程序启动时,Go 程序就会为 main() 函数创建一个默认的 goroutine。

启动与调度

启动

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello from goroutine!")
}

func main() {
    // 启动一个goroutine
    go sayHello()
    
    // 防止主goroutine退出太快
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Main function finished")
}

调度时机

Go运行时会在以下情况触发goroutine调度:

如下,则是主动让出 CPU

package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func() {
        for i := 0; i < 3; i++ {
            fmt.Println("Goroutine:", i)
        }
    }()
    
    for i := 0; i < 3; i++ {
        // 让出CPU,给其他goroutine执行的机会
        runtime.Gosched()
        fmt.Println("Main:", i)
    }
}

channel

channel是Go语言提供的一种数据结构,它像一个管道,可以在不同的goroutine之间安全地传递数据。channel有以下特点:

从本质上讲,channel是一个数据结构,内部包含一个缓冲区、一个互斥锁以及两个等待队列(发送者队列和接收者队列)。

为什么需要channel

在并发编程中,我们通常需要解决以下问题:

传统的共享内存并发模型通常使用锁来解决这些问题,但锁机制容易导致复杂性增加、死锁和性能问题。channel提供了一种更简洁、更符合Go语言哲学的方案。

声明和创建

定义一个 channel 时,也需要定义发送到 channel 的值的类型,注意,chan 类型的空值是 nil,必须使用 make 创建 channel

// 声明,chanName:通道变量名称,chanType:通道内的数据类型
var chanName chan chanType
// 声明并定义
var chanName chan chanType := make(chan chanType)
// 声明并定义,使用自动类型推导
chanName := make(chan chanType)

// 例子
var c chan int = make(chan int)
c := make(chan int)
c := make(chan string)

关闭通道

关闭 channel 非常简单,直接使用Go语言内置的 close() 函数即可

close(chName)

关闭channel时应遵循以下原则:

发送和接收

通道创建后,就可以使用通道进行发送和接收操作。

发送

通道的发送使用特殊的操作符<-,将数据通过通道发送的格式为:

chanName <- value

把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,例如,现在创建一个chan,但是没有接收方:

package main

func main() {
	ch := make(chan string)
	ch <- "hello"
}

/*
fatal error: all goroutines are asleep - deadlock!
*/

运行时发现所有的 goroutine(包括main)都处于等待 goroutine。也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码。

接收

阻塞接收数据

阻塞模式接收数据时,将接收变量作为<-操作符的左值,执行该语句时将会阻塞,直到接收到数据并赋值给 value变量。

value := <-chanName

阻塞接收数据后,忽略从通道返回的数据,执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。

<-chanName
非阻塞接收数据

使用非阻塞方式从通道接收数据时,语句不会发生阻塞,非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。

value, ok := <-chanName
/*
  value:表示接收到的数据。未接收到数据时,value 为通道类型的零值。
  ok:表示是否接收到数据,false表示通道已经关闭且为空
*/
循环接收

通道的数据接收可以借用 for range 语句进行多个元素的接收操作,使用后所在的goroutine将会阻塞循环接收

ch := make(chan int, 3)
go func() {
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch) // 必须关闭,否则下面的range循环会死锁
}()

for v := range ch {
    fmt.Println(v)
}

单向通道

所谓的单向 channel 概念,其实只是对 channel 的一种使用限制,比如限制一个通道在这个函数中的读写,因此,单向通道有利于代码接口的严谨性。

// 只写的通道
var chanName chan<- chanType
// 只读的通道
var chanName <-chan chanType

类型转换关系

func send(ch chan<- int) {
    ch <- 42 // 只能发送
    // <-ch  // 编译错误:不能从只发送channel接收
}

func receive(ch <-chan int) {
    v := <-ch // 只能接收
    // ch <- 42 // 编译错误:不能向只接收channel发送
}

func main() {
    ch := make(chan int) // 双向channel
    go send(ch)    // 可以将双向channel传给只发送channel参数
    go receive(ch) // 可以将双向channel传给只接收channel参数
}

无缓冲与缓冲channel

无缓冲通道

Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。

如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

创建无缓冲的通道
chanName := make(chan chanType)
chanName := make(chan chanType,0) // 显式指定缓冲大小为0

缓冲通道

Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞

这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同**:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。**

在无缓冲通道的基础上,为通道增加一个有限大小的存储空间形成带缓冲通道。带缓冲通道在发送时无需等待接收方接收即可完成发送过程,并且不会发生阻塞,只有当存储空间满时才会发生阻塞。同理,如果缓冲通道中有数据,接收时将不会发生阻塞,直到通道中没有数据可读时,通道将会再度阻塞。

创建带缓冲的通道
chanName := make(chan chanType,size)
/*
  chanName:通道实例变量名
  chanType:通道所内数据类型
  size:通道缓冲区大小
*/

select 语句

工作原理

语法

select {
case <-ch1:
    // 如果从ch1成功接收数据,则执行此分支
case ch2 <- value:
    // 如果成功向ch2发送数据,则执行此分支
case x := <-ch3:
    // 如果从ch3成功接收数据,则执行此分支,并将接收的值赋给x
default:
    // 如果上面的case都没有准备好,则执行此分支(可选)
}

特性与规则

channel 应用场景

信号通知

使用channel发送信号通知其他goroutine某个事件已经发生

func worker(done chan struct{}) {
    fmt.Println("工作开始...")
    time.Sleep(3 * time.Second)
    fmt.Println("工作完成")
    done <- struct{}{} // 发送完成信号
}

func main() {
    done := make(chan struct{})
    go worker(done)
    
    <-done // 等待工作完成
    fmt.Println("收到完成信号,主程序继续执行")
}

控制超时

结合selecttime.After实现超时控制:

func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(2 * time.Second)
        ch <- "操作完成"
    }()
    
    select {
    case result := <-ch:
        fmt.Println(result)
    case <-time.After(1 * time.Second):
        fmt.Println("操作超时")
    }
}

工作池模式

使用带缓冲的channel实现工作池,限制并发数量

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("worker %d started job %d\n", id, job)
        time.Sleep(time.Second) // 模拟工作耗时
        fmt.Printf("worker %d finished job %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 启动3个worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送9个任务
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for a := 1; a <= 9; a++ {
        <-results
    }
}

流水线模式

使用channel组合多个处理阶段,形成数据流水线:

func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // 组合成处理流水线: generator -> square -> consumer(打印)
    for n := range square(generator(1, 2, 3, 4, 5)) {
        fmt.Println(n)
    }
}

sync 包

上面的goroutine和channel。这些机制主要基于CSP(通信顺序进程)模型,强调"通过通信来共享内存"。然而,在某些情况下,我们需要直接控制对共享资源的访问。这时,Go语言标准库中的sync包就派上用场了。

Mutex(互斥锁)

Mutex提供了一种互斥机制,确保同一时间只有一个goroutine可以访问共享资源。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mutex sync.Mutex
    counter := 0
    
    // 并发更新counter
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mutex.Lock()         // 加锁
            defer mutex.Unlock() // 解锁
            counter++
        }()
    }
    
    wg.Wait()
    fmt.Println("计数器最终值:", counter) // 输出: 计数器最终值: 1000
}

常用方法

注意点

RWMutex(读写锁)

RWMutex允许多个读操作并发执行,但写操作是互斥的。当有写锁时,所有的读操作都会被阻塞。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var rwMutex sync.RWMutex
    data := make(map[string]string)
    
    // 写入操作
    go func() {
        for i := 0; i < 10; i++ {
            rwMutex.Lock() // 写锁
            key := fmt.Sprintf("key%d", i)
            data[key] = fmt.Sprintf("value%d", i)
            time.Sleep(100 * time.Millisecond) // 模拟写入耗时
            rwMutex.Unlock()
            time.Sleep(200 * time.Millisecond) // 给读操作时间
        }
    }()
    
    // 多个并发读取操作
    var wg sync.WaitGroup
    for r := 0; r < 5; r++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            for i := 0; i < 10; i++ {
                rwMutex.RLock() // 读锁
                for k, v := range data {
                    fmt.Printf("读取者 %d: %s = %s\n", id, k, v)
                }
                rwMutex.RUnlock()
                time.Sleep(150 * time.Millisecond)
            }
        }(r)
    }
    
    wg.Wait()
}

常用方法

使用场景

当共享资源的读操作远多于写操作时,RWMutex比Mutex更有效。例如:

WaitGroup(等待组)

WaitGroup用于等待一组goroutine完成执行。它提供了一种简单的方式来协调多个并发操作的完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 工作完成时通知WaitGroup
    
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // 模拟工作
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    // 启动5个worker
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 增加计数器
        go worker(i, &wg)
    }
    
    // 等待所有worker完成
    wg.Wait()
    fmt.Println("All workers completed")
}

常用方法

注意点

Once(只被执行一次)

Once用于确保某个函数只被执行一次,即使在多个goroutine中并发调用也是如此。它通常用于单例模式、延迟初始化或执行只需要一次的设置操作。

在下面例子中,尽管有10个goroutine尝试执行初始化函数,但once.Do()确保只有一个goroutine能够执行它。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    done := make(chan bool)
    
    // 尝试在多个goroutine中执行初始化
    for i := 0; i < 10; i++ {
        go func(id int) {
            fmt.Printf("Goroutine %d trying to initialize\n", id)
            once.Do(func() {
                fmt.Printf("Initialization done by goroutine %d\n", id)
            })
            done <- true
        }(i)
    }
    
    // 等待所有goroutine完成
    for i := 0; i < 10; i++ {
        <-done
    }
}

特性与限制

Cond(条件变量)

Cond实现了一个条件变量,它是等待或宣布事件发生的goroutine的会合点。它允许goroutine等待某个条件成立,然后在条件成立时得到通知。

基本概念

条件变量总是与互斥锁关联,并通过锁来保护条件的检查和更新。Cond提供了三个主要方法:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    ready := false
    
    // 消费者goroutine
    for i := 0; i < 3; i++ {
        go func(id int) {
            mu.Lock()
            defer mu.Unlock()
            
            fmt.Printf("Consumer %d is waiting...\n", id)
            for !ready { // 使用循环检查条件
                cond.Wait() // 等待信号
            }
            fmt.Printf("Consumer %d received signal\n", id)
        }(i)
    }
    
    // 给消费者一点时间启动
    time.Sleep(time.Second)
    
    // 生产者goroutine
    go func() {
        mu.Lock()
        defer mu.Unlock()
        
        fmt.Println("Producer is ready")
        ready = true
        
        fmt.Println("Producer broadcasts signal")
        cond.Broadcast() // 通知所有等待的goroutine
    }()
    
    // 等待足够时间让所有goroutine完成
    time.Sleep(3 * time.Second)
}

在上面的例子中:

使用模式

Pool

sync.Pool提供了一个可以重复使用临时对象的池,有助于减少垃圾回收压力,特别是在高并发环境下。

基本概念

Pool的主要特性:

常用方法

Pool提供了两个主要方法:

package main

import (
    "bytes"
    "fmt"
    "sync"
)

func main() {
    // 创建一个池,用于复用bytes.Buffer
    var bufferPool = sync.Pool{
        New: func() interface{} {
            fmt.Println("Creating a new buffer")
            return new(bytes.Buffer)
        },
    }
    
    // 获取一个Buffer
    buffer1 := bufferPool.Get().(*bytes.Buffer)
    buffer1.WriteString("Hello")
    fmt.Println("Buffer1:", buffer1.String())
    
    // 清空并放回池中
    buffer1.Reset()
    bufferPool.Put(buffer1)
    
    // 获取一个Buffer(可能是刚才放回的那个)
    buffer2 := bufferPool.Get().(*bytes.Buffer)
    buffer2.WriteString("World")
    fmt.Println("Buffer2:", buffer2.String())
    
    // 清空并放回池中
    buffer2.Reset()
    bufferPool.Put(buffer2)
    
    // 同时获取多个Buffer
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            // 获取Buffer
            buf := bufferPool.Get().(*bytes.Buffer)
            
            // 使用Buffer
            buf.WriteString(fmt.Sprintf("Goroutine %d", id))
            fmt.Printf("Goroutine %d: %s\n", id, buf.String())
            
            // 清空并放回
            buf.Reset()
            bufferPool.Put(buf)
        }(i)
    }
    
    wg.Wait()
}

并发模式

并发模式是在并发环境中解决特定问题的常见结构和方法。就像设计模式帮助我们组织代码一样,并发模式帮助我们组织并发逻辑,使其更易于理解、测试和维护。

好的并发模式应该具备以下特点:

掌握并发模式有以下好处:

生产者-消费者模式

生产者-消费者是最基本也是最常用的并发模式之一。它将"生产数据"和"消费数据"的过程解耦,通过channel在两者之间传递数据。

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(jobs chan<- int) {
    defer close(jobs)
    for i := 1; i <= 5; i++ {
        fmt.Printf("生产任务: %d\n", i)
        jobs <- i
        time.Sleep(time.Millisecond * 500)
    }
}

func consumer(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("消费者 %d 处理任务: %d\n", id, job)
        time.Sleep(time.Second) // 模拟处理时间
    }
}

func main() {
    jobs := make(chan int, 3)
    var wg sync.WaitGroup

    // 启动一个生产者
    go producer(jobs)

    // 启动两个消费者
    for i := 1; i <= 2; i++ {
        wg.Add(1)
        go consumer(i, jobs, &wg)
    }

    wg.Wait()
    fmt.Println("所有工作完成")
}

这个例子展示了生产者-消费者模式的基本用法:

带错误处理的生产者-消费者

现实应用中,我们通常需要处理错误:

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID  int
    Err error
}

type Result struct {
    JobID     int
    Value     string
    Err       error
}

func producer(jobs chan<- Job) {
    defer close(jobs)
    for i := 1; i <= 5; i++ {
        var err error
        if i == 3 {
            err = errors.New("模拟生产错误")
        }
        jobs <- Job{ID: i, Err: err}
        time.Sleep(time.Millisecond * 500)
    }
}

func consumer(jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        if job.Err != nil {
            results <- Result{JobID: job.ID, Err: job.Err}
            continue
        }

        // 模拟处理
        time.Sleep(time.Second)
        
        var err error
        if job.ID == 4 {
            err = errors.New("模拟处理错误")
        }
        
        result := Result{
            JobID: job.ID,
            Value: fmt.Sprintf("处理结果 %d", job.ID),
            Err:   err,
        }
        
        results <- result
    }
}

func main() {
    jobs := make(chan Job, 5)
    results := make(chan Result, 5)
    
    var wg sync.WaitGroup
    
    // 启动生产者
    go producer(jobs)
    
    // 启动消费者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go consumer(jobs, results, &wg)
    }
    
    // 收集结果
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 处理结果
    for result := range results {
        if result.Err != nil {
            fmt.Printf("任务 %d 出错: %v\n", result.JobID, result.Err)
            continue
        }
        fmt.Printf("任务 %d 完成: %s\n", result.JobID, result.Value)
    }
}

实际应用场景

工作池模式 (Worker Pool)

工作池模式是生产者-消费者模式的扩展,它维护一组工作者(goroutine)来处理一系列任务。与简单的生产者-消费者模式相比,工作池模式更强调工作者的管理和任务的分发。

基本工作池的特点:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("工作者 %d 开始处理任务 %d\n", id, job)
        time.Sleep(time.Second) // 模拟工作耗时
        fmt.Printf("工作者 %d 完成任务 %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动工作者
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }
    
    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 等待所有工作者完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("结果: %d\n", result)
    }
}

可限流的工作池

在实际应用中,我们可能需要限制并发数量或请求速率:

限流工作池的关键功能:

package main

import (
    "fmt"
    "sync"
    "time"
)

type RateLimiter struct {
    interval time.Duration
    ticker   *time.Ticker
    stopCh   chan struct{}
}

func NewRateLimiter(rps int) *RateLimiter {
    interval := time.Second / time.Duration(rps)
    return &RateLimiter{
        interval: interval,
        ticker:   time.NewTicker(interval),
        stopCh:   make(chan struct{}),
    }
}

func (rl *RateLimiter) Allow() bool {
    select {
    case <-rl.ticker.C:
        return true
    case <-rl.stopCh:
        return false
    default:
        return false
    }
}

func (rl *RateLimiter) Stop() {
    rl.ticker.Stop()
    close(rl.stopCh)
}

func processRequest(id int, limiter *RateLimiter, wg *sync.WaitGroup) {
    defer wg.Done()
    
    start := time.Now()
    
    // 等待限流器允许
    for !limiter.Allow() {
        time.Sleep(time.Millisecond * 10)
    }
    
    // 模拟处理请求
    time.Sleep(time.Millisecond * 50)
    
    fmt.Printf("请求 %d 处理完成,等待时间: %v\n", id, time.Since(start))
}

func main() {
    // 每秒5个请求
    limiter := NewRateLimiter(5)
    defer limiter.Stop()
    
    var wg sync.WaitGroup
    
    // 模拟20个并发请求
    for i := 1; i <= 20; i++ {
        wg.Add(1)
        go processRequest(i, limiter, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有请求处理完成")
}

实际应用场景

管道模式 (Pipeline)

管道模式将数据处理分成多个阶段,每个阶段通过channel连接起来。这种模式特别适合处理数据流,每个阶段都可以并发执行。

基本管道模式的特点:

package main

import (
    "fmt"
)

// 生成器:生成整数
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 平方:计算输入值的平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// 过滤:只保留奇数
func onlyOdd(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 != 0 {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    // 构建管道
    nums := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squares := square(nums)
    odds := onlyOdd(squares)
    
    // 消费结果
    for odd := range odds {
        fmt.Println(odd)
    }
}

带错误处理的管道

实际应用中,我们需要处理每个管道阶段可能出现的错误:

带错误处理的管道特点:

package main

import (
    "errors"
    "fmt"
)

type Result struct {
    Value int
    Err   error
}

func generator(nums ...int) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for _, n := range nums {
            if n < 0 {
                out <- Result{Err: errors.New("负数不被允许")}
                return
            }
            out <- Result{Value: n}
        }
    }()
    return out
}

func square(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for res := range in {
            if res.Err != nil {
                out <- res // 传递错误
                continue
            }
            out <- Result{Value: res.Value * res.Value}
        }
    }()
    return out
}

func main() {
    // 构建管道
    values := generator(1, 2, 3, -4, 5)
    squares := square(values)
    
    // 消费结果
    for res := range squares {
        if res.Err != nil {
            fmt.Printf("错误: %v\n", res.Err)
            continue
        }
        fmt.Println(res.Value)
    }
}

管道模式的优势与实际应用

管道模式的主要优势:

实际应用场景:

扇入扇出模式 (Fan-in/Fan-out)

扇出是将任务分配给多个worker并行处理,扇入是将多个结果汇总到一个channel。这种模式适合处理可以并行的任务,然后需要汇总结果的场景。

扇入扇出模式的关键组件:

package main

import (
    "fmt"
    "sync"
)

// 扇出:将一个输入拆分为多个并行处理
func fanOut(in <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs[i] = processWorker(in, i)
    }
    return outputs
}

// 扇入:将多个输入合并到一个输出
func fanIn(inputs []<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    // 为每个输入启动一个goroutine
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for val := range ch {
                out <- val
            }
        }(input)
    }
    
    // 当所有输入处理完毕后关闭输出
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// 工作者:处理输入并产生输出
func processWorker(in <-chan int, workerID int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for val := range in {
            // 模拟处理:计算平方
            result := val * val
            fmt.Printf("工作者 %d 处理 %d -> %d\n", workerID, val, result)
            out <- result
        }
    }()
    return out
}

func main() {
    // 创建输入
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()
    
    // 扇出:分配给3个工作者
    workers := fanOut(input, 3)
    
    // 扇入:汇总所有结果
    results := fanIn(workers)
    
    // 处理最终结果
    var sum int
    for res := range results {
        sum += res
    }
    
    fmt.Printf("所有结果的总和: %d\n", sum)
}

应用场景

扇入扇出模式在以下场景特别有用:

超时与取消模式

在并发程序中,我们常常需要处理超时和取消操作。Go的context包和select语句提供了优雅的方式来实现这些功能。

超时与上下文取消

Context的核心功能:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 一个可能耗时的操作
func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(2 * time.Second):
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// 使用超时控制
func withTimeout() {
    // 创建一个带1秒超时的上下文
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel() // 确保取消函数被调用
    
    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Printf("超时示例出错: %v\n", err)
        return
    }
    fmt.Printf("超时示例结果: %s\n", result)
}

// 使用手动取消
func withCancellation() {
    ctx, cancel := context.WithCancel(context.Background())
    
    // 启动一个goroutine在1秒后取消
    time.AfterFunc(1*time.Second, cancel)
    
    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Printf("取消示例出错: %v\n", err)
        return
    }
    fmt.Printf("取消示例结果: %s\n", result)
}

// 使用上下文控制多个goroutine
func workerWithContext(ctx context.Context, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("工作者 %d 启动\n", id)
    
    select {
    case <-time.After(3 * time.Second):
        fmt.Printf("工作者 %d 完成工作\n", id)
    case <-ctx.Done():
        fmt.Printf("工作者 %d 接收到取消信号: %v\n", id, ctx.Err())
    }
}

func multipleWorkers() {
    // 创建一个2秒后自动取消的上下文
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    var wg sync.WaitGroup
    
    // 启动多个工作者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go workerWithContext(ctx, i, &wg)
    }
    
    // 等待工作者完成或超时
    wg.Wait()
    fmt.Println("所有工作者已退出")
}

func main() {
    fmt.Println("-- 超时示例 --")
    withTimeout()
    
    fmt.Println("\n-- 取消示例 --")
    withCancellation()
    
    fmt.Println("\n-- 多工作者示例 --")
    multipleWorkers()
}

超时重试模式

在网络请求等不稳定操作中,重试是常见需求:

重试模式的关键点:

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// 模拟可能失败的操作
func doOperation(ctx context.Context) (string, error) {
    // 模拟随机失败
    if rand.Float32() < 0.7 {
        return "", errors.New("操作失败")
    }
    
    select {
    case <-time.After(500 * time.Millisecond):
        return "操作成功", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// 带重试的操作
func doOperationWithRetry(maxRetries int, timeout time.Duration) (string, error) {
    var lastErr error
    
    for retry := 0; retry < maxRetries; retry++ {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        
        result, err := doOperation(ctx)
        cancel() // 及时释放资源
        
        if err == nil {
            return result, nil
        }
        
        lastErr = err
        fmt.Printf("尝试 %d 失败: %v,重试中...\n", retry+1, err)
        time.Sleep(time.Millisecond * 200 * time.Duration(retry+1)) // 退避策略
    }
    
    return "", fmt.Errorf("在 %d 次尝试后失败: %w", maxRetries, lastErr)
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    result, err := doOperationWithRetry(5, 800*time.Millisecond)
    if err != nil {
        fmt.Printf("最终错误: %v\n", err)
        return
    }
    
    fmt.Printf("最终结果: %s\n", result)
}

超时与取消的实际应用

这些模式在以下场景尤为重要:

并发控制模式

有时我们需要精确控制并发数量或并发操作。这在资源受限的环境中尤为重要。

信号量模式

使用带缓冲的channel实现信号量:

信号量模式的特点:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 信号量实现
type Semaphore struct {
    tokens chan struct{}
}

func NewSemaphore(limit int) *Semaphore {
    return &Semaphore{
        tokens: make(chan struct{}, limit),
    }
}

func (s *Semaphore) Acquire() {
    s.tokens <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.tokens
}

// 并发执行但限制最大并发数
func processWithLimit(items []int, concurrency int) {
    sem := NewSemaphore(concurrency)
    var wg sync.WaitGroup
    
    for i, item := range items {
        wg.Add(1)
        
        // 获取令牌
        sem.Acquire()
        
        go func(id, val int) {
            defer wg.Done()
            defer sem.Release() // 释放令牌
            
            // 模拟处理
            fmt.Printf("处理项目 %d: %d 开始\n", id, val)
            time.Sleep(time.Second)
            fmt.Printf("处理项目 %d: %d 完成\n", id, val)
        }(i, item)
    }
    
    wg.Wait()
}

func main() {
    items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    
    fmt.Println("限制最大3个并发:")
    processWithLimit(items, 3)
}

并发与串行结合

有些应用需要在某些阶段并发,而在另一些阶段串行执行:

这种模式适用于:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 串行阶段
func prepare(data []int) []int {
    fmt.Println("准备阶段(串行)开始")
    result := make([]int, len(data))
    for i, val := range data {
        result[i] = val * 2
        time.Sleep(100 * time.Millisecond) // 模拟处理
    }
    fmt.Println("准备阶段(串行)完成")
    return result
}

// 并行阶段
func process(data []int) []int {
    fmt.Println("处理阶段(并行)开始")
    result := make([]int, len(data))
    var wg sync.WaitGroup
    
    for i, val := range data {
        wg.Add(1)
        go func(idx, value int) {
            defer wg.Done()
            // 模拟复杂处理
            time.Sleep(500 * time.Millisecond)
            result[idx] = value * value
            fmt.Printf("处理项目 %d 完成\n", idx)
        }(i, val)
    }
    
    wg.Wait()
    fmt.Println("处理阶段(并行)完成")
    return result
}

// 串行阶段
func finalize(data []int) int {
    fmt.Println("最终阶段(串行)开始")
    sum := 0
    for _, val := range data {
        sum += val
        time.Sleep(100 * time.Millisecond) // 模拟处理
    }
    fmt.Println("最终阶段(串行)完成")
    return sum
}

func main() {
    input := []int{1, 2, 3, 4, 5}
    
    // 串行-并行-串行处理流程
    prepared := prepare(input)     // 串行
    processed := process(prepared) // 并行
    result := finalize(processed)  // 串行
    
    fmt.Printf("最终结果: %d\n", result)
}