Mutex 基础

sync.Mutex 是 Go 提供的互斥锁,在多 goroutine 环境下用来保护共享资源。可以确保同一时刻只有一个 goroutine 能执行锁边界内的代码段,防止数据竞争。

1
2
3
4
5
6
7
8
var mu sync.Mutex
var count int

func increment() {
mu.Lock() // 加锁,防止其他 goroutine 进入临界区,直到当前 goroutine 释放锁
count++
mu.Unlock() // 解锁,允许其他 goroutine 继续
}

特点:

  1. 完全互斥,同一时刻只有一个 goroutine 能执行
  2. 阻塞行为,如果一个 goroutine 已加锁,其他尝试加锁的 goroutine 会阻塞
  3. 简单易用,但在读多写少的场景下性能较差

RWMutex 基础

sync.RWMutex 是 Go 提供的读写锁,允许多个 goroutine 同时读取,但写操作互斥。读锁允许并发读取,期间其他 goroutine 尝试获取写锁会阻塞,写锁阻止其他 goroutine 的读和写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var rw sync.RWMutex
var data = make(map[string]string)

func readData(key string) string {
rw.RLock() // 加读锁,允许多个 goroutine 同时读取
defer rw.RUnlock() // 解读锁
return data[key]
}

func writeData(key, value string) {
rw.Lock() // 加写锁,阻止其他所有读写操作,确保数据一致性
data[key] = value
rw.Unlock() // 解写锁
}

特点:

  1. 读写分离,适合读多写少的场景,提高并发性能
  2. 写锁阻塞读和写锁,写入时保障数据完整性
  3. 读锁不互相阻塞,提高读取性能

sync 包——Mutex 细节(源码解析)


接下来结合 sync.Mutex 的源码,为每个步骤添加相应的代码块,方便大家更直观地理解 Mutex 的加锁和唤醒过程。代码文件:go/src/sync/mutex.go,部份代码是伪代码,核心代码如判断是否自旋代码位于 Go 运行时,使用伪代码便于理解。

1. CAS 操作(Compare-And-Swap)

  • 原理:
    • 通过 CAS(Compare-And-Swap)尝试加锁,如果锁当前空闲(state=0),则直接获取锁。
    • 如果其他 goroutine 已经持有锁,CAS 操作会失败,进入下一步(自旋或等待)。
  • 源码:
1
2
3
4
5
6
7
8
9
10
11
func (m *Mutex) Lock() {
// 快路径:如果锁空闲,直接加锁成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢路径
m.lockSlow()
}
  • 解释:
    • 这段代码展示了 Mutex 加锁的快路径逻辑。
    • atomic.CompareAndSwapInt32 是核心的 CAS 操作,尝试将 state 从 0 置为 mutexLocked

2. 自旋

  • 原理:
    • 如果第一次 CAS 失败,goroutine 不会立即进入等待队列,而是尝试短暂自旋。
    • 自旋是在不释放 CPU 的情况下多次尝试获取锁,如果在此过程中锁释放,goroutine 可直接成功加锁,避免进入等待队列。
  • 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (m *Mutex) lockSlow() {
var waitStartTime int64
var starved bool
iter := 0

for {
old := m.state
// 自旋尝试再次 CAS
if old&mutexLocked == 0 {
if atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
break
}
continue
}
iter++
if iter < 4 {
runtime.Gosched() // 自旋几次后让出 CPU
continue
}
// 自旋失败后进入队列
if !starved {
waitStartTime = runtimeNano()
starved = true
}
}
}
  • 解释:
    • 自旋最多进行 4 次,防止浪费过多 CPU 时间。
    • 如果自旋失败,goroutine 会进入慢路径,进入等待队列。

3. 加入队列

  • 原理:
    • 如果自旋后仍然无法获取锁,goroutine 进入等待队列排队,等待锁释放后被唤醒。
    • 在等待期间,goroutine 被挂起,不再占用 CPU。
  • 源码:
1
2
3
4
5
6
7
8
9
10
func (m *Mutex) lockSlow() {
new := old + 1<<mutexWaiterShift
if old&mutexStarving != 0 {
new |= mutexStarving
}
if old&(mutexLocked|mutexStarving) == mutexLocked &&
atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semacquire(&m.sema)
}
}
  • 解释:
    • mutexWaiterShift 记录等待者数量,每次有新 goroutine 进入队列,这个计数器会加一。
    • runtime.Semacquire 是挂起 goroutine 的核心函数,等待锁释放。

4. 被唤醒(从队列中唤醒)

  • 正常模式:
    • 被唤醒的 goroutine 需要和新来的 goroutine 竞争锁,锁可能再次被插队的新 goroutine 抢走。
  • 饥饿模式:
    • 如果 goroutine 等待锁的时间超过 1ms,Mutex 切换到饥饿模式,唤醒的 goroutine 将直接获取锁,不再和新来的 goroutine 竞争。
  • 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
