线程同步

线程同步的概念

假设有 4 个线程 A、B、C、D,当前一个线程 A 对内存中的共享资源进行访问的时候,其他线程 B、C、D 都不可以对这块内存进行操作,直到线程 A 对这块内存访问完毕为止,B、C、D 中的一个才能访问这块内存,剩余的两个需要继续阻塞等待,以此类推,直至所有的线程都对这块内存操作完毕。
线程对内存的这种访问方式就称之为线程同步,所谓的同步并不是多个线程同时对内存进行访问,而是按照先后顺序依次进行的。

为什么要同步

例子:两个线程交替数数(每个线程数 50 个数,交替数到 100)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define MAX 50
// 全局变量
int number;

// 线程处理函数
void* funcA(void* arg)
{
for (int i = 0; i < MAX; i ++)
{
int cur = number;
cur ++;
usleep(10);
number = cur;
printf("Thread id = %lu, number = %d\n", pthread_self(), number);
}

return NULL;
}

void* funcB(void* arg)
{
for (int i = 0; i < MAX; i ++)
{
int cur = number;
cur ++;
number = cur;
printf("Thread id = %lu, number = %d\n", pthread_self(), number);
usleep(5);
}

return NULL;
}

int main(int argc, const char* argv[])
{
pthread_t p1, p2;

// 创建线程
pthread_create(&p1, NULL, funcA, NULL);
pthread_create(&p2, NULL, funcB, NULL);

// 阻塞,回收资源
pthread_join(p1, NULL);
pthread_join(p2, NULL);

return 0;
}
// gcc .\counter.c -lpthread -o app

上面的例子进行测试,虽然每个线程内部循环了 50 次每次数一数,但是最终没有数到 100。
通过输出的结果可知,有些数字被重复数了多次,原因就是没有对线程进行同步处理,造成了数据的混乱。

两个线程在数数的时候需要分时复用 CPU 时间片,并且测试程序中调用了 sleep() 导致线程的 CPU 时间片没用完就被迫挂起,这样就能让 CPU 的上下文切换(保存当前状态,下一次继续运行的时候需要加载保存的状态)更加频繁,更容易再现数据混乱的这个现象。CPU 对应寄存器、一级缓存、二级缓存、三级缓存是独占的,用于存储处理的数据和线程的状态信息,数据被 CPU 处理完成需要再次被写入到物理内存中,物理内存数据也可以通过文件 IO 操作写入到磁盘中。

在测试程序中两个线程共用全局变量 number。当线程变成运行态之后开始数数,从物理内存加载数据,然后将数据放到 CPU 进行运算,最后将结果更新到物理内存中。如果数数的两个线程都可以顺利完成这个流程,那么得到的结果肯定是正确的。如果线程 A 执行这个过程期间就失去了 CPU 时间片,线程 A 被挂起了最新的数据没能更新到物理内存。线程 B 变成运行态之后从物理内存读数据,很显然它没有拿到最新数据,只能基于旧的数据往后数,然后失去 CPU 时间片挂起。线程 A 得到 CPU 时间片变成运行态,将上次没更新到内存的数据更新到内存,但是这样会导致线程 B 已经更新到内存的数据被覆盖,最终导致有些数据被重复数多次。

同步方式

对于多个线程访问共享资源出现数据混乱的问题,需要进行线程同步。常用的线程同步方式有四种:
(1)互斥锁
(2)读写锁
(3)条件变量
(4)信号量
所谓的共享资源就是多个线程共同访问的变量,这些变量通常为全局数据区变量或者堆区变量,这些变量对应的共享资源也被称为临界资源。

找到临界资源之后,再找和临界资源相关的上下文代码,这样就得到了一个代码块,这个代码块可以称之为临界区。
确定好临界区(临界区越小越好)之后,就可以进行线程同步,线程同步的大致处理思路:
(1)在临界区代码之上,添加加锁函数,对临界区加锁。
哪个线程调用这句代码,就会把这把锁锁上,其他线程就只能阻塞在锁上。
(2)在临界区代码之下,添加解锁函数,对临界区解锁。
出临界区的线程会将锁定的那把锁打开,其他抢到锁的线程就可以进入到临界区。
(3)锁机制能保证临界区代码最多只能同时有一个线程访问,这样并行访问就变为串行访问。

