tags:
- OS
在之前的学习中,我们了解到许多并发给我们带来的诸多好处,如更好的资源利用率、更低的响应时间、更好的用户体验。但在享受这些便利的同时,并发还会为系统带来许多灾难。在设计系统的并发时,我们要考虑资源共享的问题,不当的程序可能会为系统带来条件竞争(Race condition)、饥饿和死锁。这些问题一定要在开始就规避掉,不然会酿成大祸。
同步指通过协调多个并发线程(或进程)的执行,是他们能够安全的共享资源或进行通信。同步的目的是防止竞争、数据不一致和死锁等问题,从而保证系统的正确性和稳定性。我们下面先举两个例子展示不同的同步模式。
在多核系统上,两个线程是有并行执行的能力的。但有时候,两个线程虽然可以同时执行,但是不可以同时访问某些资源,我们称这种情况为互相排斥的同步问题(mutual exclusion)。在之前的学习中,共享内存的访问就需要进行互斥的访问(虽然也需要serialization的同步)。
还有另一种情况,两个线程的执行必须要有一定的顺序,比如B线程需要A线程执行完毕后才能执行或是两个线程交替着执行亦或是更复杂的情况。我们一律将这种情况称作serialization的同步问题。同样的,我们之前学习管道的进程间通讯方式就需要进行serialization的同步。
我们很多写的简单测试程序都是确定性的程序,即我们在程序运行前我们就能知道输出是什么(比如简单的打印"Helloworld\n")。但当程序涉及到并发时,我们的程序有时就不再是确定性的程序了,我们称之为不确定性程序(Non-deterministic program)。
下面的C++程序就是不确定性的程序:
#include <iostream>
#include <thread>
int counter = 0;
int main(){
std::thread newThread([](){
for (int i = 0; i < 10000000; i++){
counter++;
}
});
for(int i = 0; i < 10000000; i++){
counter++;
}
newThread.join();
printf("I counted: %d\n", counting);
return 0;
}
这个程序很奇怪,当你打印结果,你会发现它的输出千奇百怪,总之就是不为20000000
。为什么?虽然看上去并不需要同步,但高级语言中的++
、--
不仅仅只是看上去那么简单。这些指令被称为read-modify-write指令,你需要先读值、修改最后写回结果。造成这种情况完全是因为read-modify-write指令并不是一气呵成的。(两线程都先读,之后两线程先后写,会造成什么情况?)
由这种不确定性引起的程序bug,我们将其戏称为Heisenbug(得名于 "Heisenberg Uncertainty Principle")。这时,我们就需要对资源的操作进行 mutual exclusion 的同步操作。
我们现在明白,造成 Heisenbug 的主要原因在于资源的分配问题和执行顺序的控制问题。上边的程序中并发引起的 Heisenbug 就是由于单一资源分配不当导致的。当然,资源个数也可以有很多,这时要考虑的问题就会随着资源个数的增加而变得不同。
对执行顺序的控制能够确保多个进程或线程按照预期的顺序执行,以满足特定的任务依赖关系和执行逻辑。对于这些非确定性程序,我们可以用一些同步工具来管理和控制资源或执行顺序。通过这些同步工具,我们能够有效防止竞态条件和资源冲突,确保系统的正确性和稳定性。但在此之前,我们有必要先了解一下临界区的管理。
在之前的示例中,我们发现程序的不确定性是由一条 read-modify-write 高级语言指令(如 i++;
、i--;
)引起的。为了避免数据竞争,我们只需要对这条指令进行管理即可。这也正是我们即将要学习的临界资源和临界区中所要关注的内容。
临界资源是指那些在同一时刻只能被一个进程访问的共享资源,如共享内存、文件、打印机及其他系统互斥资源。对临界资源的访问必须是不可中断的。前面引起Heisenbug的C++程序中,那个全局变量就是需要互斥访问的临界资源。
临界区就是并发线程(或进程)中访问了临界资源的一段代码。要使得系统运行稳定、规避并发带来的一系列问题,我们需要保证时刻只有一个进程在临界区内代码来避免出现竞争条件或是数据不一致现象。之前的例子中,counter++;
就是对临界区的访问,这种情况下,我们的临界区可以划分的很小。
for(int i = 0; i < 10000000; i++){
// Critical Section starts form here
counter++;
// To here
}
现实生活中,订票系统是我们身边较为常见的可能引起数据不一致的场景。我们通过 pthread 库给这个进程中生成了两个线程。我们共有两张票,两个线程都会 booking 一张票,因此我们期望进程执行后余票为0。但由于并发执行,两个线程在进入临界区时可能会导致数据的不一致性。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket_amount = 2;
void* ticketAgent(void* arg){
int t = ticketAmount;
if (t > 0){
printf("One ticket sold\n");
t--;
}else{
printf("Ticket sold out\n");
}
ticketAmount = t;
pthread_exit(0);
}
int main(int argc, char const* agrv[]){
pthread_t ticketAgent_tid[2];
for(int i = 0; i < 2; i++){
pthread_create(ticketAgent_tid+i, NULL, ticketAgent, NULL);
}
for (int i = 0; i < 2; i++){
pthread_join(ticketAgent_tid[i], NULL);
}
sleep(1);
printf("The left ticket is %d\n", ticketAmount);
return 0;
}
运行结果如下:
代码执行后我们看到一些结果是正确的,还有一些执行中售出两张票但仍留了一张。这是因为并发线程交替执行”临界区“中的代码,在上面我们也提到了 --
这种 read-modify-write 指令。如此,并发执行会引起 Heisenbug 。对于这种情况,我们提出临界区管理的原则。
管理临界区的本质就是要对访问临界区的并发进程进行同步,要实现的目标有互斥、前进和有限等待。通过这些目标,我们能够在不同的同步模式下管理临界区并避免竟态条件和资源冲突所带来的一系列问题。
互斥锁是由操作系统提供的临界区同步工具,可以保证临界区的管理目标要求。互斥锁的上锁、解锁操作是原子化的,因而保证了每次只会有一个进程/线程进入临界区。互斥锁的同步模型如下:
acquireLock();
//+--------------------+
//| |
//| critical section |
//| |
//+--------------------+
releaseLock();
在这段同步模型的伪代码中,我们满足了临界区管理的三大原则,即:
下面是一些POSIX系统提供的一些互斥锁的系统调用。相关操作有:
#include <pthread.h>
pthread_mutex_t lock;
pthread_mutex_init(&lock,NULL);
pthread_mutex_lock(&lock);
pthread_mutex_unlock(&lock);
pthread_mutex_destroy(&lock);
借助pthread
库,我们就可以实现线程间的互斥。我们在 ticket booking 中加入互斥锁,如下:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticketAmout = 2; // 票的数量: 全局变量
pthread_mutex_t lock; // 定义互斥锁
void* ticketAgent(void* arg){
pthread_mutex_lock(&lock); // 上锁
int t = ticketAmout;
if (t > 0){
printf("One ticket sold\n");
t--;
}else{
printf("Ticket sold out\n");
}
ticketAmout = t;
pthread_mutex_unlock(&lock); // 解锁
pthread_exit(0);
}
int main(int argc, char const* agrv[]){
pthread_t ticketAgent_tid[2];
pthread_mutex_init(&lock, NULL); // 锁的初始化
for(int i = 0; i < 2; i++){
pthread_create(ticketAgent_tid+i, NULL, ticketAgent, NULL);
}
for (int i = 0; i < 2; i++){
pthread_join(ticketAgent_tid[i], NULL);
}
sleep(1);
printf("The left ticket is %d\n", ticketAmout);
return 0;
}
观察输出,你会发现此时不会有资源共享问题导致的数据不一致性了。pthread库除了提供互斥锁,还有读写锁、自旋锁(Spin-lock) 等,这些都可以为我们临界区的管理提供同步的帮助。
从上节课的例子中,我们看到互斥锁的使用十分简单。借助 pthread
库,我们只需要定义并初始化一把锁,接着进行上锁、解锁和锁的摧毁操作即可。要实现互斥锁对临界区的管理,就需要满足互斥、前进、有限等待三个条件。本节课,带着对临界区的管理原则,我们来尝试实现一把锁。
在第一次尝试中,互斥锁并不满足互斥和有限等待的条件,因为测试和上锁并不是一气呵成的。因而,线程1和线程2可能同时通过测试并上锁,导致两个线程都认为自己获得了锁。此外,没有其他的条件分支,这个锁实际上并不能发挥其应有的作用。
bool mutex_lock = false;
lock(mutex_lock){
if(mutex_lock == false){ // test
mutex_lock = true; // lock
}
}
//+--------------------+
//| |
//| critical section |
//| |
//+--------------------+
unlock(mutex_lock){
mutex = false;
}
为了满足有限等待的条件,我们将判断嵌入到 while
循环中。如果线程2在进入循环时不满足 mutex_lock = false
的条件就会进入空循环,等待线程1解锁释放临界区资源。但是在第二次尝试中,测试和上锁仍然是分开的,不符合互斥的条件。
bool mutex_lock = false;
lock(mutex_lock){
while(mutex_lock != true){ // test
;
}
mutex_lock = true; // lock
}
//+--------------------+
//| |
//| critical section |
//| |
//+--------------------+
unlock(mutex_lock){
mutex_lock = false;
}
为了使得测试和上锁操作连贯且不可打断,我们需要引入原子操作的概念。原子操作就是在执行过程中需要一气呵成、不会分割、不能也不会被中断的操作。原子操作也称为原语,一般由硬件实现或系统提供。无论是什么原子操作,需要实现其原子性就离不了硬件平台的支持。
硬件支持的原子操作是由处理器提供的一些指令,确保这些操作在执行时不会被其他操作中断。下面列举了几个常见的硬件原子操作指令的实现:
// The atomic hardware instruction CAS would not be interrupted.
bool compare_and_swap(int* ptr, int old_val, int new_val){
if(*ptr == old_val){
*ptr = new_val;
return true;
} else {
return false;
}
}
// Same, wouldn't be interrupted
int test_and_set(int* ptr, int new_val){
int old_val = *ptr;
*ptr = new_val;
return old_val;
}
// Same...
int fetch_and_add(int* ptr, int value){
int old_val = *ptr;
*ptr += value;
return old_val;
}
系统提供的原子操作主要通过操作系统和编程语言的库函数实现,来确保在多线程环境下操作的安全性。以下是一些常见的方法:
pthread_mutex_lock
和 pthread_mutex_unlock
用于加锁和解锁。std::atomic
来定义原子变量,提供原子性的读写操作。需要注意的是,系统所提供的这些原子性操作通常是通过对硬件封装所得的。如果硬件层面不支持原子操作,系统很难在不借助这些底层支持的情况下实现真正的原子性。
用原子操作指令,我们能够完成一个如下的简易版 spinlock。
//未上锁:0
//已上锁:1
bool mutex = 0;
lock(mutex){
while(compare_and_swap(mutex, 0, 1) == false){ //测试+上锁
//while(test_and_set(mutex, 1) == 1){
;
}
}
//+--------------------+
//| |
//| critical section |
//| |
//+--------------------+
unlock(mutex){
mutex = 0;
}
忙式等待是指一个进程或线程在等待某个条件满足时,不断地循环检查该条件是否满足的技术,这个循环等待的过程仍然在消耗CPU资源。我们本节课前面的互斥锁正是应用了这种技术,这类锁也称作自旋锁(SpinLocks)。
若进程请求锁时发现锁已不可用,那就让该进程或线程睡觉(阻塞/等待状态),这样就不会消耗CPU来执行等待。但是,临界区中的进程离开临界区时在解锁操作中要唤醒之前等待的进程(从等待状态迁移到就绪状态)。上节课我们用到的 pthread_mutex
就是这类锁,即阻塞锁。
上节课我们简单地实现了自己的自旋锁,在课程末尾,我们提到了阻塞锁。我们实现自旋锁时的忙等待白白浪费CPU资源,这不是我们想要的。这节课,我们就来学习互斥锁。并比较两者的差别。
pthread_mutex
POSIX 系统提供了pthread_mutex
互斥锁来实现不同操作之间的互斥。我们之前也在课程中使用过pthread_mutex
。当检查到互斥锁已经被其他线程所侵占时,线程就会阻塞自己,由于这种实现方式,互斥锁也被称作阻塞锁。
下面是POSIX thread库中所提供的互斥锁的系统调用。我们最熟悉的是lock和unlock的系统调用,简单明了。当我们使用pthread_mutex_lock()
时,这个函数会先检查锁是否可用,若是不可用就阻塞自己,锁可用后会向阻塞线程发送信号。
int pthread_mutex_lock(pthread_mutex_t *mutex);
/*
Parameters:
1. mutex: Pointer to the mutex to lock.
Return value: Returns 0 on success, otherwise an error number.
*/
如果你想实现一些条件分支,例如当锁不可用时怎么怎么办,pthread还提供trylock的逻辑。不同于mutex_lock,trylock并不会阻塞线程。当如果互斥锁已经被其他线程占有时trylock会返回错误码 EBUSY
,表示锁当前不可用。
int pthread_mutex_trylock(pthread_mutex_t *mutex);
/*
Parameters:
1. mutex: Pointer to the mutex to try to lock.
Return value: Returns 0 on success, otherwise an error number.
*/
下面是对锁的解锁操作。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
/*
Parameters:
1. mutex: Pointer to the mutex to unlock.
Return value: Returns 0 on success, otherwise an error number.
*/
说完了加锁和解锁,要使锁可用,锁的初始化和销毁是绕不开的点。使用pthread_mutex_init()
我们可用将锁初始化为几种不同的状态。如果状态参数设置为NULL,那么就会使用默认的mutex类型,即PTHREAD_MUTEX_DEFAULT
,行为未被定义。此外,我们还有其他的几种行为,如下:
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
/*
Parameters:
1. mutex: Pointer to the mutex to initialize.
2. attr: Pointer to a mutex attributes object, or NULL for default attributes.
- PTHREAD_MUTEX_NORMAL: Mutex relocking is not allowed, will cause deadlock.
- PTHREAD_MUTEX_ERRORCHECK: Mutex relocking will return an error.
- PTHREAD_MUTEX_RECURSIVE: Allowing the same mutex relock multiple times.
- PTHREAD_MUTEX_DEFAULT: Default mutex, behavior undefined.
Return value: Returns 0 on success, otherwise an error number.
*/
int pthread_mutex_destroy(pthread_mutex_t *mutex);
/*
Parameters:
1. mutex: Pointer to the mutex to destroy.
Return value: Returns 0 on success, otherwise an error number.
*/
一旦调用 pthread_mutex_destroy()
销毁了一个互斥锁,该互斥锁对象就不能再被使用了。销毁互斥锁会释放与该互斥锁相关的所有资源。如果在销毁后尝试使用该互斥锁,可能会导致未定义行为,甚至程序崩溃。
int pthread_mutex_lock(pthread_mutex_t *mutex){
//尝试使用CAS操作来获取锁
if(atomic_compare_and_swap(&mutex->lock,0,1)){ //这是一个快速路径
//成功获取锁
return 0;
}
//锁已经被其他执行流占有,进入慢速路径(阻塞自己,省略实现)
return -1;
}
pthread_spinlock
POSIX 系统也提供了 pthread_spinlock
自旋锁来实现不同操作之间的互斥。我们前面实现过自己的一个自旋锁。自旋锁在检查到锁已经被其他线程占用时,不会阻塞线程,而是会在一个循环中不断检查锁的状态,直到锁可用。下面是 POSIX thread 库中所提供的自旋锁的系统调用:
#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
/*
Parameters:
1. lock: Pointer to the spinlock to initialize.
2. pshared: If non-zero, the spinlock is shared between processes; if zero, it is shared between threads of the same process.
Return value: Returns 0 on success, otherwise an error number.
*/
int pthread_spin_destroy(pthread_spinlock_t *lock);
/*
Parameters:
1. lock: Pointer to the spinlock to destroy.
Return value: Returns 0 on success, otherwise an error number.
*/
int pthread_spin_lock(pthread_spinlock_t *lock);
/*
Parameters:
1. lock: Pointer to the spinlock to lock.
Return value: Returns 0 on success, otherwise an error number.
*/
int pthread_spin_trylock(pthread_spinlock_t *lock);
/*
Parameters:
1. lock: Pointer to the spinlock to try to lock.
Return value: Returns 0 on success, otherwise an error number.
*/
int pthread_spin_unlock(pthread_spinlock_t *lock);
/*
Parameters:
1. lock: Pointer to the spinlock to unlock.
Return value: Returns 0 on success, otherwise an error number.
*/
自旋锁的系统调用和阻塞锁的系统调用很类似,唯一的差别就在于阻塞还是忙等待。你可能注意到初始化函数也有所不同,但是你实际上可以通过下面这种方式设置互斥锁的状态:
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&mutex, &attr);
下面是其函数原型:
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
/*
Parameters:
1. attr: Pointer to the mutex attributes object.
2. pshared: The process-shared attribute. It can be set to:
- PTHREAD_PROCESS_SHARED: The mutex can be shared between processes.
- PTHREAD_PROCESS_PRIVATE: The mutex is shared only between threads of the same process.
Return value: Returns 0 on success, otherwise an error number.
*/
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#define NUM_ITERATIONS 100000
pthread_mutex_t mutex;
int counter = 0;
void* mutex_thread_func(void* arg){
for (int i = 0; i < NUM_ITERATIONS; ++i)
{
pthread_mutex_lock(&mutex); //Get mutex lock
counter++;
for (int j = 0; j < 10000; ++j); //Increase system load
pthread_mutex_unlock(&mutex); //Release the lock
}
return NULL;
}
int main(int argc, char const *argv[])
{
pthread_t t1,t2;
struct timespec start, end;
pthread_mutex_init(&mutex, NULL); //Initialize the lock
clock_gettime(CLOCK_MONOTONIC, &start); //Start time counting
pthread_create(&t1, NULL, mutex_thread_func, NULL);
pthread_create(&t2, NULL, mutex_thread_func, NULL);
pthread_join(t2, NULL);
pthread_join(t1, NULL);
clock_gettime(CLOCK_MONOTONIC, &end); //Finish time counting
pthread_mutex_destroy(&mutex); //Destroy the lock
double elapsed = (end.tv_sec - start.tv_sec)
+ (end.tv_nsec - start.tv_nsec) / 1e9;
printf("Final counter value with pthread_mutex_t: %d\n",counter);
printf("Elapsed time with pthread_mutex_t: %f\n",elapsed);
return 0;
}
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#define NUM_ITERATIONS 100000
pthread_spinlock_t spinlock;
int counter = 0;
void* mutex_thread_func(void* arg){
for (int i = 0; i < NUM_ITERATIONS; ++i)
{
pthread_spin_lock(&spinlock); //Get spin lock
counter++;
for (int j = 0; j < 10000; ++j); //Increase system load
pthread_spin_unlock(&spinlock); //Release the lock
}
return NULL;
}
int main(int argc, char const *argv[])
{
pthread_t t1,t2;
struct timespec start, end;
pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE); //Initialize the lock
clock_gettime(CLOCK_MONOTONIC, &start); //Start time counting
pthread_create(&t1, NULL, mutex_thread_func, NULL);
pthread_create(&t2, NULL, mutex_thread_func, NULL);
pthread_join(t2, NULL);
pthread_join(t1, NULL);
clock_gettime(CLOCK_MONOTONIC, &end); //Finish time counting
pthread_spin_destroy(&spinlock); //Destroy the lock
double elapsed = (end.tv_sec - start.tv_sec)
+ (end.tv_nsec - start.tv_nsec) / 1e9;
printf("Final counter value with pthread_spinlock_t: %d\n",counter);
printf("Elapsed time with pthread_spinlock_t: %f\n",elapsed);
return 0;
}
运行的结果因机器而异,但这两种锁的实现方式决定了它们的特点。对于互斥锁而言,线程/进程切换的开销是其主要的开销;而对于自旋锁,忙等待让CPU空转是其主要的开销。我们知道,进程/线程的切换需要保存上下文信息,这需要一定的代价,从而,两种锁的比较就变成了空转的时间更长还是上下文切换的时间更长。
不难想象,线程占有锁的时间越长,相比忙等待,线程切换的系统开销占比就会越小,这时mutex显然更优。但由于空循环忙等待的开销占比基本一成不变,当短时间持有锁时则 spinlock 更优。
特性 | Mutex | Spinlock |
---|---|---|
基础结构 | pthread_mutex_t |
pthread_spinlock_t |
加锁操作 | pthread_mutex_lock |
pthread_spin_lock |
解锁操作 | pthread_mutex_unlock |
pthread_spin_unlock |
等待机制 | 阻塞,进入内核态等待 | 自旋,忙等占用CPU资源 |
使用场景 | 长时间持有锁 | 短时间持有锁 |
性能 | 适合长时间等待,这时CPU利用率提高 | 适合短时间等待,免去上下文切换的开销 |
复杂度 | 较高,依赖 mutex 等系统调用 |
较低,基于原子操作实现 |
我们学过了临界区的管理原则有:互斥、前进、有限等待。在原子操作和互斥锁工具还没有被应用的时代,Dekker算法和Peterson算法就是当是两种为了解决临界区管理问题所应用的算法。
Peterson算法是软件解决管理临界区问题经典算法。它利用线程间访问共享资源的方式实现进程/线程间互斥访问临界区的算法。
Peterson算法的伪代码如下:
//Peterson算法的变量
bool flag[2] = {false,false}; //flag[i]表示第i个线程/进程有进入临界区的意愿
int turn; //turn的值j表示现在第j个值有进入临界区的令牌
/*
当flag[i] && turn == i,这时只有第i个进程才能进入临界区。实现了对临界区的管理
*/
P0进程:
flag[0] = true; //P0a
turn = 1; //P0b
while(flag[1] && turn == 1){
//busy wait
}
//start of critical section
...
//end of critical section
flag[0] = false; //unlock
P1进程:
flag[1] = true; //P1a
turn = 0; //P1b
while(flag[0] && turn == 0){
//busy wait
}
//start of critical section
...
//end of critical section
flag[1] = false; //unlock
在算法执行时,由于进程的并发性,语句执行顺序及结构可能出现如下几种可能:
flag[0] | flag[1] | turn | 可能的执行顺序 | 最终执行 |
---|---|---|---|---|
true | false | 1 | P0a-P0b-(P1a-P1b) | P0 |
false | true | 0 | P1a-P1b-(P0a-P0b) | P1 |
true | ture | 1 | P0a-P1a-P0b-(P1b) / P1a-P0a-P0b-(P1b) | P1 |
true | true | 0 | P0a-P1a-P1b-(P0b) / P1a-P0a-P1b-(P0b) | P0 |
Peterson 算法是一种不依赖硬件实现的原子操作机制。如今大多数CPU以指令乱序执行来提高执行效率,此时实现 Peterson 算法就得使用相关内存屏障指令。现在一般使用硬件支持的原子操作机制(比如 test-and-set 或 compare-and-swap),这些机制往往只需要很少的硬件支持。
和 Peterson 算法一样,Dekker 算法也是用进程间共享资源的方式实现进程间的互斥操作。同年 Dijkstra 提出信号量的概念也是受 Dekker 算法的影响。
Dekker 算法的伪代码如下:
// Dekker算法的变量
bool wants_to_enter[2] = {false, false}; // 表示进程是否想要进入临界区
int turn = 0; // 表示当前的优先权
进程P0:
// 进程P0
wants_to_enter[0] = true; //P0a
while (wants_to_enter[1]) {
if (turn != 0) {
wants_to_enter[0] = false;
while (turn != 0) {
// 忙等待
}
wants_to_enter[0] = true;
}
}
// 临界区
...
turn = 1;
wants_to_enter[0] = false;
// 剩余部分
进程P1:
// 进程P1
wants_to_enter[1] = true; //P1a
while (wants_to_enter[0]) {
if (turn != 1) {
wants_to_enter[1] = false;
while (turn != 1) {
// 忙等待
}
wants_to_enter[1] = true;
}
}
// 临界区
...
turn = 0;
wants_to_enter[1] = false;
// 剩余部分
wte[0] | wte[1] | turn | 最终执行 |
---|---|---|---|
true | false | 0 | P0 |
false | true | 0 | P1 |
true | ture | 0 | P0 |
信号量概念最早由荷兰计算机科学家Edsger W. Dijkstra在1965年提出,是用与多线程环境下同步和互斥的一种机制。这个机制包含一个值和两个原子操作:
信号量是一种比互斥锁更强大的同步工具,它的引入极大地简化了复杂的并发控制问题,使得操作系统和并发编程中的资源管理变得更有效和可靠。
如果信号量只包含数值,且在数值不可用时实施忙式等待,那么就成这对PV操作为自旋PV操作。(也叫”整数信号量“)
semaphore sem;
wait(sem){
while(s <= 0){
//busy waiting
}
s--;
}
post(sem){
s++;
}
要消除自旋等待,我们可以把信号量打造成一个包含值和队列指针的结构体。具体思路如下:我们在执行P操作时,先判断数值是否可用,若不可用,利用进程指针将进程给阻塞掉。等到资源可用后,V操作在阻塞队列中再一个个地唤醒阻塞进程。这就是阻塞PV操作,也叫记录型信号量。
typedef struct{
int value;
struct process* L;
}semaphore;
semaphore sem;
wait(sem){
s.value
if(s.value < 0){
block(s.L);
}
}
post(sem){
s.value++;
if(s.value <= 0){
wakeup(P);
}
}
我们假设有一个信号量s,它的初始值为1,我们还有3个并发进程 P1、P2、P3 都要对s先进行P操作,再进行V操作,下面是其中一种可能的顺序:
操作 | 自旋PV操作 | 阻塞PV操作 |
---|---|---|
P1执行P操作 | s ← 0 |
s ← 0 |
P2执行P操作 | busy waiting | s ← -1 and block P2 |
P3执行P操作 | busy waiting | s ← -2 and block P3 |
P1执行V操作 | s ← 1 |
s ← -1 and unblock P2 |
P2被”唤醒“并执行 | terminates waiting and s ← 0 |
P2 running |
P2执行V操作 | s ← 1 |
s ← 0 and unblock P3 |
P3被“唤醒”并执行 | terminates waiting and s ← 0 |
P3 running |
P3执行V操作 | s ← 1 |
s ← 1 |
从之前互斥锁实现中,我们已经学会如何使用“锁”来管理临界区,我们使用信号量和PV操作同样可以完成临界区管理的任务。以下我们给出一段伪代码:
semaphore mutex = 1;
Thread n:
//其他代码
wait(mutex); //相当于lock
//+--------------------+
//| |
//| critical section |
//| |
//+--------------------+
post(mutex); //相当于unlock
//其他代码
结合上面的伪代码,很容易想明白当有多个进程并发地要访问临界区时,只有第一个执行P操作的进程能够进入临界区,并抢占这里的唯一的信号量资源。其他进程由于没有资源可以使用,因此只能自旋或阻塞等待释放临界区资源。
这种实现互斥的信号量初值总是1,它的值总是在0和1之间变化,因此被称作二值信号量。
我们知道,在计算机世界中由于并发进程执行是异步的。如果我们想要多个进程按照某个顺序执行要怎么办呢?我们可以利用PV操作实现这多个进程之间的同步。
假设我们现在有两个进程P1和P2,P2需要在P1完成某些操作后才能继续执行任务,我们可以先定义一个名为sync的信号量,如下:
semaphore sync = 0; //初始化为0,表示P2需要等待
P1:
//其他代码
//执行P2需要等待的操作
post(sync);
//其他代码
P2:
//其他代码
wait(sync); //等待P1相关操作执行完毕
//继续执行
//其他代码
通过上面的伪代码,我们不难看出,即使机器中的代码是异步执行的,但是P2相关操作还是可以同步等待P1相关操作的完成。不论大厨炒菜多么快速,他都需要等待菜全部切好才能开始炒菜,不然就可能导致不好的事情出现。
这个例子中,不论是P1先执行还是P2先执行,没有P1进行V操作,P2就无法执行P操作从而继续执行其他代码。从这个例子中也能明白,我们同步的目标是保证P2在P1完成相关操作后才能继续执行。
我们已经学习过如何用信号量实现同步和互斥,但是我们仍然有一种情况没有考虑到。如果在多进程系统中,并发的进程要同时访问共享资源呢?我们考虑以下几种情况:
我们已经学过第一种情况的管理机制,那么底下的2、3情况呢?我们需要一种机制来防止资源竞争来确保系统的稳定性。
计数信号量的初始值是一个非负整数,用于表示多个相同资源的可用数量。从这里,我们能看出二值信号量实际上是计数信号量的特殊情况,其中计数信号量的值只能是0或1。引入计数信号量扩展了对临界区资源数量大于1的情况的处理能力,使得多个线程可以同时访问多个相同的资源。
例:假设系统中有3台打印机,多个进程都需要使用这些打印机,我们可以使用一个计数信号量来管理这3台打印机的申请和释放。
semaphore printer = 3;
Process n:
//其他代码
wait(printer);
//+-----------------+
//| |
//| using printer |
//| |
//+-----------------+
post(printer);
//其他代码
应用场景 | 初始值 | PV操作 | |
---|---|---|---|
二值信号量 | 互斥、临界区管理 | init_value = 1 | 同一进程 |
同步信号量 | 进程执行顺序管理 | init_value = 0 | 不同进程 |
计数信号量 | 可用资源数量的申请释放管理 | init_value > 0 | 同一进程 |
POSIX标准提供了几个用于信号量(semaphore)的系统调用,这些调用可以用于进程或线程之间的同步。以下是几个主要的POSIX信号量系统调用:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
/*
Parameters:
1. sem: Pointer to the semaphore to initialize.
2. pshared: If non-zero, the semaphore is shared between processes; if zero, it is shared between threads of the same process.
3. value: Initial value of the semaphore.
Return value: Returns 0 on success, -1 on failure and sets errno appropriately.
*/
int sem_destroy(sem_t *sem);
/*
Parameters:
1. sem: Pointer to the semaphore to destroy.
Return value: Returns 0 on success, -1 on failure and sets errno appropriately.
*/
int sem_wait(sem_t *sem);
/*
Parameters:
1. sem: Pointer to the semaphore to decrement (wait).
Return value: Returns 0 on success, -1 on failure and sets errno appropriately.
*/
int sem_trywait(sem_t *sem);
/*
Parameters:
1. sem: Pointer to the semaphore to decrement (try to wait).
Return value: Returns 0 on success, -1 on failure and sets errno appropriately.
*/
int sem_post(sem_t *sem);
/*
Parameters:
1. sem: Pointer to the semaphore to increment (signal).
Return value: Returns 0 on success, -1 on failure and sets errno appropriately.
*/
int sem_getvalue(sem_t *sem, int *sval);
/*
Parameters:
1. sem: Pointer to the semaphore.
2. sval: Pointer to an integer to store the current value of the semaphore.
Return value: Returns 0 on success, -1 on failure and sets errno appropriately.
*/
之前我们学习二值信号量实现的同步中,我们实际上就已经接触了signaling的同步模式,相当于一个进程/线程对另一个进程/线程的等待。我们另外举一个例子,假如我们有信号量sem
,初始化为0。通过信号量的PV操作,我们就能实现两个进程/线程之间的同步。
Thread A
Statement A1;
post(sem);
Thread B
wait(sem);
Statement B1;
上面的例子中,如果B线程开始执行,由于Statement A1并没有完成(信号量没有被post),所以线程B会被阻塞,等待信号量的释放。所以只要线程A调用post(sem)
之后,线程B才会开始执行。
Rendezvous的同步模式是一种双向的同步模式,它相当于是对signaling的拓展。与signaling不同,rendezvous不仅仅是一个进程/线程在等待另一个进程/线程的信号,而是两个进程/线程彼此都在等待对方。因而,在这种同步模式中,我们需要两个信号量sem1
和sem2
并初始化为0。
Thread A
Statement A1;
post(sem1);
wait(sem2);
Statement A2;
Thread B
Statement B1;
post(sem2);
wait(sem1);
Statement B2;
上述的例子中,只有当线程A和线程B都走完Statement A1和Statement B1时,下一阶段的任务才会开始。这种双向同步确保了两个线程在特定的同步点上达成一致,然后才能继续各自的任务。
问题来了,如果线程的数量越来越多,会发生什么?如果线程数量不断增加,会导致同步和管理的复杂性显著增加。为了避免死锁,必须确保PV操作平衡,即每个post
操作应有相应的wait
操作。
在有三个线程(A、B、C)的情况下,可以如下进行rendezvous的同步:
Thread A
Statement A1;
post(sem1);
post(sem1);
wait(sem2);
wait(sem2);
Statement A2;
Thread B
Statement B1;
post(sem2);
post(sem2);
wait(sem1);
wait(sem3);
Statement B2;
Thread C
Statement C1;
post(sem3);
post(sem3);
wait(sem1);
wait(sem2);
Statement C2;
这种方式中,每个线程在执行完自己的第一部分工作后(Statement A1, B1, C1),分别post
一个信号量并wait
两个信号量。每个线程都在等待另外两个线程的信号,从而确保所有线程都在同步点相遇后才能继续执行剩余工作。
虽然这种设计可以防止死锁,但随线程数量增加,同步复杂度和信号量的管理也会增加。为了简化管理和实现,我们可以使用一些更高级的同步原语或模式,我们将介绍的下一个同步模式屏障(barrier)就是为此而生的。
Thread N
wait(mutex)
count++
post(mutex)
if count == n
for i from 1 to n
post(barrier)
end for
end if
wait(barrier)
Turnstile pattern 是一种设计模式,主要用于在并发编程中控制对共享资源的访问。它像个旋转门一样,只允许一个线程在特定时间内通过,从而确保多个线程之间能够有序地进行协调。
条件变量和 turnstile pattern 是相关的。在 turnstile pattern 中,条件变量可以用来管理线程的等待和唤醒。当资源不可用时,线程会被阻塞并放入条件变量中。当资源可用时,通过条件变量发出信号,唤醒等待的线程,使其继续执行。
生产者-消费者问题描述了两个进程,一个是生成数据的生产者进程,一个是负责消费数据的消费者,两者通过共享缓冲区进行通信。由于问题描述的固定大小的缓冲区,也被称有限缓冲问题。核心挑战在于如何有效地管理缓冲区,确保生产者不会在缓冲区满时继续生产,也确保消费者不会在缓冲区空时继续消费。这个问题有很多变体,例如我们可以设置 x 个生产者, y 个消费者之类的。
在生产者-消费者问题中,有许多规则需要双方遵守的。当我们设置缓冲区大小为BUFFER_SIZE
。我们规定:
通过这三条规则,我们可以用以下的伪代码对生产-消费的过程进行模拟。下面是一个忙等待加互斥的例子,我们用互斥锁对临界区进行了保护(缓冲区相关操作)。如果缓冲区满,生产者会一直查看缓冲区是否有空隙;若是缓冲区空,消费者也会一直查看缓冲区是否有可读信息。
Producer
added = false;
while(added = false){
wait(mutex);
if(count < BUFFER_SIZE){
// Add item.
count++;
added = true;
}
post(mutex);
}
Consumer
removed = false;
while(removed = false){
wait(mutex);
if(count > 0){
// Remove item.
count--;
removed = true;
}
post(mutex);
}
我们之前比较过互斥锁和自旋锁的优劣,如果我们要长时间持有锁,忙等待显然会一直白白浪费资源。这种情况下,检查一次缓冲区,如果不满足相应的条件就直接阻塞进程/线程明显是更好的办法。使用锁好像效益来的不再可观,我们需要使用其他的工具。
我们可以使用两个信号量,他们的最大值都是BUFFER_SIZE
。两个信号量的描述如下:
items
信号量:从0开始,表示缓冲区内现有多少可读的数据。spaces
信号量:从BUFFER_SIZE
开始,表示缓冲区还有多大的可用空间。下面的伪代码中,mutex
用于确保对缓冲区的访问是互斥的,从而避免了数据竞争问题。spaces
和items
信号量则用于控制缓冲区的容量和可读数据量。这样可以确保生产者和消费者在操作缓冲区时遵守规则,从而实现同步互斥。
Producer
wait(mutex);
wait(spaces);
// Add item.
post(items);
post(mutex);
Consumer
wait(mutex);
wait(items);
// Remove item.
post(spaces);
post(mutex);
这个示例会有什么问题?我们的确保证了对缓冲区的互斥访问。但是如果缓冲区满时,生产者率先进入临界区,生产者会阻塞在wait(spaces);
。因为缓冲区满了,需要消费者进入缓冲区消耗资源但是由于生产者持有互斥锁,消费者无法进入缓冲区消耗资源。这就会导致死锁(Deadlock) 的发生。如果缓冲区为空,消费者先进入临界区也会导致类似的死锁问题。
我们对伪代码进行一些调整,生产者和消费者在等待spaces
和items
信号量时不再持有互斥锁,从而避免了死锁的发生。从而生产者和消费者可以在缓冲区满或空的情况下正确地等待和释放资源。
Producer
wait(spaces);
wait(mutex);
// Add item.
post(mutex);
post(items);
Consumer
wait(items);
wait(mutex);
// Remove item.
post(mutex);
post(spaces);
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#define BUFFER_SIZE 20
#define PRODUCER_NUMBER 10
#define CONSUMER_NUMBER 10
int* buffer;
int pindex = 0;
int cindex = 0;
sem_t spaces;
sem_t items;
pthread_mutex_t mutex;
unsigned int seed = 252;
int produce(int id){
int r = rand_r(&seed); // rand_r for thread safe.
printf("Producer: %d\tproduced: %d.\n", id, r);
return r;
}
void consume(int id, int value){
printf("Consumer: %d\tconsumed %d.\n", id, value);
}
void* producer(void* arg){
int* id = (int*) arg;
for(int counter = 0; counter < 100; ++counter){
int num = produce(*id);
sem_wait(&spaces);
pthread_mutex_lock(&mutex);
buffer[pindex] = num;
pindex = (pindex + 1) % BUFFER_SIZE;
pthread_mutex_unlock(&mutex);
sem_post(&items);
}
free(arg);
pthread_exit(NULL);
}
void* consumer(void* arg){
int* id = (int*) arg;
for(int counter = 0; counter < 100; ++counter){
sem_wait(&items);
pthread_mutex_lock(&mutex);
int num = buffer[cindex];
buffer[cindex] = -1;
cindex = (cindex + 1) % BUFFER_SIZE;
pthread_mutex_unlock(&mutex);
sem_post(&spaces);
consume(*id, num);
}
free(id);
pthread_exit(NULL);
}
int main(int argc, char** argv){
buffer = malloc(BUFFER_SIZE * sizeof(int));
for(int i = 0; i < BUFFER_SIZE; i++){
buffer[i] = -1;
}
sem_init(&spaces, 0, BUFFER_SIZE);
sem_init(&items, 0, 0);
pthread_mutex_init(&mutex, NULL);
pthread_t producer_thread[PRODUCER_NUMBER];
pthread_t consumer_thread[CONSUMER_NUMBER];
for(int i = 0; i < PRODUCER_NUMBER; i++){
int* id = malloc(sizeof(int));
*id = i;
pthread_create(&producer_thread[i], NULL, producer, id);
}
for(int j = 0; j < CONSUMER_NUMBER; j++){
int* id = malloc(sizeof(int));
*id = j;
pthread_create(&consumer_thread[j], NULL, consumer, id);
}
for(int k = 0; k < PRODUCER_NUMBER; k++){
pthread_join(producer_thread[k], NULL);
}
for(int k = 0; k < CONSUMER_NUMBER; k++){
pthread_join(consumer_thread[k], NULL);
}
free(buffer);
sem_destroy(&spaces);
sem_destroy(&items);
pthread_mutex_destroy(&mutex);
pthread_exit(0);
}
读者写者问题描述了多个读者和写者对共享数据的访问。读者-写者问题与生产者-消费者问题相似而又不同。相似的是两个问题中都有数据的输入(生产者/写者)和输出方(消费者/读者),而且生产者/写者修改共享数据时不能有其他线程访问共享数据。
不同的是,读者-写者问题中的输出方(读者)并不涉及到对数据的修改操作(do not modify),这就意味着多个读者在读取数据的同时不会引起冲突。而且很多现实问题中写入很稀有但是读操作非常常见,允许缓冲区中多个读者读数据实际上可以提升很多性能。
现在我们考虑第一种情况,即一个写者对应着多个读者。假设读和写的操作都在一个房间中进行,即进行读写操作的房间实际上是我们的临界区。因而我们需要一个二元信号量roomEmpty
对临界区进行管理。在读者进入临界区时,我们不想计算读者的数量时出现数据竞争的问题,所以我们使用mutex
让读者一个一个地进入临界区。
当写者要进入临界区中时,我们需要保证临界区中没有读者存在,在solution-1中,我们用以下的伪代码表示写者的行为:
Writer
wait(roomEmpty);
// Write something
post(roomEmpty);
我们提到过,在读者-写者问题中,临界区中可以存在多个读者。因此我们在solution-1的伪代码中给出如下的读者行为:
Reader
wait(mutex);
readers++;
if(readers == 1){
wait(roomEmpty);
}
post(mutex);
// Read data.
wait(mutex);
readers--;
if(readers == 0){
post(roomEmpty);
}
post(mutex);
在solution-1中写者的行为逻辑简单又清晰,等待房间里没有读者了,写者进入房间中进行写操作。而由于可能会有很多读者,读者在进出房间时会挨个登记(wait(mutex)
和 post(mutex)
),如果是第一个进入房间的,就负责标识房间已被占用(wait(roomEmpty)
),最后一个出房间就负责标识房间空闲(post(roomEmpty)
。
读者的这种行为模式叫做light switch pattern(先进入房间的人开灯,最后一个离开房间的人关灯)。在solution-1中,读者的行为会为写者带来很多困扰。比如读者占用房间后,后来的读者可以随意进出,只要保证最后一个离开房间的写者离开时释放房间资源就可以了。但是,你没有办法知道后面究竟有多少读者要读。在solution-1中,写者只能干等,这会导致写者长时间得不到资源的问题,也就是饥饿(Starvation):which means a thread may never gets a chance to run.
写者的饥饿可能对博客这种读写问题的影响不会很大,但是对那些对实时性有要求的系统的影响尤其大(比如数据库的读写问题)。
为了避免写者的饥饿问题,我们需要另辟蹊径,重新找一个方法。我们试想一下solution-2的描述:当写者到达,已经在房间里的读者们继续阅读,但是后来者就不可以进入临界区阅读数据了。在solution-2的假设下,写者只需要等待临界区中读者读完数据走出临界区,不需要担心无休止到来的读者。
Solution-2我们引入了另一个二元信号量turnstile
。相当于对“门”进行控制,无论是读者还是写者,要进入房间(临界区)必须通过“门”。通过占有“门”资源,读者和写者就能顺序地进入临界区。Solution-2中的写者行为如下:
Writer
wait(turnstile);
wait(roomEmpty);
// Write data.
post(roomEmpty);
post(trunstile);
Reader
wait(turnstile);
post(turnstile);
wait(mutex);
readers++;
if(readers == 1){
wait(roomEmpty);
}
post(mutex);
// Read data.
wait(mutex);
readers--;
if(readers == 0){
post(roomEmpty);
}
post(mutex);
虽然我们成功地使写者免受了饥饿的困扰,但是写者仍然不对临界区拥有任何特权。想想看,如果前面仍然有许多的读者在写者前面排队等待进入临界区,写者就仍然需要队列前面的所有读者进入临界区-读取-出临界区后才能对临界区资源进行操作。
Solution-2仍然不够实时,怎么办?我们需要划分优先级,使得写者相对读者总是享有有限进入临界区的特权。可以通过将信号量roomEmpty
划分为两个信号量:noReaders
和 noWriters
来实现。通过让写者持有noReaders
信号量来控制读者进入临界区。
在Solution-3中,我们引入优先级机制,写者可以相对读者享有优先权。第一个写者会等待并占有noReaders
信号量,阻止新的读者进入临界区。这样,写者可以在当前读者完成操作后立即进入临界区,而不需要等待新的读者完成操作。
Writer
wait(writeMutex);
writers++;
if(writers == 1){
wait(noReaders)
}
post(writeMutex);
wait(noWriters);
// Write data.
post(noWriters);
wait(writeMutex);
writers--;
if(writers == 0){
post(noReaders)
}
Reader
wait(noReaders);
wait(readMutex);
readers++;
if(readers == 1){
wait(noWriters);
}
post(readMutex);
post(noReaders);
// Read data.
wait(readMutex);
readers--;
if(readers == 0){
post(noWriters);
}
post(readMutex);
#include <pthread.h>
pthread_rwlock_t rwlock;
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to initialize.
2. attr: Pointer to a read-write lock attributes object, or NULL for default attributes.
- PTHREAD_PROCESS_SHARED: The lock can be shared between processes.
- PTHREAD_PROCESS_PRIVATE: The lock is private to the process (default).
Return value: Returns 0 on success, otherwise an error number.
*/
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to destroy. The lock must be uninitialized before calling this function.
Return value: Returns 0 on success, otherwise an error number.
*/
// Blocks if the lock is currently held by a writer.
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to acquire for read access.
Return value: Returns 0 on success, otherwise an error number.
*/
// Does not block. Returns immediately if the lock cannot be acquired.
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to attempt acquiring for read access.
Return value: Returns 0 on success, otherwise an error number.
*/
// Blocks if the lock is currently held by a reader or writer.
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to acquire for write access.
Return value: Returns 0 on success, otherwise an error number.
*/
// Does not block. Returns immediately if the lock cannot be acquired.
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to attempt acquiring for write access.
Return value: Returns 0 on success, otherwise an error number.
*/
// Releases the lock held by either a reader or a writer.
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
/*
Parameters:
1. rwlock: Pointer to the read-write lock to release.
Return value: Returns 0 on success, otherwise an error number.
*/
条件变量和我们学过的条件语句非常类似,我们用不同的条件语句可以使得在条件满足时跳进特定的分支。条件变量也一样,you will be notified to continue when a certain condition is fulfilled 。只不过,条件变量是我们达成同步的一种方法,它确保了某些线程在条件满足之前阻塞等待。
那这些阻塞的线程怎么才能知道条件已经满足了呢?在之前,我们可能会使用信号量、互斥锁来实现线程同步和协调。那这些和我们本节学习的条件变量相比有何不足呢?当我们使用信号量时,我们只能给特定的某一个线程发送信号。而使用条件变量,我们就可以选择当条件得到满足后,是给某个特定的线程发送信号还是给所有等待事件发生的线程发送信号(broadcast)。
pthread_cond_init()
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
/*
Parameters:
1. cond: Pointer to the condition variable to initialize.
2. attr: Pointer to a condition variable attributes object, or NULL for default attributes.
- PTHREAD_PROCESS_SHARED: condition variable can be shared between processes.
- PTHREAD_PROCESS_PRIVATE: condition variable is only used within a single process (default).
- PTHREAD_CONDATTR_CLOCKID: clock type used by the condition variable (e.g., CLOCK_REALTIME, CLOCK_MONOTONIC).
Return value: Returns 0 on success, otherwise an error number.
*/
pthread_cond_wait()
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
/*
Parameters:
1. cond: Pointer to the condition variable to wait on.
2. mutex: Pointer to the mutex that should be locked by the calling thread.
Return value: Returns 0 on success, otherwise an error number.
*/
pthread_cond_signal()
int pthread_cond_signal(pthread_cond_t *cond);
/*
Parameters:
1. cond: Pointer to the condition variable to signal.
Return value: Returns 0 on success, otherwise an error number.
*/
pthread_cond_broadcast()
int pthread_cond_broadcast(pthread_cond_t *cond);
/*
Parameters:
1. cond: Pointer to the condition variable to broadcast.
Return value: Returns 0 on success, otherwise an error number.
*/
pthread_cond_destroy()
int pthread_cond_destroy(pthread_cond_t *cond);
/*
Parameters:
1. cond: Pointer to the condition variable to destroy.
Return value: Returns 0 on success, otherwise an error number.
*/
condition variables are always used in conjunction with a mutex
原先实现屏障的方法所用的系统调用太多了,我们可以用条件变量实现屏障。
int count;
pthread_mutex_t lock;
pthread_cond_t cv;
void barrier(){
pthread_mutex_lock(&lock);
count++;
if(count < NUM_THREADS){
pthread_cond_wait(&cv, &lock);
} else{
pthread_cond_broadcast(&cv);
}
pthread_mutex_unlock(&lock);
}
条件变量可以用于管程的创建(更高层级的抽象),所以管程是一个更高级的同步工具,有点类似于OOP中的类。在C++中,我们用类来打包数据和相关的操作,管程的目标就是将那些共享的数据和对这些共享数据的操作进行打包封装起来。有了管程,我们就不需要手动地操作那些共享数据了,减少了出错的可能性。
在传统的方法中,我们需要人为地对锁进行管理。下面这里例子中,我们有一个加锁和一个解锁,乍一看是正确的,但是程序中的if
条件判断使得程序实际上存在着两个分支。我们需要给这两个分支的互斥锁都做好善后工作,因此实际上我们需要两个解锁函数。
void foo(){
pthread_mutex_lock(&lock);
/* Read some data. */
if(condition_is_true){
printf("Cannot continue due to reasons.\n");
// Missing something here~
return;
}
/* Do something more. */
pthread_mutex_unlock(&lock);
}
这种人为管理资源的模式太麻烦了,我们需要一种更加智能的管理方式。在C语言中,我们可以使用pthread库中的条件变量和互斥锁来实现管程。而C++的RAII机制使得管程的实现尤为容易。
#include <pthread.h>
#include <stdio.h>
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int count;
} Monitor;
void init(Monitor *m) {
pthread_mutex_init(&m->mutex, NULL);
pthread_cond_init(&m->cond, NULL);
m->count = 0;
}
void increment(Monitor *m) {
pthread_mutex_lock(&m->mutex);
m->count++;
pthread_cond_signal(&m->cond);
pthread_mutex_unlock(&m->mutex);
}
void wait_for_count(Monitor *m, int target) {
pthread_mutex_lock(&m->mutex);
while (m->count < target) {
pthread_cond_wait(&m->cond, &m->mutex);
}
pthread_mutex_unlock(&m->mutex);
}
void destroy(Monitor *m) {
pthread_mutex_destroy(&m->mutex);
pthread_cond_destroy(&m->cond);
}
int main() {
Monitor m;
init(&m);
pthread_t t1, t2;
pthread_create(&t1, NULL, (void *(*)(void *))increment, &m);
pthread_create(&t2, NULL, (void *(*)(void *))wait_for_count, &m);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("Final count: %d\n", m.count);
destroy(&m);
return 0;
}
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
class Monitor {
private:
std::mutex mtx;
std::condition_variable cond;
int count = 0;
public:
void increment() {
std::unique_lock<std::mutex> lock(mtx);
count++;
cond.notify_one();
}
void wait_for_count(int target) {
std::unique_lock<std::mutex> lock(mtx);
cond.wait(lock, [this, target] { return count >= target; });
}
int get_count() const {
return count;
}
};
int main() {
Monitor monitor;
std::thread t1(&Monitor::increment, &monitor);
std::thread t2(&Monitor::wait_for_count, &monitor, 1);
t1.join();
t2.join();
std::cout << "Final count: " << monitor.get_count() << std::endl;
return 0;
}
在学习原子类型变量之前,要规避并发访问共享数据带来的竞争条件问题,我们可能想当然地使用锁来解决。将共享数据的访问放到一个临界区中,然后对临界区进行加锁和解锁来实现这一过程。但是会带来额外的性能开销(两次系统调用)。
#include <pthread.h>
pthread_mutex_lock(); // system call * 1
shared_var++;
pthread_mutex_unlock(); // system call * 2
上面利用互斥锁实现的对共享资源访问的方法中,我们看到,系统调用的开销要远远大于对共享资源操作所带来的开销。这时只会使用锁的单一解决方法就成为了一种枷锁。那有没有办法降低这种资源损耗?当然,我们没必要绕一个远路(因为锁和原子类型都是封装机器提供的指令得来的)。
我们前面已经学习过像 test-and-set、compare-and-swap 等硬件提供的原子操作指令。没有硬件提供的原子指令,软件再怎么模拟也不可能实现相似的原子性操作。利用机器提供的这些原子操作指令的接口,我们可以在上层封装这些原子操作为己所用。实际上,锁和原子类型都是对这些机器指令的封装和抽象,但不同的是,原子类型直接利用指令提供的原子性操作,避开了系统调用的开销,而锁更加高级,常用于处理复杂的竞争条件问题。对于单一共享资源的处理,我们简单地利用原子类型就可以了。
早期C标准库没有引入原子操作时,GNU标准下的C库(glibc)就通过对下层机器指令的封装提供了原子类型。这些原子类型操作保证了操作的原子性,规避简单的条件竞争。在 C++11 之后,我们可以包含<atomic>
头文件来使用C++中的原子操作。当我们在高级语言中使用这些原子类型时,编译器会将这些数据类型转换成硬件能够提供的原子操作指令。
下面我们展示了C++11中的原子类型。对 std::atomic
类型的操作是原子性的,不可被打断的。
#include <atomic>
std::atomic<int> shared_var(0);
shared_var++; // Operations to shared_var is unbreakable.
下面是GCC(GNU Compiler Collection)提供的一组内建函数,主要用于进行原子操作和实现无锁编程。它们利用硬件提供的原子指令,确保操作的原子性,避免了竞态条件。
type __sync_lock_test_and_set(type *ptr, type value)
bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval)
type __sync_val_compare_and_swap(type *ptr, type oldval, type newval)
type __sync_lock_test_and_set(type *ptr, type value)
: 该函数将value
写入*ptr
,并返回*ptr
的旧值。这是一个原子的“测试并设置”操作,常用于实现简单的锁。
bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval)
: 该函数如果*ptr
等于oldval
,则将newval
写入*ptr
,返回true
;否则,不修改*ptr
,返回false
。
type __sync_val_compare_and_swap(type *ptr, type oldval, type newval)
: 该函数如果*ptr
等于oldval
,则将newval
写入*ptr
,并返回*ptr
的旧值。
除此之外,gcc还提供其他的一些内建函数。如下:
// Return the old value:
type __sync_fetch_and_add(type *ptr, type value);
type __sync_fetch_and_sub(type *ptr, type value);
type __sync_fetch_and_or(type *ptr, type value);
type __sync_fetch_and_and(type *ptr, type value);
type __sync_fetch_and_xor(type *ptr, type value);
type __sync_fetch_and_nand(type *ptr, type value);
// Return the new value:
type __sync_add_and_fetch(type *ptr, type value);
type __sync_sub_and_fetch(type *ptr, type value);
type __sync_or_and_fetch(type *ptr, type value);
type __sync_and_and_fetch(type *ptr, type value);
type __sync_xor_and_fetch(type *ptr, type value);
type __sync_nand_and_fetch(type *ptr, type value);
自始至终,我们学到的原子操作都是关于 modifying 的原子操作,但从来没有提到读指令的原子操作。那该如何保证每次读到的数据都是最新的呢?
x86机器并没有 build-in 读原子操作指令。由于机器上的读指令在大多数情况下是原子性的,单独的读操作通常是不可分割的。在x86机器上,读取一个32位整数或一个64位整数(在64位架构上)通常默认就是原子操作。但如果这样,我们仍然可能得到过时的数据,因为如果读的时候刚好写入了新的数据,那么读到的数据可能就不再是最新的了。
我们其实可以用 modifying 的原子操作来做一个读的事情。比如对原先的变量原子加或减0,就能够保证我们每次读到的数值都是最新的。如果不在乎性能开销,使用锁也是可以的。