func (m *Mutex) unlockSlow() {
new := old &^ mutexLocked
if old&mutexStarving == 0 {
new -= 1 << mutexWaiterShift
}
if old&(mutexStarving|mutexLocked) == mutexStarving {
new |= mutexStarving
}
// 唤醒一个等待者
if old>>mutexWaiterShift != 0 {
runtime.Semrelease(&m.sema, false, 1)
}
}
  • 解释:
    • mutexWaiterShift 减少一个等待者数量。
    • 如果锁处于饥饿模式,锁会直接交给等待最久的 goroutine。

完整逻辑流程总结

  1. CAS 快路径尝试加锁
    • 如果锁空闲,直接加锁成功,返回。
    • 如果锁被持有,进入慢路径。
  2. 自旋
    • 在自旋过程中尝试多次 CAS 获取锁,避免直接进入等待队列。
    • 自旋失败后,goroutine 进入等待队列,排队等待锁释放。
  3. 队列等待
    • 锁释放后,唤醒等待队列中的 goroutine,进入竞争。
    • 在正常模式下,新来的 goroutine 也可以参与竞争,可能抢到锁。
  4. 饥饿模式防止插队
    • 如果 goroutine 长时间未获取锁,锁进入饥饿模式,确保锁交给等待最久的 goroutine。

Q & A

  • 为什么 Mutex 会有正常模式和饥饿模式?

    “Go 的 Mutex 采用正常模式和饥饿模式主要是为了在锁的吞吐量和公平性之间找到平衡。正常模式下优先追求高吞吐量,新来的 goroutine 可以在锁释放后插队。但如果等待时间过长(超过 1ms),锁切换到饥饿模式,确保最早等待的 goroutine 直接获取锁,防止长时间饥饿。”

  • 自旋和进入队列的逻辑如何工作?

    “自旋是为了减少锁竞争导致的上下文切换,通过短时间多次尝试获取锁。如果自旋失败,goroutine 进入等待队列。等待锁释放时,goroutine 进入休眠,锁释放后被唤醒,再次竞争锁。”


使用建议

将锁与资源放在同一结构体,目的是:

  1. 保护数据一致性,防止竞态条件
  2. 封装性强,减少直接访问数据的机会
  3. 防止锁的误用或泄漏,提高代码安全性
  4. 符合面向对象的设计原则,提高代码的可维护性和可读性

双重检查锁

一种优化的并发设计模式,通常用于懒加载或减少锁的竞争。它结合了读锁和写锁,确保在多 goroutine 环境下,只有在必要时才加锁,从而提升性能。

实现原理

  1. 第一次检查:加读锁快速判断资源是否存在,存在则直接返回
  2. 第二次检查:在资源不存在的情况下,加写锁并再次检查,防止并发写入
  3. 插入数据,只有在第二次检查后数据仍不存在,才执行写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type SafeMap struct {
data map[string]string
mutex sync.RWMutex
}

func (s *SafeMap) LoadOrStore(key, value string) (string, bool) {
// 第一次检查,尝试加读锁
s.mutex.RLock()
res, ok := s.data[key]
s.mutex.RUnlock()
if ok {
return res, true
}

// 第二次检查,加写锁
s.mutex.Lock()
defer s.mutex.Unlock()
res, ok = s.data[key]
if ok {
return res, true
}

// 数据不存在,插入新值
s.data[key] = value
return value, false
}

特点和优势:

  1. 减少不必要的写锁,如果数据已经存在,读锁可以快速返回结果,避免频繁加写锁,提升性能
  2. 线程安全,即使在第一次检查和加写锁之间存在竞态条件,第二次检查能确保数据一致性
  3. 使用场景:适合在读多写少的场景,例如缓存加载或初始化操作

Mutex 细节

Mutex 是一种二元状态,0 是解锁,1 是锁定。Mutex 有两种路径,快路径和慢路径。

  1. 快路径

    1. 场景:当锁当前是空闲状态时,goroutine 尝试获取锁,不会阻塞,直接成功。

    2. 细节:

      1. 快路径通过 CAS 原子操作完成,速度非常快。
      2. 当多个 goroutine 并发访问时,只有一个 goroutine 能够成功获取锁,其他 goroutine 进入慢路径等待。
      1
      2
      3
      4
      5
      6
      if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
      }
      return
      }
  2. 慢路径

    1. 场景:当锁已经被其他 goroutine 持有,goroutine获取锁失败后,进入慢路径并阻塞,等待锁释放。

    2. 细节:

      1. 慢路径中,goroutine 会被加入到锁的等待队列中,并挂起阻塞。
      2. 等待持有锁的 goroutine 调用 unlock() 后,系统会唤醒等待队列中的一个或多个 goroutine,重新竞争锁。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      func (m *Mutex) Lock() {
      // Fast path: grab unlocked mutex.
      if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
      }
      return
      }
      // Slow path (outlined so that the fast path can be inlined)
      m.lockSlow()
      }

