大家好,今天将梳理出的 Go语言并发知识内容,分享给大家。 请多多指教,谢谢。
本文主要介绍 sync 标准库中基本同步原语 sync.Cond
、sync.Once
、sync.Pool
介绍及使用。
条件变量
type Cond struct {
// L 是在观察或改变状态时保持的
L Locker
// 包含过滤或未导出的字段
}
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
Cond 实现了一个条件变量,用于等待或宣布事件发生时 goroutine 的交汇点。 在这个定义中,“事件”是指两个或更多的goroutine之间的任何信号,仅指事件发生了,不包含其他任何信息。 通常,你可能想要在收到某个 goroutine 信号前令其处于等待状态。
每个 Cond 都有一个相关联的Locker L (通常是 Mutex
或 RWMutex
),当改变条件和调用 Wait()
方法时必须持有它。Cond在第一次使用后不得复制。
NewCond()
函数: 返回一个新的Cond与Locker l
func (*Cond) Broadcast()
方法:广播会唤醒所有等待c的goroutine
func (*Cond) Signal()
方法:信号唤醒了一个等待c的goroutine
func (*Cond) Wait()
方法:Wait自动解锁c.L并暂停调用goroutine的执行。在稍后恢复执行后,在返回之前等待锁定c.L。(主要为等待信号通知)
补充:
Wait()
方法会自动的对该条件变量关联的那个锁进行解锁,并且使它所在的 goroutine 阻塞。 一旦接收到通知,该方法所在的 goroutine 就会被唤醒,并且该方法会立即尝试锁定该锁。
Singnal()
和Broadcast()
方法的作用都是发送通知,以唤醒正在为此阻塞的 goroutine。Singnal 的目标只有一个,Broadcast 的目标则是所有。
举例1:假设我们有一个固定长度为2的队列,并且我们要将10个元素放入队列中。希望一有空间就能放入,所以在队列中有空间时需要立即通知。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
c := sync.NewCond(&sync.Mutex{}) // 1
queue := make([]interface{}, 0, 10) // 2
removeFromQueue := func(delay time.Duration) {
time.Sleep(delay)
c.L.Lock() // 8
queue = queue[1:] // 9
fmt.Println("Remove from queue")
c.L.Unlock() // 10
c.Signal() // 11
}
for i := 0; i <10; i++ {
c.L.Lock() // 3
for len(queue) == 2 { // 4
c.Wait() // 5
}
fmt.Println("Adding to queue")
queue = append(queue, struct{}{})
go removeFromQueue(1 * time.Second) // 6
c.L.Unlock() //7
}
}
输出
Adding to queue
Adding to queue
Remove from queue
Adding to queue
Remove from queue
Adding to queue
Remove from queue
Remove from queue
Adding to queue
Adding to queue
Remove from queue
Adding to queue
Remove from queue
Adding to queue
Remove from queue
Adding to queue
Remove from queue
Adding to queue
main() goroutine
,直到接受到信号。举例2:介绍另一个方法 Broadcast()
, 它提供了一种同时与多个 goroutine 进行通信的解决方案。假设创建了一个带有按钮的GUI程序,该程序需要注册任意数量的函数,当点击按钮时运行这些函数,可以使用 Cond的 Broadcast
来通知所有已注册的函数。
package main
import (
"fmt"
"sync"
)
func main() {
type Button struct {
Clicked *sync.Cond // 1
}
button := Button{Clicked: sync.NewCond(&sync.Mutex{})}
subscribe := func(c *sync.Cond, fn func()) { // 2
var tempwg sync.WaitGroup
tempwg.Add(1)
go func() {
tempwg.Done()
c.L.Lock()
defer c.L.Unlock()
c.Wait()
fn()
}()
tempwg.Wait()
}
var wg sync.WaitGroup // 3
wg.Add(3)
subscribe(button.Clicked, func(){ // 4
fmt.Println("Clicked 1")
wg.Done()
})
subscribe(button.Clicked, func(){ // 5
fmt.Println("Clicked 2")
wg.Done()
})
subscribe(button.Clicked, func(){ // 6
fmt.Println("Clicked 3")
wg.Done()
})
button.Clicked.Broadcast() // 7
wg.Wait()
}
输出
Clicked 3
Clicked 1
Clicked 2
sync.Cond
指针类型的 Clicked 属性,这是 goroutine 接收通知的关键条件。一次性执行
type Once struct {
// contains filtered or unexported fields
}
func (o *Once) Do(f func())
Once 主要作用是只执行一次处理,在第一次使用后将不可复制。
func (o *Once) Do(f func())
方法:当 Once 的这个实例第一次调用Do时,Do将调用函数f。换句话说,如果once.Do(f)被多次调用,只有第一个调用会调用f,即使f在每次调用中具有不同的值。每个函数执行时都需要一个Once的新实例。
package main
import (
"fmt"
"sync"
)
func main() {
var count int
increment := func() {
count++
}
var once sync.Once
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i <100; i++ {
go func() {
defer wg.Done()
once.Do(increment)
}()
}
wg.Wait()
fmt.Printf("Count is %d\n", count)
}
输出
Count is 1
sync.Once
确保了即使在不同的 goroutine上,调用 Do 传入的函数只执行一次。
临时对象池
type Pool struct {
// New 可选指定要生成的函数
// 一个值,否则Get将返回nil
// 它可能不会在调用Get时并发更改
New func() any
// contains filtered or unexported fields
}
func (p *Pool) Get() any
func (p *Pool) Put(x any)
Pool 是一组可以单独保存和检索的临时对象。存储在池中的任何项目可以在任何时候自动删除而不通知。如果发生这种情况时Pool持有唯一的引用,则可能会释放该项。
Pool 可以安全的被多个 goroutine 同时使用。
Pool 的目的是缓存已分配但未使用的项,以便稍后重用,减轻垃圾收集器的压力。也就是说,它使构建高效的、线程安全的空闲列表变得容易。然而,它并不适用于所有的免费列表。
Pool 适当使用是管理一组临时项,这些临时项在包的并发独立客户端之间共享,并可能被包的并发独立客户端重用。Pool 提供了一种在多个客户端之间摊销分配开销的方法。
在较高的层次上,池模式是一种创建和提供固定数量可用对象的方式。它通常用于约束创建资源昂贵的事物(例如数据库连接)。Go的 sync.Pool
可以被多个例程安全地使用。
一个很好 Pool 的例子是在 fmt 包中,它维护一个动态大小的临时输出缓冲区存储。存储在负载下伸缩(当许多goroutines正在积极地打印时),在静默时收缩。
注意,作为生存期较短的对象一部分维护的空闲列表不适合用于Pool,因为在这种情况下,开销不会很好地摊销。让这些对象实现它们自己的空闲列表会更有效率。
func (p *Pool) Get() any
方法:Get()
从池中选择任意项,将其从池中移除,并将其返回给调用者。Get()
可以选择忽略池并将其视为空的。调用方不应假定传递给 Put()
的值与 Get()
返回的值之间存在任何关系。
如果Get将返回nil而p.New
是非nil,则Get将返回调用p.New
的结果。
func (p *Pool) Put(x any)
方法:Put将x添加到 Pool 中。
举例1:Pool的主要接口是它的Get方法。 被调用时,Get将首先检查池中是否有可用实例返回给调用者,如果没有,则创建一个新成员变量。使用完成后,调用者调用Put将正在使用的实例放回池中供其他进程使用。
package main
import (
"fmt"
"sync"
)
func main() {
myPool := &sync.Pool{
New: func() interface{} {
fmt.Println("create new instance")
return struct{}{}
},
}
myPool.Get() // 1
instance := myPool.Get() // 1
myPool.Put(instance) // 2
myPool.Get() // 3
}
输出
create new instance
create new instance
举例2:指定分配内存
package main
import (
"fmt"
"sync"
)
func main() {
var numCalcsCreated int
calcPool := &sync.Pool{
New: func() interface{} {
numCalcsCreated += 1
mem := make([]byte, 1024)
return &mem // 1
},
}
// 将池扩充到4KB
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
const numWorkers = 1024 * 1024
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := numWorkers; i > 0; i-- {
go func() {
defer wg.Done()
mem := calcPool.Get().(*[]byte) // 2
defer calcPool.Put(mem)
}()
}
wg.Wait()
fmt.Printf("numCalcsCreated = %d\n", numCalcsCreated)
}
输出
numCalcsCreated = 4
案例中,如果不使用 sync.Pool
方式,可能需要分配千兆字节的内存。目前使用 sync.Pool
设置对象池方式只分配了 4KB。
举例3:Pool 另一种常见情况是预热分配对象缓存,用于必须尽快运行的操作。通过预先加载获取对另一个对象的引用来减少消费者的时间消耗。
package main
import (
"log"
"fmt"
"net"
"sync"
"time"
)
func main() {
connPool := warmServiceConnCache()
server, err := net.Listen("tcp", "127.0.0.1:8080")
if err != nil {
log.Fatalf("cannot listen: %v", err)
}
defer server.Close()
for {
conn, err := server.Accept()
if err != nil {
log.Printf("cannot accept connection: %v", err)
continue
}
svcConn := connPool.Get()
fmt.Fprintln(conn, "")
connPool.Put(svcConn)
conn.Close()
}
}
// 模拟服务连接
func connectToService() interface{} {
time.Sleep(1 * time.Second)
return struct{}{}
}
// 对象连接池
func warmServiceConnCache() *sync.Pool {
p := &sync.Pool{
New: connectToService,
}
for i := 0; i <10; i++ {
p.Put(p.New())
}
return p
}
正如这个例子所展现的,池模式非常适合于这种需要并发进程,或者构建这些对象可能会对内存产生负面影响的应用程序。
但是,在确定是否应该使用池时有一点需要注意:如果使用池子里东西在内存上不是大致均匀的,则会花更多时间将从池中检索,这比首先实例化它要耗费更多的资源。
因此,在使用 Pool 时,请记住以下几点:
sync.Pool
时,给它一个新元素,该元素应该是线程安全的。最后总结下临时对象池的特性:
Get()
方法被调用时,它一般会先尝试从与本地 P 对应的本地池和本地共享池中获取一个对象值。 如果获取失败,就会试图从其他 P 共享池中取走一个对象值并直接返回给调用方。 注意,这个对象值生成函数产生的对象值永远不会被放置到池中,而是会被直接返回给调用方。 另外,临时对象池的 Put()
方法会把它的参数存放到本地 P 的本地池中,每个相关的 P 的本地共享池中的所有对象值,都是在当前临时对象池的范围内共享的。技术文章持续更新,请大家多多关注呀~~
搜索微信公众号,关注我【 帽儿山的枪手 】
参考材料
[1] "《Go并发编程实战》书籍"
[2]: "《Concurrency in Go》书籍"
[3] https://pkg.go.dev/sync sync标准库