互斥锁

互斥锁函数

互斥锁是线程同步最常用的一种方式,通过互斥锁可以锁定一个代码块。
被锁定的这个代码块,所有的线程只能顺序执行,这样多线程访问共享资源数据混乱的问题就可以被解决。
付出的代价就是执行效率的降低,因为默认临界区多个线程是可以并行处理的,现在只能串行处理。

在 Linux 中互斥锁的类型为 pthread_mutex_t,创建一个这种类型的变量就得到了一把互斥锁。在创建的锁对象中保存了当前这把锁的状态信息:锁定还是打开,如果是锁定状态还记录了给这把锁加锁的线程信息(线程 ID)。
一个互斥锁变量只能被一个线程锁定,被锁定之后其他线程再对互斥锁变量加锁就会被阻塞,直到这把互斥锁被解锁,被阻塞的线程才能被解除阻塞。
一般情况下,每一个共享资源对应一个把互斥锁,锁的个数和线程的个数无关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 初始化互斥锁
// restrict: 是一个关键字, 用来修饰指针, 只有这个关键字修饰的指针可以访问指向的内存地址, 其他指针是不行的
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
/*
参数:
mutex:
互斥锁变量的地址
attr:
互斥锁的属性,一般使用默认属性,这个参数指定为 NULL
*/

// 释放互斥锁资源
int pthread_mutex_destroy(pthread_mutex_t *mutex);
1
2
// 修改互斥锁的状态, 将其设定为锁定状态, 这个状态被写入到参数 mutex
int pthread_mutex_lock(pthread_mutex_t *mutex);

这个函数被调用,首先会判断参数 mutex 互斥锁中的状态是否为锁定状态:
(1)如果没有被锁定,那么这个线程可以加锁成功,这个锁中会记录是哪个线程加锁成功;
(2)如果被锁定,其他线程加锁就失败,这些线程都会阻塞在这把锁上。
当这把锁被解开之后,这些阻塞在锁上的线程就解除阻塞,并且这些线程是通过竞争的方式对这把锁加锁,没抢到锁的线程继续阻塞。

1
2
// 尝试加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex);

调用这个函数对互斥锁变量加锁还是有两种情况:
(1)如果这把锁没有被锁定,线程加锁成功;
(2)如果被锁住,调用这个函数加锁的线程不会被阻塞,加锁失败直接返回错误号。

1
2
// 对互斥锁解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);

不是所有的线程都可以对互斥锁解锁,哪个线程加的锁,哪个线程才能解锁成功。

互斥锁使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define MAX 50
// 全局变量
int number;
// 互斥锁
// 全局变量,多个线程共享
pthread_mutex_t mutex;

void* funcA(void* arg)
{
for (int i = 0; i < MAX; i ++)
{
pthread_mutex_lock(&mutex);
int cur = number;
cur ++;
usleep(10);
number = cur;
printf("Thread id = %lu, number = %d\n", pthread_self(), number);
pthread_mutex_unlock(&mutex);
}

return NULL;
}

void* funcB(void* arg)
{
for (int i = 0; i < MAX; i ++)
{
pthread_mutex_lock(&mutex);
int cur = number;
cur ++;
number = cur;
printf("Thread id = %lu, number = %d\n", pthread_self(), number);
pthread_mutex_unlock(&mutex);
usleep(5);
}

return NULL;
}

int main(int argc, const char* argv[])
{
pthread_t p1, p2;
// 互斥锁初始化
pthread_mutex_init(&mutex, NULL);

// 创建两个子线程
pthread_create(&p1, NULL, funcA, NULL);
pthread_create(&p2, NULL, funcB, NULL);

// 阻塞,资源回收
pthread_join(p1, NULL);
pthread_join(p2, NULL);

// 互斥锁释放
pthread_mutex_destroy(&mutex);

return 0;
}
// gcc .\counter2.c -lpthread -o app

