信号量概念

信号量(Semaphore)是另一种临界区的保护机制,它是操作系统提供的一种协调共享资源访问的方法。它将资源纳入全局考虑,从操作系统的层面对资源进行宏观的调配

这个机制由Dijkstra在20世纪60年代提出,是早期操作系统的主要同步机制。

信号量是一种抽象的数据结构:

  • 由一个整型变量(sem)和两个原子操作组成
  • P() sem - 1 若 sem < 0 进入等待 否则继续
  • V() sem + 1 若 sem <= 0 唤醒一个等待进程

信号量中的整数sem就是这个系统资源剩余量。具体的实现接口P()和V(),分别是荷兰语中增加prolagg减少verhoog的缩写。

信号量的具体实现伪代码(其中sem是剩余资源)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Semaphore {
int sem;
WaitQueue q; // 等待队列
}

Semaphore::P(){
sem--;
if (sem < 0) {
Add thread t to q;
block(p);
}
}
Semaphore::V(){
sem++;
if (sem <= 0) { // 说明队列中有线程在等待
Remove a thread t from q;
Wakeup(t);
}
}

信号量的特性:

  • sem是由操作系统全局保护的整数变量

    • 信号量的相关操作是原子操作
    • sem初始化完成后,只能通过P()V()操作更改
    • P()可能阻塞,V()不会阻塞
  • 通常假定信号量是“公平的”:

    • 线程不会无限地阻塞在P()操作
    • 假定信号量等待按先进先出

信号量的使用

信号量可分为二进制信号量(资源数目为0或1)和资源信号量(资源数目为任何非负值),基于其一可以实现另一个(两者等价)。

信号量可以用于实现临界区的互斥访问和线程间的条件同步

用信号量实现临界区的互斥访问,示例如下:

1
2
3
4
5
mutex = new Semaphore(1);

mutex->P();
Critical Section;
mutex->V();

必须成对使用P()操作和V()操作,不能颠倒、重复或遗漏

此外,利用P和V配对的性质,可以将P和V分开,从而实现条件同步,示例如下:

同步目标是线程B执行完X部分的代码后,线程A才能执行N部分的代码。

实现方式如下图所示:线程A中,如果调用P,那么信号量将变为-1,进入等待状态(这是信号量设计中很精髓的FIFO)。线程B调用V之后,信号量变为0,这时A重新开始运行,从而就实现了两个线程的同步。

img

生产者-消费者问题

Producer-consumer_problem
  • 任何时候只能有一个线程操作缓冲区(互斥访问)
  • 缓冲区空时,消费者必须等待生产者(条件同步)
  • 缓冲区满时,生产者必须等待消费者(条件同步)

用信号量描述在这个问题当中的约束(缓冲区大小为n):

  • 二进制信号量mutex用于互斥访问缓冲区,初始化为1
  • 资源信号量fullBuffers表示当前占满的缓冲区数量,用于限制读出,初始化为0
  • 资源信号量emptyBuffers表示当前为空的缓冲区数量,用于限制写入,初始化为n

伪代码如下:

1
2
3
4
5
class BoundedBuffer{
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}

其生产、消费两个过程相互对称,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
BoundedBuffer::Deposit(c){
emptyBuffers->P();//条件同步
mutex->P(); //互斥访问
Add c to buffer; //核心操作
mutex->V(); //互斥访问
fullBuffers->V();//条件同步
}
BoundedBuffer::Withdraw(c){
fullBuffers->P();//条件同步
mutex->P(); //互斥访问
Add c to buffer; //核心操作
mutex->V(); //互斥访问
emptyBuffers->V();//条件同步
}

使用信号量的困难在于:

  • 读/开发都比较困难
  • 容易出错
  • 不能系统性地解决死锁问题

管程

管程是一种更为现代的实现互斥访问的方法。与临界区相比,管程有更好的封装,如果设计得当,则处理各种问题时,使用将极为便捷。

Synchronization_method_monitor

管程的组成

  • 一个锁
    • 控制管程代码的互斥访问
  • 0或者多个条件变量(0个条件变量 则和 信号量差不多)
    • 管理共享数据的并发访问
Monitor

条件变量(Condition Variable)

