Semaphore(信号量)
信号量 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量 简单的说就是通过获取资源和释放资源来进行同步的一种策略 用法 用法一共有以下几步: 创建信号量 获取信号量 释放信号量 //1.创建信号量为10 sem = semaphore.NewWeighted(10) for i := 0; i <100; i++ { go func() { ctx := context.TODO() //2.获取一个信号量, 信号量一共10个,获取最多获取10,超过的gorutine会挂起 if err := sem.Acquire(ctx, 1); err != nil { doSomething() } //3. 释放信号量,1个 sem.Release(1) }() } 代码解读 Weighted 结构(NewWeighted 返回的数据结构) type Weighted struct { size int64 //总大小,就是NewWeighted传入的个数 cur int64 //当前消耗的个数 mu sync.Mutex //互斥锁 waiters list.List //等待列表, 当信号量不足时等待的列表 } waiter 结构(等待列表保存的结构) type waiter struct { n int64 //需要的资源数 ready chan<- struct{} // 用来通知gorutine } Acquire 方法 func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() //判断当前资源是否足够,如果足够则累加s.cur直接返回 if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } //如果n 大于总资源就会等待超时或取消,(如果这种情况使用的是context.TODO,或者Background, Done返回nil,就会一直等待,导致gorutine泄露) if n > s.size { s.mu.Unlock() <-ctx.Done() return ctx.Err() } //这里就是判断,总资源够,但是剩余资源不够的情况, 这时就需要其他获取资源的groutine释放资源 ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) //把当前等待的放入队列 s.mu.Unlock() //等待超时或者资源充足 select { case <-ctx.Done(): //等待超时 err := ctx.Err() s.mu.Lock() select { case <-ready: //如果超时后立刻有足够资源时也会返回 err = nil default: //超时后的逻辑 isFront := s.waiters.Front() == elem //判断当前等待的是队列头部元素(因为资源被释放后优先分配给队列头部的) s.waiters.Remove(elem) //移除超时的waiter if isFront && s.size > s.cur { //去掉头部后并且有剩余就看下,下个waiter能否满足 s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: //等待资源 return nil } } notifyWaiters 方法 func (s *Weighted) notifyWaiters() { for { //取出队列头部的waiter next := s.waiters.Front() if next == nil { //没有等待的waiter就退出循环 break } w := next.Value.(waiter) //如果资源不能满足则退出循环 if s.size-s.cur < w.n { break } //如果能满足,就移除当前元素,关闭waiter.ready(Acquire 中 <-w.ready 就能取消阻塞) s.cur += w.n s.waiters.Remove(next) close(w.ready) } } Release 方法 func (s *Weighted) Release(n int64) { s.mu.Lock() //从当前使用减去n s.cur -= n if s.cur < 0 { //如果Release的资源超过,Acquire的资源就会发生panic s.mu.Unlock() panic("semaphore: released more than held") } //唤醒waiter s.notifyWaiters() s.mu.Unlock() }