死锁

当多个线程访问共享资源需要加锁,如果锁使用不当,就会造成死锁这种现象。线程死锁造成的后果:所有的线程都被阻塞,并且线程的阻塞是无法解除的(因为可以解锁的线程也被阻塞)。

造成死锁的场景有如下几种:
(1)加锁之后忘记解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 场景1
void func()
{
for (int i = 0; i < 6; ++i)
{
// 当前线程 A 加锁成功,当前循环完毕没有解锁,在下一轮循环的时候自己被阻塞
// 其余的线程也被阻塞
pthread_mutex_lock(&mutex);
// ...
// 忘记解锁
}
}

// 场景2
void func()
{
for (int i = 0; i < 6; ++i)
{
// 当前线程 A 加锁成功
// 其余的线程被阻塞
pthread_mutex_lock(&mutex);
// ...
if (xxx)
{
// 函数退出, 没有解锁(解锁函数无法被执行了)
return;
}

pthread_mutex_lock(&mutex);
}
}

(2)重复加锁,造成死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void func()
{
for (int i = 0; i < 6; ++i)
{
// 当前线程 A 加锁成功
// 其余的线程阻塞
pthread_mutex_lock(&mutex);
// 锁被锁住,A线程阻塞
pthread_mutex_lock(&mutex);
// ...
pthread_mutex_unlock(&mutex);
}
}

// 隐藏的比较深的情况
void funcA()
{
for(int i = 0; i < 6; ++i)
{
// 当前线程 A 加锁成功
// 其余的线程阻塞
pthread_mutex_lock(&mutex);
// ...
pthread_mutex_unlock(&mutex);
}
}

void funcB()
{
for(int i = 0; i < 6; ++i)
{
// 当前线程A加锁成功
// 其余的线程阻塞
pthread_mutex_lock(&mutex);
funcA(); // 重复加锁
// ....
pthread_mutex_unlock(&mutex);
}
}

(3)在程序中有多个共享资源,因此有很多把锁,随意加锁,导致相互被阻塞

1
2
3
4
5
6
7
场景描述:
1.有两个共享资源 XYX 对应锁 AY 对应锁 B
- 线程 A 访问资源 X,加锁 A
- 线程 B 访问资源 Y,加锁 B
2.线程 A 要访问资源 Y,线程 B 要访问资源 X(因为资源 XY 已经被对应的锁锁住了,因此这两个线程被阻塞)
- 线程 A 被锁 B 阻塞,无法打开 A
- 线程 B 被锁 A 阻塞,无法打开 B

在使用多线程编程的时候,如何避免死锁?
(1)避免多次锁定,多检查
(2)对共享资源访问完毕之后,一定要解锁(或者在加锁的使用 trylock)
(3)如果程序中有多把锁,可以控制对锁的访问顺序(顺序访问共享资源,但有些情况下做不到)。另外在对其他互斥锁做加锁操作之前,先释放当前线程拥有的互斥锁。
(4)项目程序中可以引入一些专门用于死锁检测的模块

读写锁

读写锁函数

读写锁是互斥锁的升级版,读操作的时候可以提高程序的执行效率。
如果所有的线程都是做读操作,那么读是并行的。但是使用互斥锁,读操作也是串行的。

读写锁是一把锁,锁的类型为 pthread_rwlock_t,有了类型之后就可以创建一把互斥锁。

1
pthread_rwlock_t rwlock;

称为读写锁,是因为这把锁既可以锁定读操作,也可以锁定写操作。为了方便理解,可以大致认为在这把锁中记录了这些信息:
(1)锁的状态:锁定/打开
(2)锁定的操作:读操作/写操作(使用读写锁锁定了读操作,需要先解锁才能去锁定写操作)
(3)哪个线程将这把锁锁上了

读写锁的使用方式,和互斥锁的使用方式是完全相同的:
(1)找共享资源,确定临界区。
(2)在临界区的开始位置加锁(读锁/写锁),临界区的结束位置解锁。

