errgroup

errgroup是一个很简单的工具包,总共代码量加上注释和空行才100来行 作用就是方便执行的任务,比如 g := errgroup.Group{} g.Go(func1) g.Go(func2) if err := g.Wait(); err != nil { //... } 结构 type Group struct { cancel func() //context 取消函数 wg sync.WaitGroup //用来等待全部执行完成 sem chan token //用来控制并发数 errOnce sync.Once //控制err字段只赋值一次 err error //错误 } 主要函数 func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} //限制并发数, 并发数由管道能容纳下的token个数决定 } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { //如果出错后只赋值一次err g.err = err if g.cancel != nil { //如果使用的是WithContext,这个字段会有值, 调用取消函数 g.cancel() } }) } }() } 主要的东西就这么点, 可以看到errgroup还是一个很简洁的小工具, 配合singleflight一起使用挺不错

<span title='2022-09-06 17:22:20 +0800 +0800'>九月 6, 2022</span>

Semaphore(信号量)

信号量 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量 简单的说就是通过获取资源和释放资源来进行同步的一种策略 用法 用法一共有以下几步: 创建信号量 获取信号量 释放信号量 //1.创建信号量为10 sem = semaphore.NewWeighted(10) for i := 0; i <100; i++ { go func() { ctx := context.TODO() //2.获取一个信号量, 信号量一共10个,获取最多获取10,超过的gorutine会挂起 if err := sem.Acquire(ctx, 1); err != nil { doSomething() } //3. 释放信号量,1个 sem.Release(1) }() } 代码解读 Weighted 结构(NewWeighted 返回的数据结构) type Weighted struct { size int64 //总大小,就是NewWeighted传入的个数 cur int64 //当前消耗的个数 mu sync.Mutex //互斥锁 waiters list.List //等待列表, 当信号量不足时等待的列表 } waiter 结构(等待列表保存的结构) type waiter struct { n int64 //需要的资源数 ready chan<- struct{} // 用来通知gorutine } Acquire 方法 func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() //判断当前资源是否足够,如果足够则累加s.cur直接返回 if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } //如果n 大于总资源就会等待超时或取消,(如果这种情况使用的是context.TODO,或者Background, Done返回nil,就会一直等待,导致gorutine泄露) if n > s.size { s.mu.Unlock() <-ctx.Done() return ctx.Err() } //这里就是判断,总资源够,但是剩余资源不够的情况, 这时就需要其他获取资源的groutine释放资源 ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) //把当前等待的放入队列 s.mu.Unlock() //等待超时或者资源充足 select { case <-ctx.Done(): //等待超时 err := ctx.Err() s.mu.Lock() select { case <-ready: //如果超时后立刻有足够资源时也会返回 err = nil default: //超时后的逻辑 isFront := s.waiters.Front() == elem //判断当前等待的是队列头部元素(因为资源被释放后优先分配给队列头部的) s.waiters.Remove(elem) //移除超时的waiter if isFront && s.size > s.cur { //去掉头部后并且有剩余就看下,下个waiter能否满足 s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: //等待资源 return nil } } notifyWaiters 方法 func (s *Weighted) notifyWaiters() { for { //取出队列头部的waiter next := s.waiters.Front() if next == nil { //没有等待的waiter就退出循环 break } w := next.Value.(waiter) //如果资源不能满足则退出循环 if s.size-s.cur < w.n { break } //如果能满足,就移除当前元素,关闭waiter.ready(Acquire 中 <-w.ready 就能取消阻塞) s.cur += w.n s.waiters.Remove(next) close(w.ready) } } Release 方法 func (s *Weighted) Release(n int64) { s.mu.Lock() //从当前使用减去n s.cur -= n if s.cur < 0 { //如果Release的资源超过,Acquire的资源就会发生panic s.mu.Unlock() panic("semaphore: released more than held") } //唤醒waiter s.notifyWaiters() s.mu.Unlock() }

<span title='2022-07-01 23:51:45 +0800 +0800'>七月 1, 2022</span>

Once

once是什么 和singlefight有些相似,singlefight是并发执行时只有一个在执行, once也是并发时只有一个在执行,只不过,只执行一次,再次调用不会在执行 once怎么用 var A int var once = sync.Once{} func initA() int { once.Do(func() { //这里只会执行一次 A = 10 //A=10 只会执行一次,并且所有并发进来的,都需要等待A=10 完成后返回 }) return A // A=10 happens before 读取A, 所以initA()在所有gorutine里,都返回10 } 这个例子我们可以构造一个懒汉模式单例 源码阅读 Once结构 type Once struct { done uint32 m Mutex } Once结构很简单,只有两个字段, done来表示是否执行完成, m为互斥锁 Do函数 func (o *Once) Do(f func()) { //判断done 如果没完成,则执行doSlow函数,否则直接返回退出函数 if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } } doSlow (第一次并发执行时才会进入的分支) func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() //上锁 if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) //完成标志设置为1 f() //执行传入的函数 } }

<span title='2022-06-25 20:50:16 +0800 +0800'>六月 25, 2022</span>

go sync包之singleflight原理

singlefight是什么 singlefight 直译为"单飞"(雅名到底是啥我也不知道), 顾名思义就是只有一个跑了, 是用来对同一资源控制并发 多个goroutine访问同一个资源时,只有一个goroutine真正的进行访问,其他goroutine等待这一个goroutine返回后共享返回结果 为什么出现singlefight 这个包 上面是什么中已经交代,是为了控制访问同一个资源的并发数,举个例子:假设有个接口访问数据库中id为1的一条数据,如果我们没有控制并发,那么来一百个并发访问这个数据,那么这一百个请求全部取请求数据库(即使有缓存也是全部请求缓存) 如果我们使用了singlefight那么,100个并发讲只有一个请求去数据库,其他99个全部共享那1个返回的结果 怎么用 var g = singleflight.Group{} //初始化了一个singleflight func SharedRes(id int) (int, error) { key := fmt.Sprintf("id:%d", id) //同一个group上,相同key的,只会执行一次,也就是说用key标识一个共享资源 ret, err, _ := g.Do(key, func() (interface{}, error) { //调用共享资源 time.Sleep(time.Second) //这里睡1s是模拟资源执行的延迟 fmt.Println("xxxx") return 1, nil }) return ret.(int), err } func SingleFlight() { wg := sync.WaitGroup{} //为了等100个goroutine执行完,开启了一个WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func() { //模拟并发 ret, err := SharedRes(1) fmt.Println("ret", ret, err) wg.Done() }() } wg.Wait() //等待100个goroutine执行完 } 看一下打印结果 xxxx <----这里只输出了一次xxxx ret 1 <nil> <----这里是返回结果 ret 1 <nil> ret 1 <nil> ret 1 <nil> .... 我们从现象可以看出,真正的业务逻辑执行只输出了一次,其他的goroutine也都返回了结果 源代码分析 Group结构 type Group struct { mu sync.Mutex // 互斥锁 m map[string]*call // 用来保存共享的调用结构,等会分析这个的作用 } call结构 type call struct { wg sync.WaitGroup //用来让共享groutine等待用的 //我们传入Do的函数签名是 func() (interface{}, error) val interface{} //函数返回结果 err error //函数返回错误 forgotten bool //调用Forget会forgotten=true,并从group.m 中删除这个key对应的call dups int //有几个一起共享结果的 chans []chan<- Result //通过DoChan配合使用,通过返回chan的方式返回结果 } Do函数分析,这个是这个包中最精华部分一之 ...

<span title='2022-06-19 22:02:23 +0800 +0800'>六月 19, 2022</span>