更简的并发代码,更强的并发控制
有没感觉
Go
的sync
包不够用?有没遇到类型没有sync/atomic
支持?我们一起看看
go-zero
的syncx
包对标准库的一些增值补充。
name |
作用 |
---|---|
AtomicBool |
bool类型 原子类 |
AtomicDuration |
Duration有关 原子类 |
AtomicFloat64 |
float64类型 原子类 |
Barrier |
栏栅【将加锁解锁包装】 |
Cond |
条件变量 |
DoneChan |
优雅通知关闭 |
ImmutableResource |
创建后不会修改的资源 |
Limit |
控制请求数 |
LockedCalls |
确保方法的串行调用 |
ManagedResource |
资源管理 |
Once |
提供 |
OnceGuard |
一次性使用的资源管理 |
Pool |
pool,简单的池 |
RefResource |
引用计数的资源 |
ResourceManager |
资源管理器 |
SharedCalls |
类似 |
SpinLock |
自旋锁:自旋+CAS |
TimeoutLimit |
Limit + timeout 控制 |
下面开始对以上库组件做分别介绍。
atomic
因为没有 泛型 支持,所以才会出现多种类型的原子类支持。以下采用 float64
作为例子:
- func (f *AtomicFloat64) Add(val float64) float64 {
- for {
- old := f.Load()
- nv := old + val
- if f.CompareAndSwap(old, nv) {
- return nv
- }
- }
- }
-
- func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
- return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
- }
-
- func (f *AtomicFloat64) Load() float64 {
- return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
- }
-
- func (f *AtomicFloat64) Set(val float64) {
- atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
- }
Add(val)
:如果CAS
失败,不断for循环重试,获取 old val,并set old+val;CompareAndSwap(old, new)
:调用底层atomic
的CAS
;Load()
:调用atomic.LoadUint64
,然后转换Set(val)
:调用atomic.StoreUint64
至于其他类型,开发者想自己扩展自己想要的类型,可以依照上述,基本上调用原始 atomic
操作,然后转换为需要的类型,比如:遇到 bool
可以借助 0, 1
来分辨对应的 false, true
。
Barrier
这里 Barrier
只是将业务函数操作封装,作为闭包传入,内部将 lock
操作的加锁解锁自行解决了【防止开发者加锁了忘记解锁】
- func (b *Barrier) Guard(fn func()) {
- b.lock.Lock()
- defer b.lock.Unlock()
- // 自己的业务逻辑
- fn()
- }
Cond/Limit/TimeoutLimit
这个数据结构和 Limit
一起组成了 TimeoutLimit
,这里将这3个一起讲:
- func NewTimeoutLimit(n int) TimeoutLimit {
- return TimeoutLimit{
- limit: NewLimit(n),
- cond: NewCond(),
- }
- }
-
- func NewLimit(n int) Limit {
- return Limit{
- pool: make(chan lang.PlaceholderType, n),
- }
- }
limit
这里是有缓冲的channel
;cond
是无缓冲的;
所以这里结合名字来理解:因为 Limit
是限制某一种资源的使用,所以需要预先在资源池中放入预置数量的资源;Cond
类似阀门,需要两边都准备好,才能进行数据交换,所以使用无缓冲,同步控制。
这里我们看看 stores/mongo
中关于 session
的管理,来理解 资源控制:
- func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
- // 选项参数注入
- ...
- // 看 limit 中是否还能取出资源
- if err := cs.limit.Borrow(o.timeout); err != nil {
- return nil, err
- } else {
- return cs.Copy(), nil
- }
- }
-
- func (l TimeoutLimit) Borrow(timeout time.Duration) error {
- // 1. 如果还有 limit 中还有资源,取出一个,返回
- if l.TryBorrow() {
- return nil
- }
- // 2. 如果 limit 中资源已经用完了
- var ok bool
- for {
- // 只有 cond 可以取出一个【无缓存,也只有 cond <- 此条才能通过】
- timeout, ok = l.cond.WaitWithTimeout(timeout)
- // 尝试取出一个【上面 cond 通过时,就有一个资源返回了】
- // 看 `Return()`
- if ok && l.TryBorrow() {
- return nil
- }
- // 超时控制
- if timeout <= 0 {
- return ErrTimeout
- }
- }
- }
-
- func (l TimeoutLimit) Return() error {
- // 返回去一个资源
- if err := l.limit.Return(); err != nil {
- return err
- }
- // 同步通知另一个需要资源的协程【实现了阀门,两方交换】
- l.cond.Signal()
- return nil
- }
资源管理
同文件夹中还有 ResourceManager
,从名字上类似,这里将两个组件放在一起讲解。
先从结构上:
- type ManagedResource struct {
- // 资源
- resource interface{}
- lock sync.RWMutex
- // 生成资源的逻辑,由开发者自己控制
- generate func() interface{}
- // 对比资源
- equals func(a, b interface{}) bool
- }
-
- type ResourceManager struct {
- // 资源:这里看得出来是 I/O,
- resources map[string]io.Closer
- sharedCalls SharedCalls
- // 对资源map互斥访问
- lock sync.RWMutex
- }
然后来看获取资源的方法签名:
- func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)
-
- // 获取一个资源(有就直接获取,没有生成一个)
- func (mr *ManagedResource) Take() interface{}
- // 判断这个资源是否不符合传入的判断要求,不符合则重置
- func (mr *ManagedResource) MarkBroken(resource interface{})
ResourceManager
使用SharedCalls
做防重复请求,并将资源缓存在内部的sourMap
;另外传入的create func
和IO
操作有关,常见用在网络资源的缓存;ManagedResource
缓存资源没有map
而是单一的interface
,说明只有一份,但是它提供了Take()
和传入generate()
说明可以让开发者自行更新resource
;
所以在用途上:
ResourceManager
:用在网络资源的管理。如:数据库连接管理;ManagedResource
:用在一些变化资源,可以做资源前后对比,达到更新资源。如:token
管理和验证
RefResource
这个就和 GC
中引用计数类似:
Use() -> ref++
Clean() -> ref--; if ref == 0 -> ref clean
- func (r *RefResource) Use() error {
- // 互斥访问
- r.lock.Lock()
- defer r.lock.Unlock()
- // 清除标记
- if r.cleaned {
- return ErrUseOfCleaned
- }
- // 引用 +1
- r.ref++
- return nil
- }
SharedCalls
一句话形容:使用SharedCalls可以使得同时多个请求只需要发起一次拿结果的调用,其他请求"坐享其成",这种设计有效减少了资源服务的并发压力,可以有效防止缓存击穿。
这个组件被反复应用在其他组件中,上面说的 ResourceManager
。
类似当需要高频并发访问一个资源时,就可以使用 SharedCalls
缓存。
- // 当多个请求同时使用Do方法请求资源时
- func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
- // 先申请加锁
- g.lock.Lock()
-
- // 根据key,获取对应的call结果,并用变量c保存
- if c, ok := g.calls[key]; ok {
- // 拿到call以后,释放锁,此处call可能还没有实际数据,只是一个空的内存占位
- g.lock.Unlock()
- // 调用wg.Wait,判断是否有其他goroutine正在申请资源,如果阻塞,说明有其他goroutine正在获取资源
- c.wg.Wait()
- // 当wg.Wait不再阻塞,表示资源获取已经结束,可以直接返回结果
- return c.val, c.err
- }
-
- // 没有拿到结果,则调用makeCall方法去获取资源,注意此处仍然是锁住的,可以保证只有一个goroutine可以调用makecall
- c := g.makeCall(key, fn)
- // 返回调用结果
- return c.val, c.err
- }
总结
不重复造轮子,一直是 go-zero
设计主旨之一;也同时将平时业务沉淀到组件中,这才是框架和组件的意义。
关于 go-zero
更多的设计和实现文章,可以持续关注我们。欢迎大家去关注和使用。
项目地址
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持我们!