读写锁可以锁定读或者写操作,读写锁的特点如下:
(1)使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,读锁是共享的。
(2)使用读写锁的写锁锁定了临界区,线程对临界区的访问是串行的,写锁是独占的。
(3)使用读写锁分别对两个临界区加了读锁和写锁,两个线程同时访问两个临界区,访问写锁临界区的线程继续运行,访问读锁临界区的线程阻塞,因为写锁比读锁的优先级高。

如果程序中所有的线程都对共享资源做写操作,使用读写锁没有优势,和互斥锁是一样的;
如果程序中所有的线程都对共享资源写操作/读操作,并且对共享资源读的操作越多,读写锁更有优势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <pthread.h>
pthread_rwlock_t rwlock;
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);

// 释放读写锁占用的系统资源
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
/*
参数:
rwlock:
读写锁的地址,传出参数
attr:
读写锁属性,一般使用默认属性,指定为 NULL
*/
1
2
// 在程序中对读写锁加读锁, 锁定的是读操作
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

调用这个函数:
如果读写锁是打开的,那么加锁成功;
如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的;
如果读写锁已经锁定了写操作,调用这个函数的线程会被阻塞。

1
2
3
// 这个函数可以有效的避免死锁
// 如果加读锁失败, 不会阻塞当前线程, 直接返回错误号
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

调用这个函数:
如果读写锁是打开的,那么加锁成功;
如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的;
如果读写锁已经锁定了写操作,调用这个函数加锁失败,对应的线程不会被阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。

1
2
// 在程序中对读写锁加写锁, 锁定的是写操作
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

调用这个函数:
如果读写锁是打开的,那么加锁成功;
如果读写锁已经锁定了读操作/写操作,调用这个函数的线程会被阻塞。

1
2
3
// 这个函数可以有效的避免死锁
// 如果加写锁失败, 不会阻塞当前线程, 直接返回错误号
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

调用这个函数:
如果读写锁是打开的,那么加锁成功;
如果读写锁已经锁定了读操作/写操作,调用这个函数加锁失败,但是线程不会阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。

1
2
// 解锁,锁定了读/写都可以解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

读写锁的使用

8 个线程操作同一个全局变量,3 个线程不定时写同一全局资源,5 个线程不定时读同一全局资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

#define MAX 50
// 全局变量
int number;
// 读写锁
pthread_rwlock_t rwlock;

// 线程处理函数
void* read_num(void* arg)
{
for (int i = 0; i < MAX; i ++)
{
pthread_rwlock_rdlock(&rwlock);
printf("Thread read, id = %lu, number = %d\n", pthread_self(), number);
pthread_rwlock_unlock(&rwlock);
usleep(rand() % 5);
}

return NULL;
}

void* write_num(void* arg)
{
for (int i = 0; i < MAX; i ++)
{
pthread_rwlock_wrlock(&rwlock);
int cur = number;
cur ++;
number = cur;
printf("Thread write, id = %lu, number = %d\n", pthread_self(), number);
pthread_rwlock_unlock(&rwlock);
usleep(rand() % 5);
}

return NULL;
}

int main(int argc, const char* argv[])
{
pthread_t p1[5], p2[3];
pthread_rwlock_init(&rwlock, NULL);

// 创建两组子线程
for (int i = 0; i < 5; i ++)
{
pthread_create(&p1[i], NULL, read_num, NULL); // 读线程
}
for (int i = 0; i < 3; i ++)
{
pthread_create(&p2[i], NULL, write_num, NULL); // 写线程
}

// 阻塞,回收资源
for (int i = 0; i < 5; i ++)
{
pthread_join(p1[i], NULL);
}
for (int i = 0; i < 3; i ++)
{
pthread_join(p2[i], NULL);
}

pthread_rwlock_destroy(&rwlock);

return 0;
}
// gcc .\rwlock.c -lpthread -o app

条件变量

