a

singlefight是什么

singlefight 直译为"单飞"(雅名到底是啥我也不知道), 顾名思义就是只有一个跑了, 是用来对同一资源控制并发 多个goroutine访问同一个资源时,只有一个goroutine真正的进行访问,其他goroutine等待这一个goroutine返回后共享返回结果

为什么出现singlefight 这个包

上面是什么中已经交代,是为了控制访问同一个资源的并发数,举个例子:假设有个接口访问数据库中id为1的一条数据,如果我们没有控制并发,那么来一百个并发访问这个数据,那么这一百个请求全部取请求数据库(即使有缓存也是全部请求缓存)

如果我们使用了singlefight那么,100个并发讲只有一个请求去数据库,其他99个全部共享那1个返回的结果

怎么用

var g = singleflight.Group{} //初始化了一个singleflight

func SharedRes(id int) (int, error) {
	key := fmt.Sprintf("id:%d", id)  //同一个group上,相同key的,只会执行一次,也就是说用key标识一个共享资源
	ret, err, _ := g.Do(key, func() (interface{}, error) {
		//调用共享资源
		time.Sleep(time.Second) //这里睡1s是模拟资源执行的延迟
		fmt.Println("xxxx")
		return 1, nil
	})
	return ret.(int), err
}

func SingleFlight() {
	wg := sync.WaitGroup{} //为了等100个goroutine执行完,开启了一个WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { //模拟并发
			ret, err := SharedRes(1)
			fmt.Println("ret", ret, err)
			wg.Done()
		}()
	}
	wg.Wait() //等待100个goroutine执行完
}

看一下打印结果

xxxx         <----这里只输出了一次xxxx
ret 1 <nil>  <----这里是返回结果
ret 1 <nil>
ret 1 <nil>
ret 1 <nil>
....

我们从现象可以看出,真正的业务逻辑执行只输出了一次,其他的goroutine也都返回了结果

源代码分析

Group结构

type Group struct {
	mu sync.Mutex       // 互斥锁
	m  map[string]*call // 用来保存共享的调用结构,等会分析这个的作用
}

call结构

type call struct {
	wg sync.WaitGroup //用来让共享groutine等待用的

  //我们传入Do的函数签名是 func() (interface{}, error)
	val interface{} //函数返回结果
	err error //函数返回错误

	forgotten bool //调用Forget会forgotten=true,并从group.m 中删除这个key对应的call


	dups  int //有几个一起共享结果的
	chans []chan<- Result //通过DoChan配合使用,通过返回chan的方式返回结果
}

Do函数分析,这个是这个包中最精华部分一之

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  //加锁,看下g.m是否初始化,没有的话初始化map
	g.mu.Lock() 
	if g.m == nil {
		g.m = make(map[string]*call)
	}

  //看下g.m是否有这个key,第一个调用do的肯定不在g.m里
	if c, ok := g.m[key]; ok {
		c.dups++  //如果有就共享数加1
		g.mu.Unlock() //解锁
		c.wg.Wait() //等待第一个执行业务逻辑的goroutine结束

    //看下是否报错
		if e, ok := c.err.(*panicError); ok { //内部panic
			panic(e)
		} else if c.err == errGoexit { //如果内部调用了runtime.Goexit,会导致这个错误
			runtime.Goexit()
		}
    //没报错的话返回结果(执行业务逻辑的goroutine会把结果赋值给c.val, c.err)
		return c.val, c.err, true
	}

  //下面是第一个执行业务逻辑的gorutine执行的逻辑
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

doCall函数逻辑也是核心代码之一

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
	defer func() {
		// 调用了runtime.Goexit 会导致fn执行后面的代码不执行, 也就是会出现normalReturn 和recovered 都为false的情况
		if !normalReturn && !recovered {
			c.err = errGoexit 
		}

		c.wg.Done() //唤醒其他共享资源的goroutine
		g.mu.Lock() //
		defer g.mu.Unlock()
		if !c.forgotten { //没有调用过g.Forget这个forgotten 字段会是false, 执行完后会删除这个key
			delete(g.m, key)
		}

    //判断是否出错
		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)  //在另一个goroutine中panic无法recover, 会造成程序退出
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e) //在当前goroutine中panic, 可以被recover
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			// Normal return
			for _, ch := range c.chans { //调用了doChan会使用换到返回结果
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

  //执行业务逻辑的匿名函数
	func() {
		defer func() {
			if !normalReturn {
					if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()
    //这里执行,赋值了c.val, c.err,共享返回结果, 如果发送panic会被上面的defer捕捉到,不会执行到下面的normalReturn = true
		c.val, c.err = fn() //如果调用了runtime.Goexit() 则下面normalReturn = true不是执行, call下面的判断也不会执行导致recovered = true
		normalReturn = true
	}()

  //如果上面匿名函数 执行到normalReturn = true 说明没有发生panic中断函数, 如果normalReturn=false 说明匿名函数里的fn 执行了panic,中断了函数
	if !normalReturn {
		recovered = true
	}
}

这里先要明确执行顺序,才能正确理解流程,这里为大家梳理一下call内部执行顺序

  1. 先执行了赋值语句,变量初始化为false
  2. 执行了匿名函数
  3. 执行匿名函数中的defer
  4. call函数最下方的if判断
  5. call函数最上面的defer

sync源码地址