生产者消费者模式
1 | package main |
发布订阅模式
1 | package pubsub |
1 | package main |
素数过滤器
1 | package main |
select
用select实现一个生成随机数序列的程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package main
import (
"fmt"
)
func main() {
ch := make(chan int)
//当有多个管道均可操作时,
//select会随机选择一个管道。
go func() {
for {
select {
case ch <- 0:
case ch <- 1:
}
}
}()
for v := range ch {
fmt.Println(v)
}
}基于
select
实现的管道的超时判断1
2
3
4
5
6select {
case v := <-in:
fmt.Println(v)
case <-time.After(time.Second):
return // 超时
}通过
select
的default
分支实现非阻塞的管道发送或接收操作1
2
3
4
5
6select {
case v := <-in:
fmt.Println(v)
default:
// 没有数据
}通过
select
来阻止main
函数退出1
2
3
4func main() {
// do some thins
select{}
}select
和default
分支可以很容易实现一个Goroutine的退出控制1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cannel:
// 退出
}
}
}
func main() {
cannel := make(chan bool)
go worker(cannel)
time.Sleep(time.Second)
cannel <- true
}通过
close
关闭一个管道来实现广播退出消息的作用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cannel:
// 退出
}
}
}
func main() {
cancel := make(chan bool)
for i := 0; i < 10; i++ {
go worker(cancel)
}
time.Sleep(time.Second)
close(cancel)
}添加
sync.WaitGroup
确保所有的Goroutine都成功退出后再关闭主线程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26func worker(wg *sync.WaitGroup, cannel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cannel:
return
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel)
wg.Wait()
}用
context
包来重新实现前面的线程安全退出或超时的控制1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done():
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}通过
context
避免当main
函数不再使用管道时后台Goroutine有泄漏的风险。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45// 返回生成自然数序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道过滤器: 删除能被素数整除的数
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
select {
case <- ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
// 通过 Context 控制后台Goroutine状态
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出现的素数
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器
}
cancel()
}