严格意义上来说,条件变量的主要作用不是处理线程同步,而是进行线程的阻塞
如果在多线程程序中只使用条件变量无法实现线程的同步,必须要配合互斥锁来使用。
虽然条件变量和互斥锁都能阻塞线程,但是二者的效果不一样,二者的区别如下:
(1)假设有 A-Z 26 个线程,这 26 个线程共同访问同一把互斥锁。如果线程 A 加锁成功,那么其余 B-Z 线程访问互斥锁都阻塞,所有的线程只能顺序访问临界区。
(2)条件变量只有在满足指定条件下才会阻塞线程。如果条件不满足,多个线程可以同时进入临界区,同时读写临界资源,这种情况下还是会出现共享资源中数据的混乱。

一般情况下,条件变量用于处理生产者和消费者模型,并且和互斥锁配合使用。
条件变量类型对应的类型为 pthread_cond_t,定义一个条件变量类型的变量:

1
pthread_cond_t cond;

被条件变量阻塞的线程信息会被记录到这个变量中,在解除阻塞的时候使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <pthread.h>
pthread_cond_t cond;
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);

// 销毁释放资源
int pthread_cond_destroy(pthread_cond_t *cond);
/*
参数:
cond:
条件变量的地址
attr:
条件变量属性,一般使用默认属性,指定为 NULL
*/
1
2
// 线程阻塞函数:哪个线程调用这个函数,哪个线程就会被阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

通过函数原型可知,该函数在阻塞线程的时候,需要一个互斥锁参数。这个互斥锁主要功能是进行线程同步,让线程顺序进入临界区,避免出现数共享资源的数据混乱。

该函数会对这个互斥锁做以下几件事情:
(1)阻塞线程时候,如果线程已经对互斥锁 mutex 上锁,那么会将这把锁打开,这样做是为了避免死锁。
(2)线程解除阻塞的时候,函数内部会帮助这个线程再次将这个 mutex 互斥锁锁上,继续向下访问临界区。

1
2
3
4
5
6
7
8
// 表示的时间是从 1971.1.1 到某个时间点的时间,总长度使用秒/纳秒表示
struct timespec {
time_t tv_sec; /* Seconds */
long tv_nsec; /* Nanoseconds [0 .. 999999999] */
};
// 将线程阻塞一定的时间长度,时间到达之后,线程就解除阻塞
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

这个函数的前两个参数和 pthread_cond_wait 函数是一样的,第三个参数表示线程阻塞的时长。注意:struct timespec 这个结构体中记录的时间是从 1971.1.1 到某个时间点的时间,总长度使用秒/纳秒表示,因此赋值方式相对要麻烦一点:

1
2
3
4
time_t mytim = time(NULL); // 1970.1.1 0:0:0 到当前的总秒数
struct timespec tmsp;
tmsp.tv_nsec = 0;
tmsp.tv_sec = time(NULL) + 100; // 线程阻塞 100s
1
2
3
4
// 唤醒阻塞在条件变量上的线程,至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒阻塞在条件变量上的线程,被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

调用上面两个函数中的任意一个,都可以唤醒被 pthread_cond_wait 或者 pthread_cond_timedwait 阻塞的线程。
区别在于:pthread_cond_signal 是唤醒至少一个被阻塞的线程(总个数不定),pthread_cond_broadcast 是唤醒所有被阻塞的线程。

生产者和消费者

生产者和消费者模型的组成:
(1)生产者线程(若干个)
生产商品或者任务放入到任务队列中
任务队列满了就阻塞,不满的时候就工作
通过一个生产者的条件变量控制生产者线程阻塞和非阻塞
(2)消费者线程(若干个)
读任务队列,将任务或者数据取出
任务队列中有数据就消费,没有数据就阻塞
通过一个消费者的条件变量控制消费者线程阻塞和非阻塞
(3)队列(存储任务/数据)
对应一块内存,为了读写访问可以通过一个数据结构维护这块内存
可以是数组、链表,也可以使用 STL 容器(queue/stack/list/vector)

场景描述:使用条件变量实现生产者和消费者模型。
生产者有 5 个,往链表头部添加节点;消费者也有 5 个,删除链表头部的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define MAX 5