管程内的等待机制

  • 进入管程的线程因资源被占用而进入等待状态
  • 每个条件变量表示一种等待原因 对应一个等待队列
  • Wait()操作
    • 将自己阻塞在等待队列中
    • 唤醒一个等待者或释放管程的互斥访问
  • Signal()操作
    • 将等待队列中的一个线程唤醒
    • 如果等待队列为空 则等同于空操作

条件变量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Class Condition {
int numWaiting = 0; // 等待线程数
WaitQueue q;
}
Condition::Wait(lock){
numWaiting++; // 有线程处于等待状态
Add this thread t to q;
release(lock); // 释放管程的访问权
schedule(); //need mutex
require(lock); // 请求管程访问权
}
Condition::Signal(){
if (numWaiting > 0) {
// 如果等待队列为空 则为空操作
Remove a thread t from q;
wakeup(t); //need mutex
numWaiting--;
}
}

与信号量的一个最为重要的不同在于,使用条件变量时,如果进入等待状态,还能暂时放弃管程的互斥访问等待事件出现时恢复,从而并行进行其他操作,而不是简单地像对临界区那样“固守”。
这一点是使用了条件变量的好处,可以看到中途释放了lock。

用管程解决生产者-消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class BoundedBuffer {
// 管程入口的锁
Lock lock;
// 缓冲区里的个数
int count = 0;
// 条件变量
Condition notFull, notEmpty;
}
/* 用信号量解决的时候 不能把检查条件放在申请互斥操作的后面 是因为会产生死锁 因为都申请了互斥操作了 别的线程 * 不可读不可写 当前线程只能一直在那等
*
* 而用管程不同 它可以放弃管程的互斥访问权限 调度到其他线程去
*/
BoundedBuffer::Deposit(c) {
// 管程进入申请
lock->Acquire();
// 判断缓冲区是否已经满了,满了就等待条件满足时再继续执行
while (count == n)
notFull.Wait(&lock);

Add c to the buffer;
count++;
notEmpty.Signal();
// 管程释放
lock->Release();
}

BoundedBuffer::Remove(c) {
lock->Acquire();
while (count == 0)
notEmpty.Wait(&lock);
Remove c from buffer;
count--;
// 将等待线程中的其中一个唤醒
notFull.Signal();

lock->Release();
}

通过良好的设计封装,可以降低使用过程的使用难度。具体使用情景下,Deposit和Remove都只需要简单的对外接口,但有着有效的互斥访问机制。

管程分类

管程中的条件变量按照不同的释放处理方式分为两种:

  • Hansen管程(当前正在执行的线程更优先)
    • 用于真实OS或Java中
  • Hoare管程
    • 多见于教材

Hansen管程

当前正在执行的线程优先级更高,因此线程连续执行,所以其效率更高

  • 条件变量释放仅是一个提示,需要重新检查条件 (用 while)

因为另一个线程中释放了条件变量后还是在继续往下执行,有可能在这段期间条件变量发生变化,所以仍然需要检查条件

1
2
3
4
5
6
7
8
9
10
Hansen_style::Deposit(){
lock->acquire();
while (count == n) { // 这里是 while
notFull.wait(&lock);
}
Add thing;
count++;
notEmpty.signal();
lock->release();
}
Hansen_monitor

Hoare管程

等待条件变量的优先级更高

  • 条件变量释放同时表示放弃管程访问
  • 释放后条件变量的状态可用
1
2
3
4
5
6
7
8
9
10
Hoare_style::Deposit(){
lock->acquire();
if (count == n) { // 这里是 if
notFull.wait(&lock);
}
Add thing;
count++;
notEmpty.signal();
lock->release();
}
Hoare_monitor

经典同步问题

哲学家就餐问题

5个哲学家围绕一张圆桌坐

  • 桌子放5支叉子
  • 每两个哲学家之间放一支
  • 哲学家动作包含思考和就餐
    • 进餐时需同时拿到左右两边两把叉子
    • 思考时将两支叉子放回原处
  • 要求不出现有人永远拿不到叉子的情况
Dining_philosophers_problem

信号量解决哲学家就餐问题

方案1

  • 会发生死锁!
  • 当五个哲学家同时拿起左边的叉子 所有人都在等待别人放下右边的叉子
1
2
3
4
5
6
7
8
9
10
11
12
13
#define N 5 // 哲学家个数
semaphore fork[5]; // 5支叉子的信号量初值为1
void philosopher(int i) {
// 哲学家编号:0 - 4
while (ture) {
think();
P(fork[i]); // 拿左边的叉子
P(fork[(i + 1) % N]); // 拿右边的叉子
eat();
V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子
}
}