正常模式和饥饿模式

正常模式(Normal Mode)

  1. 默认模式,所有锁的操作在初始状态下都是以正常模式运行的。
  2. 在正常模式下,等待锁的 goroutine 被放入 FIFO 队列,但在解锁时,当前 goroutine 可以优先重新获取锁,即使有其他 goroutine 在等待队列中排队。
  3. 这种方式更偏向于锁的吞吐量,因为持有锁的 goroutine 可以在释放后立即再次获取,减少上下文的切换。
  4. 示例:
    1. goroutine A 持有锁,goroutine B、C 在等待。
    2. A 解锁后,如果 A 需要再次加锁,A 可能优先于 B、C 获取锁,即使 B、C 先排队。
    3. 还有一种情况是,A 释放锁后,D 立即调用了 Lock(),在 B 和 C还没来得及重新竞争锁时,D 成功加锁。这是由于 Go 的 Mutex 设计更偏向高吞吐量,而非严格的公平性。因此,D 在正确的时机(A 解锁后,B 和 C 还没来得及被唤醒)调用 Lock(),则 D 可以直接通过快路径的 CAS 操作获取到锁。

饥饿模式(Starvation Mode)

  1. 如果一个 goroutine 长时间未能获取锁,即使排在等待队列的前面,它将触发饥饿模式。
  2. 在饥饿模式下,锁的分配变得更加公平,锁的所有权严格按照等待队列的顺序分配,防止长时间的 goroutine 被饿死。
  3. 饥饿模式触发条件:当一个 goroutine 在等待锁超过 1ms 并被其他 goroutine 插队时,锁会进入饥饿模式。
  4. 处于饥饿模式的锁会持续保持,直到等待队列为空,然后回复到正常模式。

源码细节解析

在 Go 的 src/sync/mutex.go 文件中,可以看到关于饥饿和正常模式的详细实现。核心代码片段(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
const (
mutexLocked = 1 << iota // 互斥锁已被锁定 (mutex is locked)
mutexWoken // 有 goroutine 被唤醒
mutexStarving // 锁进入饥饿模式
mutexWaiterShift = iota // 等待者数量的偏移量
)

func (m *Mutex) lockSlow() {
if starving || queueNotEmpty {
m.state |= mutexStarving // 进入饥饿模式
}
}

锁饥饿的优化方案

在正常模式下,如果锁总是在 1ms 内被新的 goroutine 抢占,存在锁饥饿的风险。Go 默认不会强制进入饥饿模式,除非等待者超过 1ms 且持续未能获取锁。因此,为了避免每次都在 1ms 内被其他 goroutine 抢占锁,导致饥饿模式不会触发的极端情况,可以有以下方案:

  1. 缩小锁的粒度

    1. 将锁的临界区缩小,将长时间的逻辑移出临界区,尽量减少阻塞其他 goroutine 的时间。
    2. 减少锁竞争和等待时间,提高系统吞吐量。
    1
    2
    3
    4
    5
    func (s *SafeCounter) Increment() {
    s.mu.Lock() // 仅在临界区内加锁
    s.count++
    s.mu.Unlock()
    }
  2. 读写分离(RWMutex)

    1. 在读多写少的场景下,使用 sync.RWMutex,允许多个 goroutine 同时读,只有写操作需要独占锁。
    2. 写操作仍然需要独占锁,当减少了读锁的阻塞时间。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    var rw sync.RWMutex
    data := make(map[string]string)

    func ReadData(key string) string {
    rw.RLock() // 允许多个 goroutine 同时读取
    defer rw.RUnlock()
    return data[key]
    }

    func WriteData(key, value string) {
    rw.Lock() // 写操作独占锁
    defer rw.Unlock()
    data[key] = value
    }
  3. 信号量与条件变量(sync.Cond)

    1. 使用 sync.Cond 实现公平锁机制,确保等待时间较长的 goroutine 优先获得锁。
    2. sync.Cond 允许 goroutine 在满足一定条件时被唤醒,从而减少锁饥饿问题。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    var cond = sync.NewCond(&sync.Mutex{})
    waiting := 0

    func Worker() {
    cond.L.Lock()
    waiting++
    cond.Wait() // 阻塞,直到被唤醒
    waiting--
    cond.L.Unlock()
    }
  4. 乐观锁与 CAS 机制

    1. 通过 sync/atomic 包实现乐观锁机制,减少对互斥锁的依赖,提升并发性能。
    2. 如果 CAS 操作失败,可以主动重试,而不是阻塞等待。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    var count int32

    func Increment() {
    for {
    old := atomic.LoadInt32(&count)
    if atomic.CompareAndSwapInt32(&count, old, old+1) {
    break
    }
    }
    }