pthread_cond_t cond;
pthread_mutex_t mutex;

// 链表的节点类型
// 链表存放,不考虑仓库大小
struct Node
{
int val;
struct Node* next;
};
// 链表头节点
struct Node* head = NULL;

// 生产者
void* producer(void* arg)
{
while (1)
{
pthread_mutex_lock(&mutex);

// 创建新节点
struct Node* newNode = (struct Node*)malloc(sizeof(struct Node));
// 初始化节点
newNode->val = rand() % 1000;
newNode->next = head;
head = newNode;
printf("producer thread, id = %lu, val = %d\n", pthread_self(), newNode->val);

pthread_mutex_unlock(&mutex);
// 只要生产一个,就通知消费者去消费
// pthread_cond_signal(&cond);
pthread_cond_broadcast(&cond);
sleep(rand() % 3);
}

return NULL;
}

// 消费者
void* consumer(void* arg)
{
while (1)
{
pthread_mutex_lock(&mutex);

while (head == NULL)
{
// 阻塞消费者线程,阻塞会释放 mutex
pthread_cond_wait(&cond, &mutex);
}
struct Node* node = head;
printf("consumer thread, id = %lu, val = %d\n", pthread_self(), node->val);
head = head->next;
free(node);

pthread_mutex_unlock(&mutex);
sleep(rand() % 3);
}

return NULL;
}

int main()
{
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);

pthread_t t1[MAX], t2[MAX]; // 生产者、消费者
// 创建线程
for (int i = 0; i < MAX; i ++)
{
pthread_create(&t1[i], NULL, producer, NULL);
pthread_create(&t2[i], NULL, consumer, NULL);
}

// 回收线程
for (int i = 0; i < MAX; i ++)
{
pthread_join(t1[i], NULL);
pthread_join(t2[i], NULL);
}

pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);

return 0;
}

信号量

信号量函数

信号量用在多线程多任务同步的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再执行某些动作。
信号量不一定是锁定某一个资源,而是流程上的概念。比如:有 A、B 两个线程,B 线程要等 A 线程完成某一任务以后再进行自己下面的步骤,这个任务并不一定是锁定某一资源,还可以是进行一些计算或者数据处理等。

信号量(信号灯)与互斥锁和条件变量的主要不同在于“灯” 的概念,灯亮则表示着资源可用,灯灭则表示资源不可用。
信号量主要阻塞线程,不能完全保证线程安全。如果要保证线程安全,需要信号量和互斥锁一起使用。
信号量和条件变量一样用于处理生产者和消费者模型,用于阻塞生产者线程或者消费者线程的运行。

信号的类型为 sem_t

1
2
#include <semaphore.h>
sem_t sem;

Linux 提供的信号量操作函数原型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <semaphore.h>
// 初始化信号量/信号灯
int sem_init(sem_t *sem, int pshared, unsigned int value);
/*
参数:
sem:
信号量变量地址
pshared:
0:线程同步
非 0:进程同步
value:
初始化当前信号量拥有的资源数(>=0)。如果资源数为 0,线程就会被阻塞。
*/

// 资源释放,线程销毁之后调用这个函数
// 参数 sem 是 sem_init() 的第一个参数
int sem_destroy(sem_t *sem);
1
2
3
// 参数 sem 是 sem_init() 的第一个参数  
// 函数被调用 sem 中的资源就会被消耗 1 个,资源数 -1
int sem_wait(sem_t *sem);

当线程调用这个函数,并且 sem 中的资源数 > 0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 -1,
直到 sem 中的资源数减为 0 时,资源被耗尽,因此线程也就被阻塞。

1
2
3
// 参数 sem 就是 sem_init() 的第一个参数  
// 函数被调用 sem 中的资源就会被消耗 1 个,资源数-1
int sem_trywait(sem_t *sem);

当线程调用这个函数,并且 sem 中的资源数 > 0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 -1,
直到 sem 中的资源数减为 0 时,资源被耗尽,但是线程不会被阻塞,直接返回错误号。
因此可以在程序中添加判断分支,用于处理获取资源失败之后的情况。

