信号量

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量

简单的说就是通过获取资源和释放资源来进行同步的一种策略

用法

用法一共有以下几步:

  1. 创建信号量
  2. 获取信号量
  3. 释放信号量
//1.创建信号量为10
sem = semaphore.NewWeighted(10)
for i := 0; i <100; i++ {
  go func() {
    ctx := context.TODO()
    //2.获取一个信号量, 信号量一共10个,获取最多获取10,超过的gorutine会挂起
    if err := sem.Acquire(ctx, 1); err != nil {
      doSomething()
    } 
    //3. 释放信号量,1个
   sem.Release(1)
  }()
}

代码解读

Weighted 结构(NewWeighted 返回的数据结构)

type Weighted struct {
	size    int64 //总大小,就是NewWeighted传入的个数
	cur     int64 //当前消耗的个数
	mu      sync.Mutex //互斥锁
	waiters list.List //等待列表, 当信号量不足时等待的列表
}

waiter 结构(等待列表保存的结构)

type waiter struct {
	n     int64 //需要的资源数
	ready chan<- struct{} // 用来通知gorutine
}

Acquire 方法

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
  //判断当前资源是否足够,如果足够则累加s.cur直接返回
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}

  //如果n 大于总资源就会等待超时或取消,(如果这种情况使用的是context.TODO,或者Background, Done返回nil,就会一直等待,导致gorutine泄露)
	if n > s.size {
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}

  //这里就是判断,总资源够,但是剩余资源不够的情况, 这时就需要其他获取资源的groutine释放资源
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w) //把当前等待的放入队列
	s.mu.Unlock()

  //等待超时或者资源充足
	select {
	case <-ctx.Done(): //等待超时
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready: //如果超时后立刻有足够资源时也会返回
			err = nil
		default: //超时后的逻辑
			isFront := s.waiters.Front() == elem //判断当前等待的是队列头部元素(因为资源被释放后优先分配给队列头部的)
			s.waiters.Remove(elem) //移除超时的waiter
			if isFront && s.size > s.cur { //去掉头部后并且有剩余就看下,下个waiter能否满足
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err

	case <-ready: //等待资源
		return nil
	}
}

notifyWaiters 方法

func (s *Weighted) notifyWaiters() {
	for {
    //取出队列头部的waiter
		next := s.waiters.Front()
		if next == nil { //没有等待的waiter就退出循环
			break 
		}

		w := next.Value.(waiter) 
    //如果资源不能满足则退出循环
		if s.size-s.cur < w.n {
			break
		}

    //如果能满足,就移除当前元素,关闭waiter.ready(Acquire 中 <-w.ready 就能取消阻塞)
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

Release 方法

func (s *Weighted) Release(n int64) {
	s.mu.Lock()
  //从当前使用减去n
	s.cur -= n
	if s.cur < 0 { //如果Release的资源超过,Acquire的资源就会发生panic
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
  //唤醒waiter
	s.notifyWaiters()
	s.mu.Unlock()
}