桃之夭夭,灼灼其华

go 常见的并发模式

Word count: 1.5kReading time: 7 min
2021/03/25 Share

生产者消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"os"
"os/signal"
"syscall"
)
// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
for i := 0; ; i++ {
out <- i*factor
}
}
// 消费者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64) // 队列

go Producer(3, ch) // 生成 3 的倍数的序列
go Producer(5, ch) // 生成 5 的倍数的序列
go Consumer(ch) // 消费 生成的队列

// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}

发布订阅模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package pubsub

import (
"sync"
"time"
)

type (
subscriber chan interface{} // 订阅者为一个管道
topicFunc func(v interface{}) bool // 主题为一个过滤器
)

// 发布者对象
type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存大小
timeout time.Duration // 发布超时时间
subscribers map[subscriber]topicFunc // 订阅者信息
}

// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}

// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}

// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}

// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
//map删除元素
delete(p.subscribers, sub)
//关闭channel
close(sub)
}

// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()

var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}

// 关闭发布者对象,同时关闭所有的订阅者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()

for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}

// 发送主题,可以容忍一定的超时
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}

select {
case sub <- v:
case <-time.After(p.timeout):
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"strings"
"time"
"demo/22-pubsub/pubsub"
)

func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
defer p.Close()

all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})

p.Publish("hello, world!")
p.Publish("hello, golang!")

go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()

go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} ()

// 运行一定时间后退出
time.Sleep(3 * time.Second)
}

素数过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main
import (
"fmt"
)

// 返回生成自然数序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}

// 管道过滤器: 删除能被素数整除的数
func PrimeFilter(in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}

func main() {
ch := GenerateNatural() // 自然数序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出现的素数
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ch, prime) // 基于新素数构造的过滤器
}
}

select

  • 用select实现一个生成随机数序列的程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package main
    import (
    "fmt"
    )
    func main() {
    ch := make(chan int)
    //当有多个管道均可操作时,
    //select会随机选择一个管道。
    go func() {
    for {
    select {
    case ch <- 0:
    case ch <- 1:
    }
    }
    }()

    for v := range ch {
    fmt.Println(v)
    }
    }
  • 基于select实现的管道的超时判断

    1
    2
    3
    4
    5
    6
    select {
    case v := <-in:
    fmt.Println(v)
    case <-time.After(time.Second):
    return // 超时
    }
  • 通过selectdefault分支实现非阻塞的管道发送或接收操作

    1
    2
    3
    4
    5
    6
    select {
    case v := <-in:
    fmt.Println(v)
    default:
    // 没有数据
    }
  • 通过select来阻止main函数退出

    1
    2
    3
    4
    func main() {
    // do some thins
    select{}
    }
  • selectdefault分支可以很容易实现一个Goroutine的退出控制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    func worker(cannel chan bool) {
    for {
    select {
    default:
    fmt.Println("hello")
    // 正常工作
    case <-cannel:
    // 退出
    }
    }
    }

    func main() {
    cannel := make(chan bool)
    go worker(cannel)

    time.Sleep(time.Second)
    cannel <- true
    }

    通过close关闭一个管道来实现广播退出消息的作用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    func worker(cannel chan bool) {
    for {
    select {
    default:
    fmt.Println("hello")
    // 正常工作
    case <-cannel:
    // 退出
    }
    }
    }

    func main() {
    cancel := make(chan bool)

    for i := 0; i < 10; i++ {
    go worker(cancel)
    }

    time.Sleep(time.Second)
    close(cancel)
    }

    添加sync.WaitGroup确保所有的Goroutine都成功退出后再关闭主线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func worker(wg *sync.WaitGroup, cannel chan bool) {
    defer wg.Done()

    for {
    select {
    default:
    fmt.Println("hello")
    case <-cannel:
    return
    }
    }
    }

    func main() {
    cancel := make(chan bool)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
    wg.Add(1)
    go worker(&wg, cancel)
    }

    time.Sleep(time.Second)
    close(cancel)
    wg.Wait()
    }

    context包来重新实现前面的线程安全退出或超时的控制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    func worker(ctx context.Context, wg *sync.WaitGroup) error {
    defer wg.Done()

    for {
    select {
    default:
    fmt.Println("hello")
    case <-ctx.Done():
    return ctx.Err()
    }
    }
    }

    func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
    wg.Add(1)
    go worker(ctx, &wg)
    }

    time.Sleep(time.Second)
    cancel()

    wg.Wait()
    }

    通过context避免当main函数不再使用管道时后台Goroutine有泄漏的风险。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // 返回生成自然数序列的管道: 2, 3, 4, ...
    func GenerateNatural(ctx context.Context) chan int {
    ch := make(chan int)
    go func() {
    for i := 2; ; i++ {
    select {
    case <- ctx.Done():
    return
    case ch <- i:
    }
    }
    }()
    return ch
    }

    // 管道过滤器: 删除能被素数整除的数
    func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
    out := make(chan int)
    go func() {
    for {
    if i := <-in; i%prime != 0 {
    select {
    case <- ctx.Done():
    return
    case out <- i:
    }
    }
    }
    }()
    return out
    }

    func main() {
    // 通过 Context 控制后台Goroutine状态
    ctx, cancel := context.WithCancel(context.Background())

    ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
    prime := <-ch // 新出现的素数
    fmt.Printf("%v: %v\n", i+1, prime)
    ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器
    }

    cancel()
    }
CATALOG
  1. 1. 生产者消费者模式
  2. 2. 发布订阅模式
  3. 3. 素数过滤器
  4. 4. select