1
2
// 调用该函数给 sem 中的资源数 +1
int sem_post(sem_t *sem);

调用该函数会将 sem 中的资源数 +1,
如果有线程在调用 sem_waitsem_trywaitsem_timedwait 时因为 sem 中的资源数为 0 被阻塞,这时这些线程会解除阻塞,获取到资源之后继续向下运行。

1
2
3
// 查看信号量 sem 中的整形数的当前值,这个值会被写入到 sval 指针对应的内存中
// sval 是一个传出参数
int sem_getvalue(sem_t *sem, int *sval);

通过这个函数可以查看 sem 中现在拥有的资源个数,通过第二个参数 sval 将数据传出,第二个参数的作用和返回值是一样的。

生产者和消费者

由于生产者和消费者是两类线程,并且在还没有生成之前是不能进行消费的,
在使用信号量处理这类问题的时候可以定义两个信号量,分别用于记录生产者和消费者线程拥有的总资源数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 生产者线程 
sem_t psem;
// 消费者线程
sem_t csem;

// 信号量初始化
sem_init(&psem, 0, 5); // 5个生产者可以同时生产
sem_init(&csem, 0, 0); // 消费者线程没有资源,因此不能消费

// 生产者线程
// 在生产之前,从信号量中取出一个资源
sem_wait(&psem);
// 生产者商品代码,有商品了,放到任务队列
// ...
// 通知消费者消费,给消费者信号量添加资源,让消费者解除阻塞
sem_post(&csem);

// -----------------------------------------------------

// 消费者线程
// 消费者需要等待生产,默认启动之后应该阻塞
sem_wait(&csem);
// 开始消费
// ...
// 消费完成,通过生产者生产,给生产者信号量添加资源
sem_post(&psem);

信号量使用

场景描述:使用信号量实现生产者和消费者模型。
生产者有 5 个,往链表头部添加节点;消费者也有 5 个,删除链表头部的节点。

总资源数为 1

如果生产者和消费者线程使用的信号量对应的总资源数为 1,
那么不管线程有多少个,可以工作的线程只有一个,其余线程由于拿不到资源都被迫阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX 5

// 生产者的信号量
sem_t semp;
// 消费者的信号量
sem_t semc;
pthread_mutex_t mutex;

// 链表的节点类型
// 链表存放,不考虑仓库大小
struct Node
{
int val;
struct Node* next;
};
// 链表头节点
struct Node* head = NULL;

// 生产者
void* producer(void* arg)
{
while (1)
{
sem_wait(&semp);

// 创建新节点
struct Node* newNode = (struct Node*)malloc(sizeof(struct Node));
// 初始化节点
newNode->val = rand() % 1000;
newNode->next = head;
head = newNode;
printf("producer thread, id = %lu, val = %d\n", pthread_self(), newNode->val);

sem_post(&semc);
sleep(rand() % 3);
}

return NULL;
}

// 消费者
void* consumer(void* arg)
{
while (1)
{
sem_wait(&semc);

struct Node* node = head;
printf("consumer thread, id = %lu, val = %d\n", pthread_self(), node->val);
head = head->next;
free(node);

sem_post(&semp);
sleep(rand() % 3);
}

return NULL;
}

int main()
{
// 生产者
sem_init(&semp, 0, 1);
// 消费者(资源初始化为 0,消费者线程启动就阻塞)
sem_init(&semc, 0, 0);

pthread_t t1[MAX], t2[MAX]; // 生产者、消费者
// 创建线程
for (int i = 0; i < MAX; i ++)
{
pthread_create(&t1[i], NULL, producer, NULL);
pthread_create(&t2[i], NULL, consumer, NULL);
}

// 回收线程
for (int i = 0; i < MAX; i ++)
{
pthread_join(t1[i], NULL);
pthread_join(t2[i], NULL);
}

pthread_mutex_destroy(&mutex);
sem_destroy(&semp);
sem_destroy(&semc);

return 0;
}

