三、线程间共享数据的保护
多个线程同时访问修改共享的数据时,如果不加以控制,可能会造成未知的错误,为了解决这个问题,需要采取特殊的手段保证数据在各个线程间可以被正常使用。
这里介绍使用互斥量保护数据的方法。
3.1 使用互斥保护共享数据
互斥保护共享数据是使用互斥量完成的,该互斥量可以看作一个变量:
- 访问数据前,先锁住数据相关的互斥量;访问数据结束后,再解锁互斥
- C++线程库保证,只要有线程锁住了某个互斥量,若其他线程试图再给它加锁,则必须等待互斥量被释放后,才能加锁成功
1. 互斥量的创建和使用
互斥量的类型是std::mutex
,包含在头文件<mutex>
中,一个std::mutex
类型的变量通常使用以下方法实现对数据的互斥访问:
lock()
:阻塞直至成功锁定该互斥量try_lock()
:不阻塞尝试锁定该互斥量,如果锁定成功返回-1,不能锁定时返回0unlock()
:解锁该互斥量
std::string some_str;
std::mutex some_mutex;
void AddStr(std::string str) {
// 锁住一个互斥量
some_mutex.lock();
some_str += str;
// 锁住的互斥量必须在使用完后解锁
some_mutex.unlock();
}
std::mutex
类型是不支持拷贝的(拷贝构造和拷贝赋值函数被定义为删除的)
C++标准库中定义,如果一个类型包含了
lock()
和unlock()
两个成员函数,将该类型归类为BasicLockable;如果还包含了try_lock()
成员函数,则将该类型归类为*Lockable *。之所以定义这两种大类,是为了更好的满足扩展性。C++并发库中的多种互斥量操作函数都要求接收特定大类的实例作为参数,甚至可以自定义满足这些条件的类型作为参数传入。
2. 利用std::lock_guard
确保互斥量解锁
与std::thread
类型的对象类似,在对std::mutex
解锁时,即使中间发生了异常,也要保证程序一定要执行unlock()
,否则可能解锁失败造成其他线程永远阻塞。
为了避免该问题,C++标准库提供了类模板std::lock_gurad<>
,利用RAII实现互斥量的锁定和解锁,即:
- 创建
std::lock_gurad
对象时,需要传入一个满足BasicLockable条件的互斥量,在构造函数中会锁定传入的互斥量 - 当
std::lock_guard
对象被释放时,会在析构函数中解锁管理的互斥量
根据此使用方法,上面的函数可以改写为:
std::string some_str;
std::mutex some_mutex;
void AddStr(std::string str) {
// guard 的构造函数中会锁定 some_mutex
std::lock_guard<std::mutex> guard(some_mutex);
some_str += str;
// 退出函数时,会自动调用 guard 的析构解锁 some_mutex
}
3. 死锁及解决方法std::lock
如果一个数据使用时,需要锁定两个互斥量才能访问。这时如果有多个线程同时访问该数据,在某些情况下,可能两个线程各自锁定了一个互斥量,导致都在等待对方解锁互斥量,形成死锁。
防范死锁通常的方法是,始终使用相同的顺序对需要的互斥量加锁,但是在特定情况下仍然会导致死锁。
为了解决锁定多个互斥量造成的死锁问题,C++库提供了std::lock()
函数,可以接收多个满足Lockable条件的互斥量作为参数:
- 函数中会依次对每个互斥量执行调用
lock()
,若最后所有互斥量都lock()
成功,继续执行函数后的代码(后续需要手动对所有互斥量执行unlock
) - 如果函数执行中发生了异常,函数会保证所有互斥量都是
unlock()
的状态,并向外抛出异常
class Test {
public:
std::mutex mu_;
std::vector<int> vec_;
};
void SwapTestObj(Test &obj1, Test& obj2) {
// 如果两个对象相同,直接返回。否则lock会对同一个mutex执行两次lock造成错误
if (&obj1 == &obj2) {
return;
}
// 同时获取两个互斥量后,再执行 swap,避免多个线程同时执行该函数造成死锁
std::lock(obj1.mu_, obj2.mu_);
swap(obj1.vec_, obj2.vec_);
// 由于 std::lock 中对所有 mutex 都 lock 成功,所以要手动调用 unlock
obj1.mu_.unlock();
obj2.mu_.unlock();
}
4. 使用std::lock_guard
管理锁定后的互斥量
在以上代码中,手动调用std::mutex
对象的unlock
函数前如果发生异常,可能会造成互斥量不能被正确释放,同样可以使用std::lock_guard
解决该问题。
std::lock_guard
有一个重载的接收两个参数的构造函数:
- 其第二个参数可以传入
std::adopt_lock
实例,标识std::lock_guard
的构造函数中不要调用互斥量的lock
函数(互斥量已经被加锁);但是在std::lock_guard
的析构函数中仍然要调用unlock
以解锁互斥量
据此,以上代码可以优化为:
void SwapTestObj(Test &obj1, Test& obj2) {
// 如果两个对象相同,直接返回。否则lock会对同一个mutex执行两次lock造成错误
if (&obj1 == &obj2) {
return;
}
// 同时获取两个互斥量后,再执行 swap,避免多个线程同时执行该函数造成死锁
std::lock(obj1.mu_, obj2.mu_);
swap(obj1.vec_, obj2.vec_);
// 由于 std::lock 中对所有 mutex 都 lock 成功,所以要手动调用 unlock
// obj1.mu_.unlock();
// obj2.mu_.unlock();
// 使用 std::lock_guard 保证互斥量可以正确解锁
std::lock_guard<std::mutex> lock_g1(obj1.mu_, std::adopt_lock);
std::lock_guard<std::mutex> lock_g2(obj2.mu_, std::adopt_lock);
}
5. RAII风格的std::lock()
—std::scoped_lock
std::lock()
内部对互斥量加锁后,仍需要手动执行unlock
,可能会造成错误。使用std::scoped_lock
可以实现利用RAII管理所有的互斥量:
- 在创建对象时接收多个满足Lockable条件的互斥量,使用与
std::lock
相同的策略对互斥量加锁 - 在销毁对象时,会对所有互斥量解锁
所以,以上代码可以优化为:
void SwapTestObj(Test &obj1, Test& obj2) {
// 如果两个对象相同,直接返回。否则lock会对同一个mutex执行两次lock造成错误
if (&obj1 == &obj2) {
return;
}
// 使用 std::scoped_lock 自动管理所有互斥量
std::scoped_lock<std::mutex, std::mutex> guard(obj1.mu_, obj2.mu_);
swap(obj1.vec_, obj2.vec_);
}
6. 互斥量管理器std::unique_lock
以上的std::lock_guard
和std::scoped_lock
都是创建时直接对互斥量加锁,在对象被释放时解锁互斥量。但是,有些情况下,可能想更加灵活的控制加锁和解锁时机(如:在对象没有释放时就抓紧解锁),这时可以使用std::unique_lock
对象来管理互斥量。
std::unique_lock
具有多个重载的构造函数:
std::unique_lock uni_lock(m)
:使用互斥量m
初始化uni_lock
,并在构造函数中调用m.lock()
对互斥量加锁std::unique_lock uni_lock(m, std::defer_lock)
:使用互斥量m
初始化uni_lock
,构造函数内不要对互斥量加锁(互斥量没有被加锁)std::unique_lock uni_lock(m, std::adopt_lock)
:使用互斥量m
初始化uni_lock
,且在内部标记该互斥量已被加锁
从上面的状态可以看到,std::unique_lock
对象创建后,互斥量可能处于已加锁或未加锁状态,所以std::unique_lock
又提供了以下函数用于对管理的互斥量加锁或解锁(调用对应函数时必须保证m
是包含对应函数的类型,即m
满足Lockable条件时才能调用uni_lock.try_lock()
):
uni_lock.lock()
:调用m.lock()
阻塞锁定所管理的互斥量uni_lock.try_lock()
:调用m.try_unlock()
非阻塞地尝试锁定所管理的互斥量uni_lock.unlock()
:调用m.unlock()
解锁所管理的互斥量
根据这些操作可以发现,如果std::unique_lock
所管理的互斥量满足Lockable条件,则std::unique_lock
对象也满足Lockable条件,那么这些std::unique_lock
也能被用在需要Lockable对象的管理函数。如:
// 如果想用于 std::lock,需要保证都未加锁,所以使用 std::defer_lock 创建 std::unique_lock
std::unique_lock<std::mutex> uni_lock1(mu1, std::defer_lock);
std::unique_lock<std::mutex> uni_lock2(mu2, std::defer_lock);
// uni_lock1 和 uni_lock2 都满足 Lockable 条件,可以用于 std::lock
std::lock(uni_lock1, uni_lock2);
在std::unique_lock
内部有一个标志位成员,用来记录所管理的互斥量是否被加锁了,该类型还提供了以下函数返回管理的互斥量是否被锁定:
uni_lock.owns_lock()
:互斥量已锁定时返回true
;否则返回false
std::unique_lock
对象的析构函数中会根据标志位成员的值,决定是否调用m.unlock()
以调用uni_lock.unlock()
解锁管理的互斥量。所以,当std::unique_lock
对象被释放后,其管理的互斥量一定是未加锁的。
std::unique_lock
类型支持移动,但是不支持拷贝。所以我们可以通过移动std::unique_lock
对象,转移互斥量的归属权。
下面代码针对以上用法给出了示例:
// 用来测试的互斥量
std::mutex mu1, mu2, mu3;
std::unique_lock<std::mutex> test_unique_lock() {
// 使用互斥量初始化 std::unique_lock 对象,会锁定互斥量
std::unique_lock<std::mutex> uni_lock1(mu1);
// 使用 std::unique_lock 管理已锁定的互斥量
mu2.lock();
std::unique_lock<std::mutex> uni_lock2(mu2, std::adopt_lock);
// 使用 std::unique_lock 延迟互斥量的锁定
std::unique_lock<std::mutex> uni_lock3(mu3, std::defer_lock);
if (test_some_long_thing()) {
// 由于测试条件可能耗时较久,如果提前锁定互斥量,可能会造成其他线程阻塞
uni_lock3.lock();
// 执行需要互斥访问的任务
do_some_mutex_thing();
// 互斥操作执行完成后,立即解锁,避免后续操作执行时间过久导致其他线程阻塞较久
uni_lock3.unlock();
}
// 执行一些耗时的操作
do_some_long_thing();
// 如果有需要,可以继续灵活使用互斥量
if (test_some_long_thing()) {
uni_lock3.lock();
do_some_mutex_thing();
uni_lock3.unlock();
}
// 返回一个右值 std::unique_lock 以转移互斥量的归属权
return uni_lock3;
}
3.2 多线程中保证数据只初始化一次
考虑如下代码,在多线程并发执行函数时,会导致s_ptr
被初始化多次,造成错误的结果。
std::shared_ptr<SomeClass> s_ptr;
void ConcurrencyUnsafe() {
if (!s_ptr) {
// 如果两个线程同时执行到这个地方,就会导致两次创建对象
s_ptr.reset(new SomeClass());
}
s_ptr->DoSomeThing();
}
为了解决这个问题,C++引入了两个成员专门处理这种情况
std::once_flag
类:用来同步各个线程执行的过程std::call_once(flag, callable, arg)
函数:配合std::once_flag
的对象flag
,实现即使在多个线程中同时调用时,也只会执行一次所指定的函数callable(arg)
注意:函数callable
会在调用它的call_once
所在的线程执行,所以不会像std::thread
的线程函数执行时对参数进行拷贝或移动。因此,可以直接向callable
传递引用,而不需要使用std::ref
此外,从C++11开始规定,对于静态数据的初始化只会在某一线程上单独发生。所以在多线程下,可以保证静态数据没有初始化完成时,其他线程不会越过静态数据的声明去执行后续的代码。因此,在仅仅需要初始化一个数据时,使用静态数据会比使用call_once
开销更小。
class SomeClass{
public:
void DoSomeMutexThing(int &arg_val) {
arg_val = 3;
std::cout << "Set val to 3" << std::endl;
}
/**@brief 多线程并发时也能保证只执行一次的方法 */
void ConcurrencySafe() {
/**
* @brief
* 1. 即使有多个线程同时执行call_once,通过和 flag 配合,可以保证只执行一次 DoSomeMutexThing
* 2. 与 std::thread 向线程函数传参不同,call_once 指定的函数会与调用它
* 的 call_once 在同一个线程执行,所以引用实参可以直接传递
*/
std::call_once(flag, &SomeClass::DoSomeMutexThing, this, val_);
std::cout << "val = " << val_ << std::endl;
}
private:
// 用来保证多多个线程调用 call_once 时,只执行一次其参数指定的可调用对象
std::once_flag flag;
int val_ = 0;
};
int main(int argc, char const *argv[])
{
SomeClass test;
std::thread t1(&SomeClass::ConcurrencySafe, &test);
std::thread t2(&SomeClass::ConcurrencySafe, &test);
std::thread t3(&SomeClass::ConcurrencySafe, &test);
t1.join();
t2.join();
t3.join();
return 0;
}
/** 从输出可以看出,多个线程通过 call_once 只执行了一次 DoSomeMutexThing
Set val to 3
val = 3
val = 3
val = 3
*/
单例模式是一种只需要对数据初始化一次的典型应用场景,分别使用静态数据和call_once
可以实现如下:
class SingletonStatic {
public:
// 使用静态的局部变量实现单例
static SingletonStatic& GetInstance() {
static SingletonStatic inst;
return inst;
}
void print() {
std::cout << "SingletonStatic print" << std::endl;
}
// 删除拷贝构造和拷贝赋值
SingletonStatic(const SingletonStatic&) = delete;
SingletonStatic& operator=(const SingletonStatic&) = delete;
private:
SingletonStatic() = default;
};
class SingletonCallOnce {
public:
static std::shared_ptr<SingletonCallOnce>& GetInstance() {
// 使用 call_once 保证数据只初始化一次
std::call_once(init_flag, [&]() {
callonce_ptr_.reset(new SingletonCallOnce());
});
return callonce_ptr_;
}
void print() {
std::cout << "SingletonCallOnce print" << std::endl;
}
// 删除拷贝构造和拷贝赋值
SingletonCallOnce(const SingletonCallOnce&) = delete;
SingletonCallOnce& operator=(const SingletonCallOnce&) = delete;
private:
SingletonCallOnce() = default;
static std::once_flag init_flag;
static std::shared_ptr<SingletonCallOnce> callonce_ptr_;
};
std::once_flag SingletonCallOnce::init_flag;
std::shared_ptr<SingletonCallOnce> SingletonCallOnce::callonce_ptr_;
3.3 共享锁和排他锁
在某些情景下,可能需要提供多个线程使用同一个互斥量锁定相同的数据,且还需要给这些线程提供独立锁定这些数据的互斥量。
常见的一种情况就是读写操作,多个线程可以共享的读取数据;但是如果某线程要对数据进行修改,就要保证数据只被当前线程独立使用,不会有其他线程在读取或修改该数据。
为了满足这种情况,#include <shared_mutex>
头文件提供了一个shared_mutex
类型的互斥量,该互斥量对象有两种访问级别:
exclusive
:只有一个线程可以加锁成功shared
:多个线程可以同时加锁成功
为了支持以上两种访问级别,类型的对象s_mutex
分别定义了两类加锁和解锁的方法:
- 用于
exclusive
级别的锁。如果一个线程获取了该互斥量的exclusive
锁,其他线程不能获取shared
锁和exclusive
锁。s_mutex.lock()
:阻塞等待获取exclusive
锁。当有线程获取了exclusive
锁或shared
锁时会阻塞s_mutex.try_lock()
:非阻塞尝试获取exclusive
锁。如果其他线程获取了exclusive
锁或shared
锁时会返回false
,否则返回true
s_mutex.unlock()
:解锁exclusive
锁
- 用于
shared
级别的锁。如果一个线程获取了该互斥量的shared
锁,其他线程只能获取shared
锁,但是不能获取exclusive
锁。s_mutex.lock_shared()
:阻塞等待获取shared
锁。当有线程获取了exclusive
锁时会阻塞s_mutex.try_lock_shared()
:非阻塞尝试获取exclusive
锁。如果其他线程获取了exclusive
锁时会返回false
,否则返回true
s_mutex.unlock_shared()
:解锁shared
锁
class ThreadSafeCounter {
public:
ThreadSafeCounter() = default;
int value() {
// 获取 shared 锁
s_mutex_.lock_shared();
// 获取数据
int tmp = value_;
// 解锁 shared 锁
s_mutex_.unlock_shared();
return tmp;
}
void Increment() {
// 获取 exclusive 锁
s_mutex_.lock();
// 修改数据
++value_;
// 解锁 exclusive 锁
s_mutex_.unlock();
}
private:
std::shared_mutex s_mutex_;
int value_{};
};
从上面的例子可以看出,当完成对数据的读或写后,需要及时对shared
和exclusive
锁进行解锁。为了避免和std::mutex
类似的因为异常导致没有正确解锁的问题,也可以使用RAII风格的互斥量管理器保证shared_mutex
对象可以被正确解锁。
从std::shared_mutex
的定义可以看出,其满足Lockable条件所以可以直接使用std::unique_lock
或std::lock_guard
实现exclusive
锁的自动加锁和解锁。
为了实现shared
锁的自动加锁和解锁,C++提供了完全类似std::unique_lock
的shared
锁工具类std::shared_lock
。
std::shared_lock
的构造函数包括:
std::shared_lock shar_lock(s_m)
:使用std::shared_mutex
互斥量对象s_m
初始化shar_lock
,并在构造函数中调用s_m.lock_shared()
对互斥量加锁std::shared_lock shar_lock(s_m, std::defer_lock)
:使用std::shared_mutex
互斥量对象s_m
初始化shar_lock
,构造函数内不要对互斥量加锁(互斥量没有调用s_m.lock_shared()
)std::shared_lock shar_lock(s_m, std::adopt_lock)
:使用std::shared_mutex
互斥量对象s_m
初始化shar_lock
,且在内部标记该互斥量已锁定shared
锁(互斥量已调用s_m.lock_shared()
)
std::shared_lock
和std::unique_lock
一样包含可以手动锁定和解锁管理的std::shared_mutex
对象:
shar_lock.lock()
:调用s_m.lock_shared()
阻塞等待获取shared
锁shar_lock.try_lock()
:调用s_m.try_lock_shared()
非阻塞尝试获取shared
锁shar_lock.unlock()
:调用s_m.unlock_shared()
解锁shared
锁
因此,以上示例可以修改为:
class ThreadSafeCounter {
public:
ThreadSafeCounter() = default;
int value() {
// 使用管理器简化加锁和解锁
std::shared_lock<std::shared_mutex> shar_lock(s_mutex_);
return value_;
}
void Increment() {
// 使用管理器简化加锁和解锁
std::unique_lock<std::shared_mutex> uni_lock(s_mutex_);
++value_;
}
private:
std::shared_mutex s_mutex_;
int value_{};
};
3.4 递归加锁
对于std::mutex
,同一个线程中,如果已经对其加锁,如果再次调用lock
函数,会导致未定义的行为。在部分情况下,可能需要在一个线程中对互斥量重复加锁,为了满足这种情况,C++提供了std::recursive_mutex
。
std::recursive_mutex
与std::mutex
同样都包含lock
、try_lock
和unlock
成员函数以分别实现对互斥量的阻塞加锁、非阻塞加锁和解锁。
比较特殊的是:
- 可以在同一线程内对
std::recursive_mutex
多次执行lock
。当执行相同次数的unlock
后,互斥量被完全解锁。 - 如果有其他线程中调用了
std::recursive_mutex
的lock
,则会阻塞等待正在使用的线程完全解锁std::recursive_mutex
后,才能加锁成功。
根据std::recursive_mutex
的定义,我们同样可以使用std::lock_guard
和std::unique_lock
来简化使用。
class RecursiveClass {
public:
void func1() {
std::lock_guard<std::recursive_mutex> guard(rc_m_);
++value_;
std::cout << "func1 value: " << value_ << std::endl;
// 调用 func2
func2();
}
void func2() {
std::lock_guard<std::recursive_mutex> guard(rc_m_);
++value_;
std::cout << "func2 value: " << value_ << std::endl;
}
private:
// 递归互斥量
std::recursive_mutex rc_m_;
int value_{};
};
通常情况下,如果程序中使用了递归锁,可能是因为程序的逻辑不合理导致的。比如上面的例子,完全可以将公共的++value_
操作独立成一个函数,并使用std::mutex
实现互斥访问。