方案2

  • 互斥访问正确
  • 每次只能有一个哲学家进餐 性能差
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
semaphore mutex; // 互斥信号量,初值1
void philosopher(int i)
// 哲学家编号:0 - 4
while(TRUE) {
think();
P(mutex); // 进入临界区 只有一个哲学家能就餐

P(fork[i]); // 去拿左边的叉子
P(fork[(i + 1) % N]);// 去拿右边的叉子
eat();
V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子

V(mutex); // 退出临界区
}

方案3

  • 根据编号不同 采取不同动作 避免都拿到一部分资源构造环路的情况
  • 没有死锁 并允许多人(最多两人)同时就餐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
void philosopher(int i) // 哲学家编号:0 - 4
while(TRUE) {
think();

if (i % 2 == 0) {
// 偶数 先拿左 后拿右 奇数 先拿右 后拿左
P(fork[i]); // 去拿左边的叉子
P(fork[(i + 1) % N]); // 去拿右边的叉子
} else {
P(fork[(i + 1) % N]); // 去拿右边的叉子
P(fork[i]); // 去拿左边的叉子
}
eat();

V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子
}

读者写者问题

共享资源的两类使用者

  • 读者 只读取数据 不修改
  • 写者 读取和修改数据
  • 读读允许
    • 同一时刻允许有多个读者同时读
  • 读写互斥
    • 没有写者时 读者才能读
    • 没有读者时 写者才能写
  • 写写互斥
    • 没有其他写者时 写者才能写

读者写者问题的优先策略

  • 读者优先策略
    • 只要有读者正在读状态 后来的读者都能直接进入
    • 若读者持续不断进入 则写者就处于饥饿
  • 写者优先策略
    • 只要有写者就绪 写者应尽快执行写操作
    • 若写者持续不断就绪 则读者就处于饥饿

信号量解决读者写者问题

此方案 读者优先用信号量解决读者写者问题的写者优先方案

  • 信号量WriteMutex
    • 控制读写操作互斥
    • 初始化为 1
  • 读者计数Rcount
    • 正在进行读操作的读者数目
    • 初始化为 0
  • 信号量CountMutex(对 Rcount 进行保护)
    • 控制对读者计数的互斥修改
    • 初始化为 1

Write线程

1
2
3
P(WriteMutex); // 读写互斥
write();
V(WriteMutex);

Reader线程

1
2
3
4
5
6
7
8
9
10
11
12
13
P(CountMutex); // 保护 Rcount
if (Rcount == 0)
P(WriteMutex);
// 只在第一个读者 开启读写互斥
++Rcount; // 加一读者
V(CountMutex);
read();
P(CountMutex);
--Rcount;
if (Rcount == 0)
V(WriteMutex);
// 没有读者了 让写者有机会访问临界区去写
V(CountMutex);

管程解决读者写者问题

此方案 写者优先

管程的状态变量

1
2
3
4
5
6
7
AR = 0; // 当前读的读者
AW = 0; // 当前写的写者
WR = 0; // 等待读的读者
WW = 0; // 等待写的写者
Lock lock;
Condition okToRead;
Condition okToWrite;

读者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Database::Read() {
// Wait until no writers;
StartRead();
read database;
// check out – wake up waiting writers;
DoneRead();
}
Database::StartRead() {
lock.Acquire();
// 只要有写者就等 说明写者优先
while ((AW+WW) > 0) {
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Database::DoneRead() {
lock.Acquire();
AR--;
// 当前没有读者且有等待写的写者 则唤醒 写者
if (AR == 0 && WW > 0) {
okToWrite.signal();
}
lock.Release();
}

写者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Database::Write() {
// Wait until no readers/writers;
StartWrite();
write database;
// check out-wake up waiting readers/writers;
DoneWrite();
}
Database::StartWrite() {
lock.Acquire();
// 只要有正在写的写者或者正在读的读者 就等(不等 等待读的读者 说明写者优先)
while ((AW+AR) > 0) {
WW++;
okToWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}
Database::DoneWrite() {
lock.Acquire();
AW--;
if (WW > 0) {
// 优先唤醒等待写的写者
okToWrite.signal();
}
else if (WR > 0) {
// 如果没有等待写的写者 才唤醒等待读的读者
okToRead.broadcast();
}
lock.Release();
}