线程同步的概念 假设有 4 个线程 A、B、C、D,当前一个线程 A 对内存中的共享资源进行访问的时候,其他线程 B、C、D 都不可以对这块内存进行操作,直到线程 A 对这块内存访问完毕为止,B、C、D 中的一个才能访问这块内存,剩余的两个需要继续阻塞等待,以此类推,直至所有的线程都对这块内存操作完毕。 线程对内存的这种访问方式就称之为线程同步 ,所谓的同步并不是多个线程同时对内存进行访问,而是按照先后顺序依次进行的。
为什么要同步 例子:两个线程交替数数(每个线程数 50 个数,交替数到 100)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #include <stdio.h> #include <unistd.h> #include <pthread.h> #define MAX 50 int number;void * funcA (void * arg) { for (int i = 0 ; i < MAX; i ++) { int cur = number; cur ++; usleep (10 ); number = cur; printf ("Thread id = %lu, number = %d\n" , pthread_self (), number); } return NULL ; }void * funcB (void * arg) { for (int i = 0 ; i < MAX; i ++) { int cur = number; cur ++; number = cur; printf ("Thread id = %lu, number = %d\n" , pthread_self (), number); usleep (5 ); } return NULL ; }int main (int argc, const char * argv[]) { pthread_t p1, p2; pthread_create (&p1, NULL , funcA, NULL ); pthread_create (&p2, NULL , funcB, NULL ); pthread_join (p1, NULL ); pthread_join (p2, NULL ); return 0 ; }
上面的例子进行测试,虽然每个线程内部循环了 50 次每次数一数,但是最终没有数到 100。 通过输出的结果可知,有些数字被重复数了多次,原因就是没有对线程进行同步处理,造成了数据的混乱。
两个线程在数数的时候需要分时复用 CPU 时间片,并且测试程序中调用了 sleep()
导致线程的 CPU 时间片没用完就被迫挂起,这样就能让 CPU 的上下文切换(保存当前状态,下一次继续运行的时候需要加载保存的状态)更加频繁,更容易再现数据混乱的这个现象。CPU 对应寄存器、一级缓存、二级缓存、三级缓存是独占的,用于存储处理的数据和线程的状态信息,数据被 CPU 处理完成需要再次被写入到物理内存中,物理内存数据也可以通过文件 IO 操作写入到磁盘中。
在测试程序中两个线程共用全局变量 number
。当线程变成运行态之后开始数数,从物理内存加载数据,然后将数据放到 CPU 进行运算,最后将结果更新到物理内存中。如果数数的两个线程都可以顺利完成这个流程,那么得到的结果肯定是正确的。如果线程 A 执行这个过程期间就失去了 CPU 时间片,线程 A 被挂起了最新的数据没能更新到物理内存。线程 B 变成运行态之后从物理内存读数据,很显然它没有拿到最新数据,只能基于旧的数据往后数,然后失去 CPU 时间片挂起。线程 A 得到 CPU 时间片变成运行态,将上次没更新到内存的数据更新到内存,但是这样会导致线程 B 已经更新到内存的数据被覆盖,最终导致有些数据被重复数多次。
同步方式 对于多个线程访问共享资源出现数据混乱的问题,需要进行线程同步。常用的线程同步方式有四种: (1)互斥锁 (2)读写锁 (3)条件变量 (4)信号量 所谓的共享资源就是多个线程共同访问的变量,这些变量通常为全局数据区变量或者堆区变量,这些变量对应的共享资源也被称为临界资源。
找到临界资源之后,再找和临界资源相关的上下文代码,这样就得到了一个代码块,这个代码块可以称之为临界区。 确定好临界区(临界区越小越好)之后,就可以进行线程同步,线程同步的大致处理思路: (1)在临界区代码之上,添加加锁函数,对临界区加锁。 哪个线程调用这句代码,就会把这把锁锁上,其他线程就只能阻塞在锁上。 (2)在临界区代码之下,添加解锁函数,对临界区解锁。 出临界区的线程会将锁定的那把锁打开,其他抢到锁的线程就可以进入到临界区。 (3)锁机制能保证临界区代码最多只能同时有一个线程访问,这样并行访问就变为串行访问。
互斥锁 互斥锁函数 互斥锁是线程同步最常用的一种方式,通过互斥锁可以锁定一个代码块。 被锁定的这个代码块,所有的线程只能顺序执行,这样多线程访问共享资源数据混乱的问题就可以被解决。 付出的代价就是执行效率的降低,因为默认临界区多个线程是可以并行处理的,现在只能串行处理。
在 Linux 中互斥锁的类型为 pthread_mutex_t
,创建一个这种类型的变量就得到了一把互斥锁。在创建的锁对象中保存了当前这把锁的状态信息:锁定还是打开,如果是锁定状态还记录了给这把锁加锁的线程信息(线程 ID)。 一个互斥锁变量只能被一个线程锁定,被锁定之后其他线程再对互斥锁变量加锁就会被阻塞,直到这把互斥锁被解锁,被阻塞的线程才能被解除阻塞。 一般情况下,每一个共享资源对应一个把互斥锁,锁的个数和线程的个数无关。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 int pthread_mutex_init (pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr) ;int pthread_mutex_destroy (pthread_mutex_t *mutex) ;
1 2 int pthread_mutex_lock (pthread_mutex_t *mutex) ;
这个函数被调用,首先会判断参数 mutex
互斥锁中的状态是否为锁定状态: (1)如果没有被锁定,那么这个线程可以加锁成功,这个锁中会记录是哪个线程加锁成功; (2)如果被锁定,其他线程加锁就失败,这些线程都会阻塞在这把锁上。 当这把锁被解开之后,这些阻塞在锁上的线程就解除阻塞,并且这些线程是通过竞争的方式对这把锁加锁,没抢到锁的线程继续阻塞。
1 2 int pthread_mutex_trylock (pthread_mutex_t *mutex) ;
调用这个函数对互斥锁变量加锁还是有两种情况: (1)如果这把锁没有被锁定,线程加锁成功; (2)如果被锁住,调用这个函数加锁的线程不会被阻塞,加锁失败直接返回错误号。
1 2 int pthread_mutex_unlock (pthread_mutex_t *mutex) ;
不是所有的线程都可以对互斥锁解锁,哪个线程加的锁,哪个线程才能解锁成功。
互斥锁使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 #include <stdio.h> #include <unistd.h> #include <pthread.h> #define MAX 50 int number;pthread_mutex_t mutex;void * funcA (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_mutex_lock (&mutex); int cur = number; cur ++; usleep (10 ); number = cur; printf ("Thread id = %lu, number = %d\n" , pthread_self (), number); pthread_mutex_unlock (&mutex); } return NULL ; }void * funcB (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_mutex_lock (&mutex); int cur = number; cur ++; number = cur; printf ("Thread id = %lu, number = %d\n" , pthread_self (), number); pthread_mutex_unlock (&mutex); usleep (5 ); } return NULL ; }int main (int argc, const char * argv[]) { pthread_t p1, p2; pthread_mutex_init (&mutex, NULL ); pthread_create (&p1, NULL , funcA, NULL ); pthread_create (&p2, NULL , funcB, NULL ); pthread_join (p1, NULL ); pthread_join (p2, NULL ); pthread_mutex_destroy (&mutex); return 0 ; }
死锁 当多个线程访问共享资源需要加锁,如果锁使用不当,就会造成死锁这种现象。线程死锁造成的后果:所有的线程都被阻塞,并且线程的阻塞是无法解除的(因为可以解锁的线程也被阻塞)。
造成死锁的场景有如下几种: (1)加锁之后忘记解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void func () { for (int i = 0 ; i < 6 ; ++i) { pthread_mutex_lock (&mutex); } }void func () { for (int i = 0 ; i < 6 ; ++i) { pthread_mutex_lock (&mutex); if (xxx) { return ; } pthread_mutex_lock (&mutex); } }
(2)重复加锁,造成死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void func () { for (int i = 0 ; i < 6 ; ++i) { pthread_mutex_lock (&mutex); pthread_mutex_lock (&mutex); pthread_mutex_unlock (&mutex); } }void funcA () { for (int i = 0 ; i < 6 ; ++i) { pthread_mutex_lock (&mutex); pthread_mutex_unlock (&mutex); } }void funcB () { for (int i = 0 ; i < 6 ; ++i) { pthread_mutex_lock (&mutex); funcA (); pthread_mutex_unlock (&mutex); } }
(3)在程序中有多个共享资源,因此有很多把锁,随意加锁,导致相互被阻塞
1 2 3 4 5 6 7 场景描述:1 .有两个共享资源 X 、Y (X 对应锁 A 、Y 对应锁 B ) - 线程 A 访问资源 X ,加锁 A - 线程 B 访问资源 Y ,加锁 B 2 .线程 A 要访问资源 Y ,线程 B 要访问资源 X (因为资源 X 和 Y 已经被对应的锁锁住了,因此这两个线程被阻塞) - 线程 A 被锁 B 阻塞,无法打开 A 锁 - 线程 B 被锁 A 阻塞,无法打开 B 锁
在使用多线程编程的时候,如何避免死锁? (1)避免多次锁定,多检查 (2)对共享资源访问完毕之后,一定要解锁(或者在加锁的使用 trylock) (3)如果程序中有多把锁,可以控制对锁的访问顺序(顺序访问共享资源,但有些情况下做不到)。另外在对其他互斥锁做加锁操作之前,先释放当前线程拥有的互斥锁。 (4)项目程序中可以引入一些专门用于死锁检测的模块
读写锁 读写锁函数 读写锁是互斥锁的升级版,读操作的时候可以提高程序的执行效率。 如果所有的线程都是做读操作,那么读是并行的。但是使用互斥锁,读操作也是串行的。
读写锁是一把锁,锁的类型为 pthread_rwlock_t
,有了类型之后就可以创建一把互斥锁。
1 pthread_rwlock_t rwlock;
称为读写锁,是因为这把锁既可以锁定读操作,也可以锁定写操作。为了方便理解,可以大致认为在这把锁中记录了这些信息: (1)锁的状态:锁定/打开 (2)锁定的操作:读操作/写操作(使用读写锁锁定了读操作,需要先解锁才能去锁定写操作) (3)哪个线程将这把锁锁上了
读写锁的使用方式,和互斥锁的使用方式是完全相同的: (1)找共享资源,确定临界区。 (2)在临界区的开始位置加锁(读锁/写锁),临界区的结束位置解锁。
读写锁可以锁定读或者写操作,读写锁的特点如下: (1)使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,读锁是共享的。 (2)使用读写锁的写锁锁定了临界区,线程对临界区的访问是串行的,写锁是独占的。 (3)使用读写锁分别对两个临界区加了读锁和写锁,两个线程同时访问两个临界区,访问写锁临界区的线程继续运行,访问读锁临界区的线程阻塞,因为写锁比读锁的优先级高。
如果程序中所有的线程都对共享资源做写操作,使用读写锁没有优势,和互斥锁是一样的; 如果程序中所有的线程都对共享资源写操作/读操作,并且对共享资源读的操作越多,读写锁更有优势。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #include <pthread.h> pthread_rwlock_t rwlock;int pthread_rwlock_init (pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr) ;int pthread_rwlock_destroy (pthread_rwlock_t *rwlock) ;
1 2 int pthread_rwlock_rdlock (pthread_rwlock_t *rwlock) ;
调用这个函数: 如果读写锁是打开的,那么加锁成功; 如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的; 如果读写锁已经锁定了写操作,调用这个函数的线程会被阻塞。
1 2 3 int pthread_rwlock_tryrdlock (pthread_rwlock_t *rwlock) ;
调用这个函数: 如果读写锁是打开的,那么加锁成功; 如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的; 如果读写锁已经锁定了写操作,调用这个函数加锁失败,对应的线程不会被阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。
1 2 int pthread_rwlock_wrlock (pthread_rwlock_t *rwlock) ;
调用这个函数: 如果读写锁是打开的,那么加锁成功; 如果读写锁已经锁定了读操作/写操作,调用这个函数的线程会被阻塞。
1 2 3 int pthread_rwlock_trywrlock (pthread_rwlock_t *rwlock) ;
调用这个函数: 如果读写锁是打开的,那么加锁成功; 如果读写锁已经锁定了读操作/写操作,调用这个函数加锁失败,但是线程不会阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。
1 2 int pthread_rwlock_unlock (pthread_rwlock_t *rwlock) ;
读写锁的使用 8 个线程操作同一个全局变量,3 个线程不定时写同一全局资源,5 个线程不定时读同一全局资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #define MAX 50 int number;pthread_rwlock_t rwlock;void * read_num (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_rwlock_rdlock (&rwlock); printf ("Thread read, id = %lu, number = %d\n" , pthread_self (), number); pthread_rwlock_unlock (&rwlock); usleep (rand () % 5 ); } return NULL ; }void * write_num (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_rwlock_wrlock (&rwlock); int cur = number; cur ++; number = cur; printf ("Thread write, id = %lu, number = %d\n" , pthread_self (), number); pthread_rwlock_unlock (&rwlock); usleep (rand () % 5 ); } return NULL ; }int main (int argc, const char * argv[]) { pthread_t p1[5 ], p2[3 ]; pthread_rwlock_init (&rwlock, NULL ); for (int i = 0 ; i < 5 ; i ++) { pthread_create (&p1[i], NULL , read_num, NULL ); } for (int i = 0 ; i < 3 ; i ++) { pthread_create (&p2[i], NULL , write_num, NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_join (p1[i], NULL ); } for (int i = 0 ; i < 3 ; i ++) { pthread_join (p2[i], NULL ); } pthread_rwlock_destroy (&rwlock); return 0 ; }
条件变量 严格意义上来说,条件变量的主要作用不是处理线程同步 ,而是进行线程的阻塞 。 如果在多线程程序中只使用条件变量无法实现线程的同步,必须要配合互斥锁 来使用。 虽然条件变量和互斥锁都能阻塞线程,但是二者的效果不一样,二者的区别如下: (1)假设有 A-Z
26 个线程,这 26 个线程共同访问同一把互斥锁。如果线程 A 加锁成功,那么其余 B-Z 线程访问互斥锁都阻塞,所有的线程只能顺序访问临界区。 (2)条件变量只有在满足指定条件下才会阻塞线程。如果条件不满足,多个线程可以同时进入临界区,同时读写临界资源,这种情况下还是会出现共享资源中数据的混乱。
一般情况下,条件变量用于处理生产者和消费者模型,并且和互斥锁配合使用。 条件变量类型对应的类型为 pthread_cond_t
,定义一个条件变量类型的变量:
被条件变量阻塞的线程信息会被记录到这个变量中,在解除阻塞的时候使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #include <pthread.h> pthread_cond_t cond;int pthread_cond_init (pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr) ;int pthread_cond_destroy (pthread_cond_t *cond) ;
1 2 int pthread_cond_wait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex) ;
通过函数原型可知,该函数在阻塞线程的时候,需要一个互斥锁参数。这个互斥锁主要功能是进行线程同步,让线程顺序进入临界区,避免出现数共享资源的数据混乱。
该函数会对这个互斥锁做以下几件事情: (1)阻塞线程时候,如果线程已经对互斥锁 mutex
上锁,那么会将这把锁打开,这样做是为了避免死锁。 (2)线程解除阻塞的时候,函数内部会帮助这个线程再次将这个 mutex
互斥锁锁上,继续向下访问临界区。
1 2 3 4 5 6 7 8 struct timespec { time_t tv_sec; long tv_nsec; };int pthread_cond_timedwait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime) ;
这个函数的前两个参数和 pthread_cond_wait
函数是一样的,第三个参数表示线程阻塞的时长。注意:struct timespec 这个结构体中记录的时间是从 1971.1.1 到某个时间点的时间,总长度使用秒/纳秒表示,因此赋值方式相对要麻烦一点:
1 2 3 4 time_t mytim = time (NULL ); struct timespec tmsp; tmsp.tv_nsec = 0 ; tmsp.tv_sec = time (NULL ) + 100 ;
1 2 3 4 int pthread_cond_signal (pthread_cond_t *cond) ;int pthread_cond_broadcast (pthread_cond_t *cond) ;
调用上面两个函数中的任意一个,都可以唤醒被 pthread_cond_wait
或者 pthread_cond_timedwait
阻塞的线程。 区别在于:pthread_cond_signal
是唤醒至少一个被阻塞的线程(总个数不定),pthread_cond_broadcast
是唤醒所有被阻塞的线程。
生产者和消费者 生产者和消费者模型的组成: (1)生产者线程(若干个) 生产商品或者任务放入到任务队列中 任务队列满了 就阻塞,不满 的时候就工作 通过一个生产者的条件变量 控制生产者线程阻塞和非阻塞 (2)消费者线程(若干个) 读任务队列,将任务或者数据取出 任务队列中有数据 就消费,没有数据 就阻塞 通过一个消费者的条件变量 控制消费者线程阻塞和非阻塞 (3)队列(存储任务/数据) 对应一块内存,为了读写访问可以通过一个数据结构维护这块内存 可以是数组、链表,也可以使用 STL
容器(queue/stack/list/vector)
场景描述:使用条件变量实现生产者和消费者模型。 生产者有 5 个,往链表头部添加节点;消费者也有 5 个,删除链表头部的节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define MAX 5 pthread_cond_t cond;pthread_mutex_t mutex;struct Node { int val; struct Node * next; };struct Node * head = NULL ;void * producer (void * arg) { while (1 ) { pthread_mutex_lock (&mutex); struct Node * newNode = (struct Node*)malloc (sizeof (struct Node)); newNode->val = rand () % 1000 ; newNode->next = head; head = newNode; printf ("producer thread, id = %lu, val = %d\n" , pthread_self (), newNode->val); pthread_mutex_unlock (&mutex); pthread_cond_broadcast (&cond); sleep (rand () % 3 ); } return NULL ; }void * consumer (void * arg) { while (1 ) { pthread_mutex_lock (&mutex); while (head == NULL ) { pthread_cond_wait (&cond, &mutex); } struct Node * node = head; printf ("consumer thread, id = %lu, val = %d\n" , pthread_self (), node->val); head = head->next; free (node); pthread_mutex_unlock (&mutex); sleep (rand () % 3 ); } return NULL ; }int main () { pthread_mutex_init (&mutex, NULL ); pthread_cond_init (&cond, NULL ); pthread_t t1[MAX], t2[MAX]; for (int i = 0 ; i < MAX; i ++) { pthread_create (&t1[i], NULL , producer, NULL ); pthread_create (&t2[i], NULL , consumer, NULL ); } for (int i = 0 ; i < MAX; i ++) { pthread_join (t1[i], NULL ); pthread_join (t2[i], NULL ); } pthread_mutex_destroy (&mutex); pthread_cond_destroy (&cond); return 0 ; }
信号量 信号量函数 信号量用在多线程多任务同步的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再执行某些动作。 信号量不一定是锁定某一个资源,而是流程上的概念。比如:有 A、B 两个线程,B 线程要等 A 线程完成某一任务以后再进行自己下面的步骤,这个任务并不一定是锁定某一资源,还可以是进行一些计算或者数据处理等。
信号量(信号灯)与互斥锁和条件变量的主要不同在于“灯” 的概念,灯亮则表示着资源可用,灯灭则表示资源不可用。 信号量主要阻塞线程 ,不能完全保证线程安全。如果要保证线程安全,需要信号量和互斥锁 一起使用。 信号量和条件变量一样用于处理生产者和消费者模型,用于阻塞生产者线程或者消费者线程的运行。
信号的类型为 sem_t
:
1 2 #include <semaphore.h> sem_t sem;
Linux 提供的信号量操作函数原型如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <semaphore.h> int sem_init (sem_t *sem, int pshared, unsigned int value) ;int sem_destroy (sem_t *sem) ;
1 2 3 int sem_wait (sem_t *sem) ;
当线程调用这个函数,并且 sem 中的资源数 > 0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 -1, 直到 sem 中的资源数减为 0 时,资源被耗尽,因此线程也就被阻塞。
1 2 3 int sem_trywait (sem_t *sem) ;
当线程调用这个函数,并且 sem 中的资源数 > 0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 -1, 直到 sem 中的资源数减为 0 时,资源被耗尽,但是线程不会被阻塞,直接返回错误号。 因此可以在程序中添加判断分支,用于处理获取资源失败之后的情况。
1 2 int sem_post (sem_t *sem) ;
调用该函数会将 sem 中的资源数 +1, 如果有线程在调用 sem_wait
、sem_trywait
、sem_timedwait
时因为 sem 中的资源数为 0 被阻塞,这时这些线程会解除阻塞,获取到资源之后继续向下运行。
1 2 3 int sem_getvalue (sem_t *sem, int *sval) ;
通过这个函数可以查看 sem 中现在拥有的资源个数,通过第二个参数 sval 将数据传出,第二个参数的作用和返回值是一样的。
生产者和消费者 由于生产者和消费者是两类线程,并且在还没有生成之前是不能进行消费的, 在使用信号量处理这类问题的时候可以定义两个信号量,分别用于记录生产者和消费者线程拥有的总资源数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 sem_t psem;sem_t csem;sem_init (&psem, 0 , 5 ); sem_init (&csem, 0 , 0 ); sem_wait (&psem); sem_post (&csem); sem_wait (&csem);sem_post (&psem);
信号量使用 场景描述:使用信号量实现生产者和消费者模型。 生产者有 5 个,往链表头部添加节点;消费者也有 5 个,删除链表头部的节点。
总资源数为 1 如果生产者和消费者线程使用的信号量对应的总资源数为 1, 那么不管线程有多少个,可以工作的线程只有一个,其余线程由于拿不到资源都被迫阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define MAX 5 sem_t semp;sem_t semc;pthread_mutex_t mutex;struct Node { int val; struct Node * next; };struct Node * head = NULL ;void * producer (void * arg) { while (1 ) { sem_wait (&semp); struct Node * newNode = (struct Node*)malloc (sizeof (struct Node)); newNode->val = rand () % 1000 ; newNode->next = head; head = newNode; printf ("producer thread, id = %lu, val = %d\n" , pthread_self (), newNode->val); sem_post (&semc); sleep (rand () % 3 ); } return NULL ; }void * consumer (void * arg) { while (1 ) { sem_wait (&semc); struct Node * node = head; printf ("consumer thread, id = %lu, val = %d\n" , pthread_self (), node->val); head = head->next; free (node); sem_post (&semp); sleep (rand () % 3 ); } return NULL ; }int main () { sem_init (&semp, 0 , 1 ); sem_init (&semc, 0 , 0 ); pthread_t t1[MAX], t2[MAX]; for (int i = 0 ; i < MAX; i ++) { pthread_create (&t1[i], NULL , producer, NULL ); pthread_create (&t2[i], NULL , consumer, NULL ); } for (int i = 0 ; i < MAX; i ++) { pthread_join (t1[i], NULL ); pthread_join (t2[i], NULL ); } pthread_mutex_destroy (&mutex); sem_destroy (&semp); sem_destroy (&semc); return 0 ; }
通过测试代码得到如下结论: 如果生产者和消费者使用的信号量总资源数为 1,那么不会出现生产者线程和消费者线程同时访问共享资源的情况。不管生产者和消费者线程有多少个,它们都是顺序执行的。
总资源数大于 1 如果生产者和消费者线程使用的信号量对应的总资源数为大于 1,这种场景下出现的情况就比较多: (1)多个生产者线程同时生产 (2)多个消费者同时消费 (3)生产者线程和消费者线程同时生产和消费 以上不管哪一种情况都可能会出现多个线程访问共享资源的情况, 如果要防止共享资源出现数据混乱,那么需要使用互斥锁进行线程同步,处理代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define MAX 5 sem_t semp;sem_t semc;pthread_mutex_t mutex;struct Node { int val; struct Node * next; };struct Node * head = NULL ;void * producer (void * arg) { while (1 ) { sem_wait (&semp); pthread_mutex_lock (&mutex); struct Node * newNode = (struct Node*)malloc (sizeof (struct Node)); newNode->val = rand () % 1000 ; newNode->next = head; head = newNode; printf ("producer thread, id = %lu, val = %d\n" , pthread_self (), newNode->val); pthread_mutex_unlock (&mutex); sem_post (&semc); sleep (rand () % 3 ); } return NULL ; }void * consumer (void * arg) { while (1 ) { sem_wait (&semc); pthread_mutex_lock (&mutex); struct Node * node = head; printf ("consumer thread, id = %lu, val = %d\n" , pthread_self (), node->val); head = head->next; free (node); pthread_mutex_unlock (&mutex); sem_post (&semp); sleep (rand () % 3 ); } return NULL ; }int main () { sem_init (&semp, 0 , 5 ); sem_init (&semc, 0 , 0 ); pthread_mutex_init (&mutex, NULL ); pthread_t t1[MAX], t2[MAX]; for (int i = 0 ; i < MAX; ++i) { pthread_create (&t1[i], NULL , producer, NULL ); pthread_create (&t2[i], NULL , consumer, NULL ); } for (int i = 0 ; i < MAX; ++i) { pthread_join (t1[i], NULL ); pthread_join (t2[i], NULL ); } pthread_mutex_destroy (&mutex); sem_destroy (&semp); sem_destroy (&semc); return 0 ; }
编写上述代码的时候还有一个注意事项:不论是消费者线程的处理函数还是生产者线程的处理函数,内部有这两行代码:
1 2 3 4 5 6 7 sem_wait (&csem);pthread_mutex_lock (&mutex);sem_wait (&csem);pthread_mutex_lock (&mutex);
这两行代码的调用顺序是不能颠倒的,如果颠倒过来就有可能会造成死锁。
下面分析一种死锁的场景:初始化状态下消费者线程没有任务信号量资源。 假设某一个消费者线程先运行,调用 pthread_mutex_lock(&mutex)
对互斥锁加锁成功,然后调用 sem_wait(&csem)
由于没有资源,因此被阻塞。其余的消费者线程由于没有抢到互斥锁,因此被阻塞在互斥锁上。 对应生产者线程第一步操作也是调用 pthread_mutex_lock(&mutex)
,但是这时候互斥锁已经被消费者线程锁上了,所有生产者都被阻塞。 到此为止,多余的线程都被阻塞了,程序产生了死锁。
参考资料 https://subingwen.cn/linux/thread-sync/