tags:
- Cpp
Concurrency in C++
我们在操作系统课程中了解到过并发和并行的概念,并发和并行都为我们提供了一种多个线程在同时运行的宏观感受。在多核机器上,不同的线程可以并发执行,也可以并行执行。在我们用户的视角上,线程好像都是并行执行的,但微观上它们可能是交替轮流地执行,也就是并发。
对于这两个概念,我们可以这样理解:并发是操作系统通过时间片轮转提供的,而并行是硬件平台提供的(多核CPU)。对于开发高并发程序,我们的目标可能是降低使用延迟,充分利用多核心的性能。通过封装,我们可以在C++线程库中使程序并发执行。
在早期的 C++ 中,标准库中并不包含相关的线程机制,你要是需要创建线程执行流,你就得调用第三方的库函数(pthread
, Boost
, Thread Building Block
)。关于 POSIX Thread,我们在操作系统的系列课程中已经了解了许多。你可以在 pthread 进一步了解 pthread 库提供的线程 API 。
直到 C++11,也就是 pthread API 初次 release 的 16 年后,标准库才在语言层面封装了自己的线程库。现在,你只需要用 std::thread
来创建一个线程对象。标准库中的 std::thread
简化了线程的创建和使用,屏蔽了操作系统平台带来的差异性,我们还能享受 RAII 带来的资源的自动管理。
在多线程的程序中,每个运行的进程都需要至少包含一个(主线程)的线程执行流。由于同一进程下的不同线程共享进程资源,所以不恰当的多线程程序就很容易出现竞态条件导致。在本文档中,我们也会覆盖C++中的同步互斥的相关内容(std::mutex
和std::atomic<T>
)。
如下,我们用标准库的 std::thread
很轻易的创建了一个线程执行流。std::thread
的构造函数的参数表示你想让线程执行的任务,即任何可调用对象。
#include <iostream>
#include <thread>
void new_thread(){
std::cout << "Creating a thread\n" << std::endl;
}
int main(){
std::cout << "In main thread." << std::endl;
std::thread newThread(&new_thread);
return 0;
}
由于函数很小,我们还可以用 Lambda 表达式来简化代码:
#include <iostream>
#include <thread>
int main(){
std::cout << "In main thread." << std::endl;
std::thread newThread([](){ std::cout << "Creating a thread\n"; });
return 0;
}
运行这段代码,你大概率会得到一个 Abort 。这是因为主线程提前退出而没有等待子线程结束。为什么会得到 abort,你可以参照 joining 。
当线程执行结束,线程变得无事可做,我们描述这种执行结束的线程为 joinable。即调用线程和被调用线程两个执行流合并(join) 为一个执行流。这个时候我们就该调用std::thread.join();
函数来回收线程的资源(调用其析构函数)。值得注意的是, join 操作会阻塞调用线程,直到被调用的线程完成执行。
std::thread new_Thread([](){std::cout << "Creating a thread\n";});
if (new_Thread.joinable()) {
new_Thread.join();
}
join()
in a Thread-Safe Situationjoin()
方法提供的阻塞调用线程在下面这种简单的例子中是线程安全(thread safe) 的。因为这两个线程实际上是同步的,我们并不需要担心有任何数据竞争(data race) 的问题。
int dataVar = 0;
std::thread new_Thread([](){
std::cout << "Creating a thread\n";
dataVar = 15;
});
std::cout << "Doing my work...\n";
if (new_Thread.joinable()) {
new_Thread.join();
}
std::cout << "I have read:" << dataVar << std::endl;
join()
in a Non Thread-Safe Situation当你将这行代码std::cout << "I read:" << dataVar << std::endl;
放到join()
前面就会有这种未定义行为的出现,你会看到输出不确定的dataVar
。
int dataVar = 0;
std::thread new_Thread([](){
std::cout << "Creating a thread\n";
dataVar = 15;
});
std::cout << "Doing my work...\n";
std::cout << "I have read:" << dataVar << std::endl;
if (new_Thread.joinable()) {
new_Thread.join();
}
std::vector
jthread
: Auto Joining Thread (Since C++20)if *this has an associated thread (joinable() == true)
, calls request_stop()
and then join()
.
std::atomic
如果你对数据竞争的概念还不够明晰,下面这个经典的例子能帮助你理解数据竞争及其危害。这段代码逻辑上看起来没问题,但实际操作每次 counter++;
时,都会将 counter
从内存读取到寄存器,对寄存器中的值进行操作,然后将结果写回内存。乍看之下似乎没问题,但麻烦就出在这里。
运行第一个例子时,你会发现 counter
的输出值远远小于 20000000,而不是期望的 20000000。原因可能是:线程 A 将 counter
的值取到寄存器中,线程 B 也将值取到另一个寄存器,两个线程分别操作后各自将结果写回内存。我们期望的是取值到写回之间的操作不间断,但实际上,两次加法操作的结果只写回了一次。
#include <stdio.h>
#include <thread>
#include <atomic>
int counter = 0;
int main(){
std::thread newThread([](){
for (int i = 0; i < 10000000; i++)
{
counter++;
}
});
for(int i = 0; i < 10000000; i++){
counter++;
}
newThread.join();
printf("I counted: %d\n", counting);
return 0;
}
运行后的结果可能是:
du@DVM:~/Desktop/DSA$ for i in {1..10}; do ./stack; done
Counting is 10586215
Counting is 11715366
Counting is 10550633
Counting is 11083560
Counting is 10850192
Counting is 10812720
Counting is 10717796
Counting is 10972930
Counting is 11482931
Counting is 11098842
而通过使用 std::atomic
将 counter
定义为原子变量,为其提供原子性,我们就可以得到我们想要的输出。这里面的逻辑就是当我们进行 counter++;
操作时,原子性使得从读取到寄存器、对寄存器中的值进行加1操作和写回这些操作是不间断的,从而避免了数据竞争。
#include <stdio.h>
#include <thread>
#include <atomic>
std::atomic<int> counter = 0;
int main(){
std::thread newThread([](){
for (int i = 0; i < 10000000; i++)
{
counter++;
}
});
for(int i = 0; i < 10000000; i++){
counter++;
}
newThread.join();
printf("I counted: %d\n", counting.load());
return 0;
}
C++标准库并没有直接提供自旋锁,但我们可以利用原子变量来实现如下的自旋忙等待:
#include <iostream>
#include <thread>
#include <atomic>
int main(){
std::atomic<bool> ready = false;
std::thread chld_thread = std::thread([&](){
while(!ready){
}
printf("HELLO FROM CHILD.\n");
});
printf("CHILD, WAKE-UP.\n");
ready = true;
chld_thread.join();
return 0;
}
在这里,我们需要确保忙等待是有限的等待,不然可能会造成死锁问题。
虽然没有提供自旋锁,但 C++标准库提供了互斥锁这种二元信号量。在使用互斥锁前,我们需要包含头文件 #include <mutex>
。同样是上面的例子,互斥锁的实现方式如下:
#include <iostream>
#include <thread>
#include <mutex>
int main(){
std::mutex mtx;;
std::thread chld_thread = std::thread([&](){
mtx.lock();
printf("HELLO FROM CHILD.\n");
});
printf("CHILD, WAKE-UP.\n");
mtx.unlock();
chld_thread.join();
return 0;
}
与自旋锁不同,阻塞锁并不会浪费CPU资源。如果条件不满足,它会将线程阻塞起来。与我们在操作系统课程中学习的pthread相比,C++的std:mutex
相当于对pthread的封装。
std::shared_mutex
: Read-Write Lockstd::lock_guard
那假如加锁之后程序出现异常退出怎么办?上面的代码是 thread safe 的,但并不是 exception safe 的。假如线程在解锁前异常退出,那么解锁操作将永远无法完成,死锁就会发生。C++ 为我们提供了 RAII ,我们可以将清理和 mutex 解锁操作都放到析构函数中去完成,确保 exception safe。
C++ 为我们提供了 std::lock_guard<T>
,它接受一个 BasicLockable
的类型参数,即它只接受特定的锁类型。为了满足对 BasicLockable
的要求,规定我们必须在类类型中提供 lock()
和 unlock()
的成员函数,不然就会导致编译出错。下面是使用 std::lock_guard<T>
guard 自定义锁的例子:
#include <iostream>
#include <thread>
#include <mutex>
class myLock{
public:
void lock(){
std::cout << "locked" << std::endl;
}
void unlock(){
std::cout << "unlocked" << std::endl;
}
};
int main(){
myLock lk;
std::cout << "In main thread." << std::endl;
std::thread myThread([&lk](){
std::lock_guard<myLock> lock(lk); // calls myLock::lock()
std::cout << "Hello, this is thread.\n";
}); // calls myLock::unlock() automatically
myThread.join();
return 0;
}
一般而言,我们使用 std::lock_guard
作为 std::mutex
的 wrapper。当 std::lock_guard
对象被创建时,立即锁定传递的 std::mutex
。要是 std::lock_guard
对象超出其作用域(例如函数返回或异常抛出)时,它的析构函数会自动解锁 std::mutex
。这就是 std::lock_guard<T>
的作用,确保在异常发生时资源也能得到正确释放。
下面是一个例子:
#include <iostream>
#include <mutex>
#include <vector>
std::mutex mtx;
std::vector<int> data;
void addData(int value) {
std::lock_guard<std::mutex> lk(mtx); // Locks the mutex
data.push_back(value); // If this throws, the mutex is still unlocked
}
int main() {
try {
addData(42);
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
unique_lock
我们有 unique_ptr
,我们也有unique_lock
。unique_loc
k确保当前锁的持有者是唯一的。相比于lock_guard
,std::unique_lock
更加地灵活。std::unique_lock
同样遵循RAII,你可以在一个作用域内加锁和解锁多次,相比于lock_guard ,你还可以对锁的所有权进行转移。
#include <iostream>
#include <mutex>
#include <vector>
std::mutex mtx;
std::vector<int> data;
void addData(int value) {
std::unique_lock<std::mutex> lk(mtx); // Locks the mutex
data.push_back(value); // If this throws, the mutex is still unlocked
lk.unlock(); // Manually unlock the mutex
// Do some other work
lk.lock(); // Lock the mutex again if needed
}
int main() {
try {
addData(42);
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
scoped_lock
在C++17引入了std::scoped_lock<Ts...>
作为std::lock_guard<>
的升级版。scoped_lock<>
允许你一次性对多个 mutexes 进行锁定。C++17 还引入一项新特性 CATD(Class Template Argument Deduction),类模板参数推导。CTAD 通过自动推导模板参数类型,使得代码更加简洁和易读。
template <typename T1, typename T2>
struct Pair {
T1 first;
T2 second;
Pair(T1 f, T2 s) : first(f), second(s) {}
};
Pair<int, double> p(42, 3.14);
Pair p(42, 3.14); // CATD to Pair<int, double>
std::counting_semaphore
std::condition_variable
是一种同步原语,用于阻塞一个或多个线程,直到某个条件变为真。它通常与 std::mutex
一起使用,以确保对共享数据的访问是线程安全的。
std::future
and std::async
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
bool videoLoader(size_t video_size){
}
std::latch
and std::barrier