通过测试代码得到如下结论:
如果生产者和消费者使用的信号量总资源数为 1,那么不会出现生产者线程和消费者线程同时访问共享资源的情况。不管生产者和消费者线程有多少个,它们都是顺序执行的。

总资源数大于 1

如果生产者和消费者线程使用的信号量对应的总资源数为大于 1,这种场景下出现的情况就比较多:
(1)多个生产者线程同时生产
(2)多个消费者同时消费
(3)生产者线程和消费者线程同时生产和消费
以上不管哪一种情况都可能会出现多个线程访问共享资源的情况,
如果要防止共享资源出现数据混乱,那么需要使用互斥锁进行线程同步,处理代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX 5

// 生产者的信号量
sem_t semp;
// 消费者的信号量
sem_t semc;
pthread_mutex_t mutex;

// 链表的节点类型
// 链表存放,不考虑仓库大小
struct Node
{
int val;
struct Node* next;
};
// 链表头节点
struct Node* head = NULL;

// 生产者
void* producer(void* arg)
{
while (1)
{
sem_wait(&semp);
pthread_mutex_lock(&mutex);

// 创建新节点
struct Node* newNode = (struct Node*)malloc(sizeof(struct Node));
// 初始化节点
newNode->val = rand() % 1000;
newNode->next = head;
head = newNode;
printf("producer thread, id = %lu, val = %d\n", pthread_self(), newNode->val);

pthread_mutex_unlock(&mutex);
sem_post(&semc);
sleep(rand() % 3);
}

return NULL;
}

// 消费者
void* consumer(void* arg)
{
while (1)
{
sem_wait(&semc);
pthread_mutex_lock(&mutex);

struct Node* node = head;
printf("consumer thread, id = %lu, val = %d\n", pthread_self(), node->val);
head = head->next;
free(node);

pthread_mutex_unlock(&mutex);
sem_post(&semp);
sleep(rand() % 3);
}

return NULL;
}

int main()
{
// 生产者
sem_init(&semp, 0, 5);
// 消费者(资源初始化为 0,消费者线程启动就阻塞)
sem_init(&semc, 0, 0);
pthread_mutex_init(&mutex, NULL);

pthread_t t1[MAX], t2[MAX]; // 生产者、消费者
// 创建线程
for (int i = 0; i < MAX; ++i)
{
pthread_create(&t1[i], NULL, producer, NULL);
pthread_create(&t2[i], NULL, consumer, NULL);
}

// 回收线程
for (int i = 0; i < MAX; ++i)
{
pthread_join(t1[i], NULL);
pthread_join(t2[i], NULL);
}

pthread_mutex_destroy(&mutex);
sem_destroy(&semp);
sem_destroy(&semc);

return 0;
}

编写上述代码的时候还有一个注意事项:不论是消费者线程的处理函数还是生产者线程的处理函数,内部有这两行代码:

1
2
3
4
5
6
7
// 消费者
sem_wait(&csem);
pthread_mutex_lock(&mutex);

// 生产者
sem_wait(&csem);
pthread_mutex_lock(&mutex);

这两行代码的调用顺序是不能颠倒的,如果颠倒过来就有可能会造成死锁。

下面分析一种死锁的场景:初始化状态下消费者线程没有任务信号量资源。
假设某一个消费者线程先运行,调用 pthread_mutex_lock(&mutex) 对互斥锁加锁成功,然后调用 sem_wait(&csem) 由于没有资源,因此被阻塞。其余的消费者线程由于没有抢到互斥锁,因此被阻塞在互斥锁上。
对应生产者线程第一步操作也是调用 pthread_mutex_lock(&mutex),但是这时候互斥锁已经被消费者线程锁上了,所有生产者都被阻塞。
到此为止,多余的线程都被阻塞了,程序产生了死锁。

参考资料

https://subingwen.cn/linux/thread-sync/


线程同步
https://lcf163.github.io/2021/08/20/线程同步/
作者
乘风的小站
发布于
2021年8月20日
许可协议