Go语言设计(5)-RWMutex

Go语言设计(5)-RWMutex

Date
Feb 14, 2022
Tags
Go
Concurrency

读写锁实现机制

读写锁的设计一般满足如下规则:
  • 写写互斥: 与互斥锁语义相同
  • 读写互斥: 包含两部分含义, 都是为了避免读不一致的情况产生:
1. 在拥有写锁的时候其他协程不能获取到读锁
2. 在拥有读锁的时候其他协程不能获取到写锁
  • 读读不互斥: 不同的协程可以同时获取到读锁,可以提高读性能
读不一致: 在读锁加锁期间,读取同一个变量的结果不相同,这个Mysql的”幻读”相同: 假设读锁不会阻塞写锁 mu.lock() —> a= 1 —> 写锁/写入更改a —> a=2 , 就造成了不一致。所以读写互斥是解决这类幻读的一种方案

源码解析

源码地址

RWMutex结构体

type RWMutex struct {
  // 写操作锁, 实现写写操作互斥
	w           Mutex 
   // 写锁信号量
	writerSem   uint32
  // 读锁信号量
	readerSem   uint32
  // 当前读操作的数量,包含所有已经获取到读锁或者被写操作阻塞的等待获取读锁的读操作数量
	readerCount int32  
   // 获取写锁需要等待读锁释放的数量
	readerWait  int32 
}
读写锁中写操作需要在等待读锁释放的时候才能获取写锁,然而这就会存在一个问题,就是写操作在等待读锁释放期间可能还会有新的读操作获取到了锁,尤其是在读并发量特别大的场景下。如果读操作一直能够获取到锁,那么写操作就会一直阻塞下去,从而造成“写饥饿”。那么这种情况该如何避免呢?
RWMutex 利用 readerCount / readerWait 属性,提供了一个非常巧妙的思路。写操作到来的时候,会把 readerCount 的值复制给 readerWait,用来标记在当前写操作之前的读操作的数量。当读锁释放的时候,会递减 readerWait,当它变成 0 的时候,就代表前面的读操作全部完成了,此时写操作会被唤醒。至于如何告诉 RWMutex 当前有正在阻塞中的写操作,这里其实是通过将 readerCount-rwmutexMaxReaders 来实现的,由于 rwmutexMaxReaders 是一个极大值,现实情况并发读的数量远不可能达到这个值,因此读操作一旦发现 readerCount 的值小于 0,便可以知道当前有写操作阻塞了,具体实现可以看下面的相关源码。
RWMutex对外提供了五个方法:
  • Lock() / UnLock(): 写加锁和解锁操作
  • RLock() / RUnlock(): 读加锁和解锁操作
  • RLocker():读锁操作对象,实现 Locker 接口,可以通过 Locker 接口的方法实现读锁的加锁和解锁。

Lock()

