singlefight是什么
singlefight 直译为"单飞"(雅名到底是啥我也不知道), 顾名思义就是只有一个跑了, 是用来对同一资源控制并发 多个goroutine访问同一个资源时,只有一个goroutine真正的进行访问,其他goroutine等待这一个goroutine返回后共享返回结果
为什么出现singlefight 这个包
上面是什么中已经交代,是为了控制访问同一个资源的并发数,举个例子:假设有个接口访问数据库中id为1的一条数据,如果我们没有控制并发,那么来一百个并发访问这个数据,那么这一百个请求全部取请求数据库(即使有缓存也是全部请求缓存)
如果我们使用了singlefight那么,100个并发讲只有一个请求去数据库,其他99个全部共享那1个返回的结果
怎么用
var g = singleflight.Group{} //初始化了一个singleflight
func SharedRes(id int) (int, error) {
key := fmt.Sprintf("id:%d", id) //同一个group上,相同key的,只会执行一次,也就是说用key标识一个共享资源
ret, err, _ := g.Do(key, func() (interface{}, error) {
//调用共享资源
time.Sleep(time.Second) //这里睡1s是模拟资源执行的延迟
fmt.Println("xxxx")
return 1, nil
})
return ret.(int), err
}
func SingleFlight() {
wg := sync.WaitGroup{} //为了等100个goroutine执行完,开启了一个WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() { //模拟并发
ret, err := SharedRes(1)
fmt.Println("ret", ret, err)
wg.Done()
}()
}
wg.Wait() //等待100个goroutine执行完
}
看一下打印结果
xxxx <----这里只输出了一次xxxx
ret 1 <nil> <----这里是返回结果
ret 1 <nil>
ret 1 <nil>
ret 1 <nil>
....
我们从现象可以看出,真正的业务逻辑执行只输出了一次,其他的goroutine也都返回了结果
源代码分析
Group结构
type Group struct {
mu sync.Mutex // 互斥锁
m map[string]*call // 用来保存共享的调用结构,等会分析这个的作用
}
call结构
type call struct {
wg sync.WaitGroup //用来让共享groutine等待用的
//我们传入Do的函数签名是 func() (interface{}, error)
val interface{} //函数返回结果
err error //函数返回错误
forgotten bool //调用Forget会forgotten=true,并从group.m 中删除这个key对应的call
dups int //有几个一起共享结果的
chans []chan<- Result //通过DoChan配合使用,通过返回chan的方式返回结果
}
Do函数分析,这个是这个包中最精华部分一之
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
//加锁,看下g.m是否初始化,没有的话初始化map
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
//看下g.m是否有这个key,第一个调用do的肯定不在g.m里
if c, ok := g.m[key]; ok {
c.dups++ //如果有就共享数加1
g.mu.Unlock() //解锁
c.wg.Wait() //等待第一个执行业务逻辑的goroutine结束
//看下是否报错
if e, ok := c.err.(*panicError); ok { //内部panic
panic(e)
} else if c.err == errGoexit { //如果内部调用了runtime.Goexit,会导致这个错误
runtime.Goexit()
}
//没报错的话返回结果(执行业务逻辑的goroutine会把结果赋值给c.val, c.err)
return c.val, c.err, true
}
//下面是第一个执行业务逻辑的gorutine执行的逻辑
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
doCall函数逻辑也是核心代码之一
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// 调用了runtime.Goexit 会导致fn执行后面的代码不执行, 也就是会出现normalReturn 和recovered 都为false的情况
if !normalReturn && !recovered {
c.err = errGoexit
}
c.wg.Done() //唤醒其他共享资源的goroutine
g.mu.Lock() //
defer g.mu.Unlock()
if !c.forgotten { //没有调用过g.Forget这个forgotten 字段会是false, 执行完后会删除这个key
delete(g.m, key)
}
//判断是否出错
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e) //在另一个goroutine中panic无法recover, 会造成程序退出
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e) //在当前goroutine中panic, 可以被recover
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans { //调用了doChan会使用换到返回结果
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
//执行业务逻辑的匿名函数
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
//这里执行,赋值了c.val, c.err,共享返回结果, 如果发送panic会被上面的defer捕捉到,不会执行到下面的normalReturn = true
c.val, c.err = fn() //如果调用了runtime.Goexit() 则下面normalReturn = true不是执行, call下面的判断也不会执行导致recovered = true
normalReturn = true
}()
//如果上面匿名函数 执行到normalReturn = true 说明没有发生panic中断函数, 如果normalReturn=false 说明匿名函数里的fn 执行了panic,中断了函数
if !normalReturn {
recovered = true
}
}
这里先要明确执行顺序,才能正确理解流程,这里为大家梳理一下call内部执行顺序
- 先执行了赋值语句,变量初始化为false
- 执行了匿名函数
- 执行匿名函数中的defer
- call函数最下方的if判断
- call函数最上面的defer