线程同步 进行多线程编程,如果多个线程需要对同一块内存进行操作,比如:同时读、同时写、同时读写。 对于后两种情况来说,如果不做任何的人为干涉就会出现各种各样的错误数据。这是因为线程在运行的时候需要先得到 CPU 时间片,时间片用完之后需要放弃已获得的 CPU 资源,线程频繁地在就绪态和运行态之间切换,更复杂一点还可以在就绪态、运行态、挂起态之间切换,这样就会导致线程的执行顺序并不是有序的,而是随机的、混乱的。
互斥锁 解决多线程数据混乱的方案就是进行线程同步,最常用的就是互斥锁,C++11 中一共提供了四种互斥锁:std::mutex
:独占的互斥锁,不能递归使用std::timed_mutex
:带超时的独占互斥锁,不能递归使用std::recursive_mutex
:递归互斥锁,不带超时功能std::recursive_timed_mutex
:递归互斥锁,带超时 互斥锁也被称为互斥量,二者是一个东西。
std::mutex 不论是在 C 还是 C++ 中,进行线程同步的处理流程基本上是一致的,C++ 的 mutex 类提供了相关的 API 函数:
成员函数 lock()
函数用于给临界区加锁,并且只能有一个线程获得锁的所有权,它有阻塞线程的作用,函数原型如下:
独占互斥锁对象有两种状态:锁定和未锁定。 如果互斥锁是打开的,调用 lock()
函数的线程会得到互斥锁的所有权,并将其上锁,其它线程再调用该函数的时候由于得不到互斥锁的所有权,就会被 lock()
函数阻塞。当拥有互斥锁所有权的线程将互斥锁解锁,此时被 lock()
阻塞的线程解除阻塞,抢到互斥锁所有权的线程加锁并继续运行,没抢到互斥锁所有权的线程继续阻塞。
除了使用 lock()
还可以使用 try_lock()
获取互斥锁的所有权并对互斥锁加锁,函数原型如下:
二者的区别在于 try_lock()
不会阻塞线程,lock()
会阻塞线程: (1)如果互斥锁是未锁定状态,得到了互斥锁所有权并加锁成功,函数返回 true; (2)如果互斥锁是锁定状态,无法得到互斥锁所有权加锁失败,函数返回 false。
当互斥锁被锁定之后可以通过 unlock()
进行解锁,但是注意只有拥有互斥锁所有权的线程(对互斥锁上锁的线程才能将其解锁),其它线程是没有权限做这件事情的。函数原型如下:
通过介绍以上三个函数,使用互斥锁进行线程同步的大致思路差不多就能搞清楚了,主要分为以下几步: (1)找到多个线程操作的共享资源(全局变量、堆内存、类成员变量等),也可以称为临界资源 (2)找到和共享资源有关的上下文代码,也就是临界区 (3)在临界区的上面调用互斥锁类的 lock() 方法 (4)在临界区的下面调用互斥锁的 unlock() 方法
线程同步的目的是让多线程按照顺序依次执行临界区代码,这样做线程对共享资源的访问就从并行访问变为了线性访问,访问效率降低了,但是保证了数据的正确性。当线程对互斥锁对象加锁,并且执行完临界区代码之后,一定要使用这个线程对互斥锁解锁,否则最终会造成线程的死锁。死锁之后当前应用程序中的所有线程都会被阻塞,并且阻塞无法解除,应用程序也无法继续运行。
线程同步 举例:两个线程共同操作同一个全局变量,二者交替计数,将数值存储到这个全局变量并打印。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <iostream> #include <chrono> #include <thread> #include <mutex> using namespace std;int g_num = 0 ; mutex g_num_mutex;void slow_inc (int id) { for (int i = 0 ; i < 10 ; ++i) { g_num_mutex.lock (); ++g_num; cout << "thread: " << id << " => " << g_num << endl; g_num_mutex.unlock (); this_thread::sleep_for (chrono::seconds (1 )); } }int main () { thread t1 (slow_inc, 0 ) ; thread t2 (slow_inc, 1 ) ; t1.join (); t2.join (); }
上面的示例代码中,两个子线程执行的任务是一样的(其实也可以不一样,不同的任务中也可以对共享资源进行读写操作),在任务函数中把与全局变量相关的代码加了锁,两个线程只能顺序访问这部分代码(如果不进行线程同步,打印出的数据是混乱且无序的)。 另外,注意:在所有线程的任务函数执行完毕之前,互斥锁对象是不能被析构的,一定要在程序中保证这个对象的可用性。互斥锁的个数和共享资源的个数相等,也就是说每一个共享资源都应该对应一个互斥锁对象。互斥锁对象的个数和线程的个数没有关系。
std::lock_guard lock_guard
是 C++11 新增的一个模板类,使用这个类可以简化互斥锁 lock()
和 unlock()
的写法,同时也更安全。 这个模板类的定义和常用的构造函数原型如下:
1 2 3 4 5 template <class Mutex >class lock_guard ;explicit lock_guard (mutex_type& m) ;
lock_guard
在使用上面提供的这个构造函数构造对象时,会自动锁定互斥量,而在退出作用域后进行析构时就会自动解锁,从而保证了互斥量的正确操作,避免忘记 unlock()
操作而导致线程死锁。lock_guard
使用了 RAII
技术,在类构造函数中分配资源,在析构函数中释放资源,保证资源出了作用域就释放。
使用 lock_guard
对上面的示例进行修改,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <iostream> #include <chrono> #include <thread> #include <mutex> using namespace std;int g_num = 0 ; mutex g_num_mutex;void slow_inc (int id) { for (int i = 0 ; i < 6 ; ++i) { lock_guard<mutex> lock (g_num_mutex) ; ++g_num; cout << "thread: " << id << " => " << g_num << endl; this_thread::sleep_for (chrono::seconds (1 )); } }int main () { thread t1 (slow_inc, 0 ) ; thread t2 (slow_inc, 1 ) ; t1.join (); t2.join (); }
代码被精简,而且不用担心因为忘记解锁而造成程序的死锁,但是这种方式也有弊端。在上面的示例代码中整个 for 循环的循环体都被当做了临界区,多个线程线性地执行临界区代码,临界区越大程序效率越低,还是需要根据实际情况选择最优的解决方案。
std::recursive_mutex 递归互斥锁 std::recursive_mutex
允许同一线程多次获得互斥锁,用来解决同一线程需要多次获取互斥量时死锁的问题,在下面的例子中使用独占非递归互斥量会发生死锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 #include <iostream> #include <thread> #include <mutex> using namespace std;struct Calculate { Calculate () : m_i (6 ) {} void mul (int x) { lock_guard<mutex> locker (m_mutex) ; m_i *= x; } void div (int x) { lock_guard<mutex> locker (m_mutex) ; m_i /= x; } void both (int x, int y) { lock_guard<mutex> locker (m_mutex) ; mul (x); div (y); } int m_i; mutex m_mutex; };int main () { Calculate cal; cal.both (6 , 3 ); return 0 ; }
上面的程序中执行了 cal.both(6, 3)
调用之后,程序就会发生死锁。在 both()
中已经对互斥锁加锁,继续调用 mul()
函数,已经得到互斥锁所有权的线程再次获取这个互斥锁的所有权会造成死锁(C++ 中程序会异常退出,C 库函数会导致这个互斥锁永远无法被解锁,最终阻塞所有的线程)。
解决这个死锁,一个简单的办法就是使用递归互斥锁 std::recursive_mutex
,它允许一个线程多次获得互斥锁的所有权。修改后的示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #include <iostream> #include <thread> #include <mutex> using namespace std;struct Calculate { Calculate () : m_i (6 ) {} void mul (int x) { lock_guard<recursive_mutex> locker (m_mutex) ; m_i *= x; } void div (int x) { lock_guard<recursive_mutex> locker (m_mutex) ; m_i /= x; } void both (int x, int y) { lock_guard<recursive_mutex> locker (m_mutex) ; mul (x); div (y); } int m_i; recursive_mutex m_mutex; };int main () { Calculate cal; cal.both (6 , 3 ); cout << "cal.m_i: " << cal.m_i << endl; return 0 ; }
虽然递归互斥锁可以解决同一个互斥锁频繁获取互斥锁资源的问题,但是还是建议少用,主要原因如下: (1)使用递归互斥锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致 bug 的产生。 (2)递归互斥锁比非递归互斥锁效率要低一些。 (3)递归互斥锁虽然允许同一个线程多次获得同一个互斥锁的所有权,但最大次数并未具体说明,一旦超过一定的次数,就会抛出 std::system
错误。
std::timed_mutex std::timed_mutex
是超时独占互斥锁,在获取互斥锁资源时增加了超时等待功能。因为不知道获取锁资源需要等待多长时间,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他事情。
std::timed_mutex
比 std::_mutex
多了两个成员函数:try_lock_for()
和 try_lock_until()
1 2 3 4 5 6 7 8 9 10 void lock () ;bool try_lock () ;void unlock () ;template <class Rep , class Period >bool try_lock_for (const chrono::duration<Rep,Period>& rel_time) ;template <class Clock , class Duration >bool try_lock_until (const chrono::time_point<Clock,Duration>& abs_time) ;
try_lock_for
函数:当线程获取不到互斥锁资源的时候,让线程阻塞一定的时间长度。try_lock_until
函数:当线程获取不到互斥锁资源的时候,让线程阻塞到某一个指定的时间点。 关于两个函数的返回值: (1)如果得到互斥锁的所有权之后,函数会马上解除阻塞,返回 true。 (2)如果阻塞的时长用完或者到达指定的时间点之后,函数也会解除阻塞,返回 false。
std::timed_mutex
的使用,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #include <iostream> #include <thread> #include <mutex> using namespace std; timed_mutex g_mutex;void work () { chrono::seconds timeout (1 ) ; while (true ) { if (g_mutex.try_lock_for (timeout)) { cout << "thread id: " << this_thread::get_id () << ", get ownership" << endl; this_thread::sleep_for (chrono::seconds (10 )); g_mutex.unlock (); break ; } else { cout << "thread id: " << this_thread::get_id () << ", no get ownership" << endl; this_thread::sleep_for (chrono::milliseconds (50 )); } } }int main () { thread t1 (work) ; thread t2 (work) ; t1.join (); t2.join (); return 0 ; }
上面的示例代码中,通过一个 while 循环不停的去获取超时互斥锁的所有权,如果得不到就阻塞 1 秒钟,1 秒之后如果还是得不到阻塞 50 毫秒,然后再次继续尝试,直到获得互斥锁的所有权,跳出循环体。
关于递归超时互斥锁 std::recursive_timed_mutex
的使用方式和超时互斥锁 std::timed_mutex
是一样的,它可以允许一个线程多次获得互斥锁所有权,而 timed_mutex
只允许线程获取一次互斥锁所有权。另外,递归超时互斥锁 std::recursive_timed_mutex
也拥有超时互斥锁 std::recursive_mutex
的弊端,不建议频繁使用。
条件变量 条件变量是 C++11 提供的另外一种用于等待的同步机制,它能阻塞一个或多个线程,直到收到另外一个线程发出的通知或者超时,才会唤醒当前阻塞的线程。条件变量需要和互斥量配合起来使用,C++11 提供了两种条件变量: (1)condition_variable 需要配合 std::unique_lock<std::mutex>
进行 wait 操作,也就是阻塞线程的操作。 (2)condition_variable_any 和任意带有 lock()
、unlock()
语义的 mutex 搭配使用,有四种:std::mutex
:独占的非递归互斥锁std::timed_mutex
:带超时的独占非递归互斥锁std::recursive_mutex
:不带超时功能的递归互斥锁std::recursive_timed_mutex
:带超时的递归互斥锁
条件变量通常用于生产者和消费者模型,使用过程如下: (1)拥有条件变量的线程获取互斥量 (2)循环检查某个条件,如果条件不满足阻塞当前线程,否则线程继续向下执行 如果产品的数量达到上限,生产者阻塞;否则,生产者一直生产。 如果产品的数量为零,消费者阻塞;否则,消费者一直消费。 (3)条件满足之后,可以调用 notify_one()
或者 notify_all()
唤醒一个或者所有被阻塞的线程 消费者唤醒被阻塞的生产者,生产者解除阻塞继续生产。 生产者唤醒被阻塞的消费者,消费者解除阻塞继续消费。
condition_variable 成员函数 condition_variable
的成员函数主要分为两部分:线程等待(阻塞)函数和线程通知(唤醒)函数,这些函数被定义于头文件 <condition_variable>
。 (1)等待函数 调用 wait()
函数的线程会被阻塞
1 2 3 4 5 void wait (unique_lock<mutex>& lck) ;template <class Predicate>void wait (unique_lock<mutex>& lck, Predicate pred) ;
函数1:调用该函数的线程直接被阻塞 函数2:该函数的第二个参数是一个判断条件,是一个返回值为布尔类型的函数。该参数可以传递一个有名函数的地址,也可以直接指定一个匿名函数。表达式返回 false 当前线程被阻塞;表达式返回 true 当前线程不会被阻塞,继续向下执行。
独占的互斥锁对象不能直接传递给 wait()
函数,需要通过模板类 unique_lock
进行二次处理,通过得到的对象仍然可以对独占的互斥锁对象做如下操作:lock()
:锁定关联的互斥锁try_lock()
:尝试锁定关联的互斥锁。若无法锁定,函数直接返回try_lock_for()
:试图锁定关联的可定时锁定互斥锁。若互斥锁在给定时长中仍不能被锁定,函数返回try_lock_until()
:试图锁定关联的可定时锁定互斥锁。若互斥锁在给定的时间点后仍不能被锁定,函数返回unlock()
:将互斥锁解锁
如果线程被该函数阻塞,这个线程会释放占有的互斥锁的所有权,当阻塞解除之后这个线程会重新得到互斥锁的所有权,继续向下执行(这个过程是在函数内部完成的,了解这个过程即可,其目的是避免线程的死锁)。wait_for()
函数和 wait()
的功能是一样的,只不过多了一个阻塞时长。假设阻塞的线程没有被其他线程唤醒,当阻塞时长用完之后,线程就会自动解除阻塞,继续向下执行。
1 2 3 4 5 6 7 template <class Rep , class Period >cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time) ; template <class Rep , class Period , class Predicate >bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred) ;
wait_until()
函数和 wait_for()
的功能是一样的,它是指定让线程阻塞到某一个时间点。假设阻塞的线程没有被其他线程唤醒,当到达指定的时间点之后,线程就会自动解除阻塞,继续向下执行。
1 2 3 4 5 6 7 template <class Clock , class Duration >cv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time) ;template <class Clock , class Duration , class Predicate >bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred) ;
(2)通知函数
1 2 void notify_one () noexcept ;void notify_all () noexcept ;
notify_one()
:唤醒一个被当前条件变量阻塞的线程notify_all()
:唤醒全部被当前条件变量阻塞的线程
生产者和消费者模型 使用条件变量来实现一个同步队列,这个队列作为生产者线程和消费者线程的共享资源,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <list> #include <functional> using namespace std;class SyncQueue {public : SyncQueue (int maxSize) : m_maxSize (maxSize) {} void put (const int & x) { unique_lock<mutex> locker (m_mutex) ; while (m_queue.size () == m_maxSize) { cout << "task queue is full, wait..." << endl; m_notFull.wait (locker); } m_queue.push_back (x); cout << x << " is produce" << endl; m_notEmpty.notify_one (); } int take () { unique_lock<mutex> locker (m_mutex) ; while (m_queue.empty ()) { cout << "task queue is empty, wait..." << endl; m_notEmpty.wait (locker); } int x = m_queue.front (); m_queue.pop_front (); m_notFull.notify_one (); cout << x << " is consume" << endl; return x; } bool empty () { lock_guard<mutex> locker (m_mutex) ; return m_queue.empty (); } bool full () { lock_guard<mutex> locker (m_mutex) ; return m_queue.size () == m_maxSize; } int size () { lock_guard<mutex> locker (m_mutex) ; return m_queue.size (); }private : list<int > m_queue; int m_maxSize; mutex m_mutex; condition_variable m_notEmpty; condition_variable m_notFull; };int main () { SyncQueue taskQ (50 ) ; auto produce = bind (&SyncQueue::put, &taskQ, placeholders::_1); auto consume = bind (&SyncQueue::take, &taskQ); thread t1[3 ]; thread t2[3 ]; for (int i = 0 ; i < 3 ; ++i) { t1[i] = thread (produce, i + 100 ); t2[i] = thread (consume); } for (int i = 0 ; i < 3 ; i ++) { t1[i].join (); t2[i].join (); } return 0 ; }
条件变量 condition_variable 类的 wait()
还有一个重载的方法,可以接受一个条件,这个条件也可以是一个返回值为布尔类型的函数,条件变量会先检查判断这个条件是否满足。 (1)如果满足条件,则当前线程重新获得互斥锁的所有权,结束阻塞,继续向下执行。 (2)如果不满足条件,则当前线程会释放互斥锁(解锁)同时被阻塞,等待被唤醒。
上面的示例代码中 put()
、take()
修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void put (const int & x) { unique_lock<mutex> locker (m_mutex) ; m_notFull.wait (locker, [this ]() { return m_queue.size () != m_maxSize; }); m_queue.push_back (x); cout << x << " is produce" << endl; m_notEmpty.notify_one (); }int take () { unique_lock<mutex> locker (m_mutex) ; m_notEmpty.wait (locker, [this ]() { return !m_queue.empty (); }); int x = m_queue.front (); m_queue.pop_front (); m_notFull.notify_one (); cout << x << " is consume" << endl; return x; }
修改之后程序变得更精简,而且执行效率更高,因为在这两个函数中的 while 循环被删掉了,但是最终的效果是一样的,推荐使用这种方式的 wait()
进行线程的阻塞。
condition_variable_any 成员函数 condition_variable_any
的成员函数也是分为两部分:线程等待(阻塞)函数 和线程通知(唤醒)函数,这些函数被定义于头文件 <condition_variable>
。 (1)等待函数
1 2 3 4 5 6 template <class Lock > void wait (Lock& lck) ;template <class Lock , class Predicate >void wait (Lock& lck, Predicate pred) ;
wait_for()
函数和 wait()
的功能是一样的,只不过多了一个阻塞时长。假设阻塞的线程没有被其他线程唤醒,当阻塞时长用完之后,线程就会自动解除阻塞,继续向下执行。
1 2 3 4 5 template <class Lock , class Rep , class Period >cv_status wait_for (Lock& lck, const chrono::duration<Rep,Period>& rel_time) ; template <class Lock , class Rep , class Period , class Predicate >bool wait_for (Lock& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred) ;
wait_until()
函数和 wait_for()
的功能是一样的,它是指定让线程阻塞到某一个时间点,假设阻塞的线程没有被其他线程唤醒,当到达指定的时间点之后,线程就会自动解除阻塞,继续向下执行。
1 2 3 4 5 template <class Lock , class Clock , class Duration >cv_status wait_until (Lock& lck, const chrono::time_point<Clock,Duration>& abs_time) ;template <class Lock , class Clock , class Duration , class Predicate >bool wait_until (Lock& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred) ;
(2)通知函数
1 2 void notify_one () noexcept ;void notify_all () noexcept ;
notify_one()
:唤醒一个被当前条件变量阻塞的线程notify_all()
:唤醒全部被当前条件变量阻塞的线程
生产者和消费者模型 使用条件变量 condition_variable_any
同样可以实现上面的生产者和消费者的例子,代码只有个别细节不同:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 #include <iostream> #include <thread> #include <mutex> #include <list> #include <functional> #include <condition_variable> using namespace std;class SyncQueue {public : SyncQueue (int maxSize) : m_maxSize (maxSize) {} void put (const int & x) { lock_guard<mutex> locker (m_mutex) ; m_notFull.wait (m_mutex, [this ]() { return m_queue.size () != m_maxSize; }); m_queue.push_back (x); cout << x << " is produce" << endl; m_notEmpty.notify_one (); } int take () { lock_guard<mutex> locker (m_mutex) ; m_notEmpty.wait (m_mutex, [this ]() { return !m_queue.empty (); }); int x = m_queue.front (); m_queue.pop_front (); m_notFull.notify_one (); cout << x << " is consume" << endl; return x; } bool empty () { lock_guard<mutex> locker (m_mutex) ; return m_queue.empty (); } bool full () { lock_guard<mutex> locker (m_mutex) ; return m_queue.size () == m_maxSize; } int size () { lock_guard<mutex> locker (m_mutex) ; return m_queue.size (); }private : list<int > m_queue; int m_maxSize; mutex m_mutex; condition_variable_any m_notEmpty; condition_variable_any m_notFull; };int main () { SyncQueue taskQ (50 ) ; auto produce = bind (&SyncQueue::put, &taskQ, placeholders::_1); auto consume = bind (&SyncQueue::take, &taskQ); thread t1[3 ]; thread t2[3 ]; for (int i = 0 ; i < 3 ; ++i) { t1[i] = thread (produce, i + 100 ); t2[i] = thread (consume); } for (int i = 0 ; i < 3 ; i ++) { t1[i].join (); t2[i].join (); } return 0 ; }
以上介绍的两种互斥锁各自有各自的特点,condition_variable
配合 unique_lock
使用更灵活一些,可以在任何时候自由地释放互斥锁,而 condition_variable_any
如果和 lock_guard
一起使用必须要等到其生命周期结束才能将互斥锁释放。 但是,condition_variable_any
可以和多种互斥锁配合使用,应用场景也更广,而 condition_variable
只能和独占的非递归互斥锁mutex
配合使用,有一定的局限性。
参考资料 https://subingwen.cn/cpp/mutex https://subingwen.cn/cpp/condition