写锁加锁。一方面需要获取互斥锁,用于写互斥,基于 Mutex 实现;同时告诉读操作有写操作正在排队,如果当前有正在进行中的读操作的话,那么会阻塞等待所有进行中的读操作完成。
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
// 除去data-race后的源码
func (rw *RWMutex) Lock() {
  // 实现写写互斥
	rw.w.Lock()

 // 实现读写互斥
 // 首先通知读锁,有一个写操作就阻塞
// 如果这里赋值成功,那么后续读锁加锁的时候,都不可能使 readerCount > 0
    // 当然以上是基于读并发数量不可能 > rwmutexMaxReaders 这个前提
    // 当读加锁的时候发现 readerCount < 0 了,就可以知道当前有被阻塞的写操作
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

	// 等待读锁全部释放,两个条件,满足其一即可:
    // 1. r = 0
    // 2. atomic.AddInt32(&rw.readerWait, r) = 0
    // 否则需要阻塞等待写信号量
  if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    // 和 RUnlock() 释放写锁信号量相呼应
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

Unlock()

写锁解锁,需要唤醒被阻塞的读操作,告诉它们可以继续了;同时需要释放写写互斥锁。
func (rw *RWMutex) Unlock() {

  // 和加锁时减 rwmutexMaxReaders 相呼应,告诉读协程没有活跃中的写操作了
  // 这之后 readerCount 又能恢复正常(> 0),代表当前读操作的数量
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
  // 唤醒读操作,与下文 RLock() 读操作等待信号量相呼应
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()

}

RLock()

读锁加锁,readerCount 减 1 即可,同时如果写锁正在被占用,需要等待写锁释放。写锁被占用的标志就是 readerCount 的值小于 0。

func (rw *RWMutex) RLock() {
// 如果 atomic.AddInt32(&rw.readerCount, 1) < 0,代表写锁正在被占用,需要等待写锁释放
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // 阻塞等待写锁释放读信号量,和上文 Unlock() 释放读锁信号量相呼应
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

RUnlock()

读锁解锁,需要考虑写操作被阻塞的情况。
func (rw *RWMutex) RUnlock() {
  // readerCount 减 1
  // 如果此时 readerCount < 0,代表有写操作被阻塞,需要走 slow-path,择机唤醒写操作
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}

// r -> rw.readerCount,此时是读锁解锁减去 1 之后的值
func (rw *RWMutex) rUnlockSlow(r int32) {
  // 如果 r+1 = 0,则 r = -1,代表解锁了一个未加锁的锁
  // 如果 r+1 = -rwmutexMaxReaders,还记得写锁加锁的操作嘛,这里代表解锁了一个写锁
  // 这两种情况都是非法的
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		throw("sync: RUnlock of unlocked RWMutex")
	}

	// 写操作需要等待读操作释放的数量减 1
  // 如果此时值变成了 0,代表前面的读操作全部释放了
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
	// 唤醒一个写锁操作,和 Lock() 等待写锁信号量相呼应
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

RLocker()

返回一个读操作的接口对象。用于操作读锁相关的加锁和解锁方法。

func (rw *RWMutex) RLocker() Locker {
	return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

注意事项

1) 不可复制

RWMutex 结构体是有状态的,一旦被使用,它的几个字段就会记录其运行状态,如果复制了这把锁,就会同时把状态也给复制过来,这对后续新锁的使用会产生不可预知的影响。因此我们在使用读写锁的过程中,需要避免锁的值传递或者值赋值。

2) 加锁和解锁要成对出现

和 Mutex 类似,RWMutex 的 Lock() 与 Unlock(),以及 RLock() 与 RUnlock() 都需要成对出现,只有加锁而没有解锁会导致死锁,只有解锁而没有加锁则会导致程序 panic。在使用过程中我们尤其要注意函数提前退出导致解锁操作未被执行的情况,因此解锁操作可以配合 defer 使用。

3) 避免重入

Golang 标准库提供的锁都是不可重入的:
  • 由于 Mutex 不可重入,所以基于 Mutex 的写锁不可重入,否则会产生死锁;
  • 在读操作的占用读锁的同时需要获取写锁,会产生死锁;
  • 如果读写锁之间相互循环依赖,也会产生死锁问题。

使用示例

构造一个缓存系统
// 回源函数
type Load func(string) interface{}

type Cache interface {
    // Get 从缓存中取值,如果缓存中没有,那么通过 load 方法回源并加载进缓存
    Get(string, Load) interface{}

    // Put 往缓存中写数据
    Put(key, value string)
}
遵循面向接口的编程范式,我们定义了 Cache 接口,包含两个方法。其中 Get()操作需要加读锁防止并发写,如果数据不存在,则需要升级为写锁,回源数据并写缓存;Put()操作需要加写锁,防止并发读写问题。具体实现如下:
// 利用 RWMutex 实现一个缓存
// 缓存是一个典型的读多写少的系统,这与读写锁的使用场景不谋而合
type MCache struct {
    cacheMap map[string]interface{}
    sync.RWMutex
}

// Get 读,按需加载
func (mc *MCache) Get(key string, load func(string) interface{}) (value interface{}) {
    mc.RLock()
    if val, exist := mc.cacheMap[key]; exist {
        value = val
        mc.RUnlock()
        return value
    }
    mc.Lock()
    defer mc.Unlock()
    // 双重检查
    if val, exist := mc.cacheMap[key]; exist {
        value = val
        return
    }
    // 查询数据源并写到缓存中
    value = load(key)
    mc.cacheMap[key] = value
    return
}

// Put 写,需要添加写锁
func (mc *MCache) Put(key, value interface{}) {
    mc.Lock()
    mc.cacheMap[key] = value
    mc.Unlock()
}
 

Loading Comments...