信号量
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量
简单的说就是通过获取资源和释放资源来进行同步的一种策略
用法
用法一共有以下几步:
- 创建信号量
- 获取信号量
- 释放信号量
//1.创建信号量为10
sem = semaphore.NewWeighted(10)
for i := 0; i <100; i++ {
go func() {
ctx := context.TODO()
//2.获取一个信号量, 信号量一共10个,获取最多获取10,超过的gorutine会挂起
if err := sem.Acquire(ctx, 1); err != nil {
doSomething()
}
//3. 释放信号量,1个
sem.Release(1)
}()
}
代码解读
Weighted 结构(NewWeighted 返回的数据结构)
type Weighted struct {
size int64 //总大小,就是NewWeighted传入的个数
cur int64 //当前消耗的个数
mu sync.Mutex //互斥锁
waiters list.List //等待列表, 当信号量不足时等待的列表
}
waiter 结构(等待列表保存的结构)
type waiter struct {
n int64 //需要的资源数
ready chan<- struct{} // 用来通知gorutine
}
Acquire 方法
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
//判断当前资源是否足够,如果足够则累加s.cur直接返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
//如果n 大于总资源就会等待超时或取消,(如果这种情况使用的是context.TODO,或者Background, Done返回nil,就会一直等待,导致gorutine泄露)
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
//这里就是判断,总资源够,但是剩余资源不够的情况, 这时就需要其他获取资源的groutine释放资源
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w) //把当前等待的放入队列
s.mu.Unlock()
//等待超时或者资源充足
select {
case <-ctx.Done(): //等待超时
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: //如果超时后立刻有足够资源时也会返回
err = nil
default: //超时后的逻辑
isFront := s.waiters.Front() == elem //判断当前等待的是队列头部元素(因为资源被释放后优先分配给队列头部的)
s.waiters.Remove(elem) //移除超时的waiter
if isFront && s.size > s.cur { //去掉头部后并且有剩余就看下,下个waiter能否满足
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: //等待资源
return nil
}
}
notifyWaiters 方法
func (s *Weighted) notifyWaiters() {
for {
//取出队列头部的waiter
next := s.waiters.Front()
if next == nil { //没有等待的waiter就退出循环
break
}
w := next.Value.(waiter)
//如果资源不能满足则退出循环
if s.size-s.cur < w.n {
break
}
//如果能满足,就移除当前元素,关闭waiter.ready(Acquire 中 <-w.ready 就能取消阻塞)
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
Release 方法
func (s *Weighted) Release(n int64) {
s.mu.Lock()
//从当前使用减去n
s.cur -= n
if s.cur < 0 { //如果Release的资源超过,Acquire的资源就会发生panic
s.mu.Unlock()
panic("semaphore: released more than held")
}
//唤醒waiter
s.notifyWaiters()
s.mu.Unlock()
}