0%

Pthread学习笔记2

Pthread学习笔记2

临界区问题

当多个线程同时修改同一个内存地址时,可能会出现奇奇怪怪的问题。我们根据下面的公式编写一个计算π的程序:image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/time.h>
int thread_count,n;
double sum;

void* Thread_sum(void* rank){
long my_rank=(long)rank;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
sum+=factor/(2*i+1);
}
return NULL;
}

int main(int argc,char** argv){
if(argc!=3)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
printf("ans: %f\n",sum);
n=strtol(argv[2],NULL,10);

struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
//printf("Hello from the main thread\n");
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
// for(int i=0;i<m;i++)printf("%f\n",y[i]);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);

return 0;
}

执行代码,当线程数大于1的时候会发现,程序输出的结果是不确定的,而且误差也较1个线程的情况大,甚至运行时间也是较长:
image.png

这种情况被称为临界区问题,当0号线程把sum取到寄存器并执行运算的时候,1号进程也可能同时把sum取到寄存器,然后0号进程把结果写回内存,随后1号进程也把结果写回内存。这样就导致0号进程的计算结果被覆盖掉了,我们称

1
sum+=factor/(2*i+1);

这样更新共享资源的语句为一个临界区。当多个线程尝试更新一个共享资源时,结果可能是无法预测的。更一般地,当多个线程都要访问共享变量或共享文件这样的共享资源时,如果有一个线程执行了更新操作,就可能产生错误,我们称之为竞争条件

临界区问题可以用忙等待、互斥量和信号量等方法来解决。

忙等待

一种解决方法是设置一个标记变量,用于指明临界区可以被哪个线程执行,该执行完之后修改这个标记变量,而不能执行临界区的线程必须一直处于忙等待状态。
我们可以将代码修改成这样:

1
2
3
while(flag!=my_rank);
sum+=factor/(2*i+1);
flag=(flag+1)%thread_count;

flag是我们说到的标记变量,当flag==当前线程号时,可以执行临界区,否则必须处于忙等待状态。执行完之后要修改flag。

运行结果如下:
image.png
可以看到,结果是正确了,但运行速度满了很多,甚至比单线程的情况还慢。

当然,由于临界区只能被串行执行,我们应该尽量减少临界区执行的次数,比如,我们可以把Thread_sum函数修改成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
// printf("%d %d\n",flag,my_rank);
my_sum+=factor/(2*i+1);
}
while(flag!=my_rank);
sum+=my_sum;
flag=(flag+1)%thread_count;
return NULL;
}

用一个局部变量来代替sum变量,这样循环的过程中就不会有临界区。
image.png
运行结果正确,速度上也有提升。

使用忙等待还会有一个问题:

考虑一个最多可以同时执行2个线程的计算机,我们使用忙等待的方法编写一个5个线程的pthreads程序。假设开始时,操作系统运行0号线程和1号线程,这是没有问题的,此时2,3,4三个线程被挂起。然后假设0号线程运行结束,操作系统将其挂起,然后调度3号线程。由于操作系统的线程调度是随机的,它完全不知道此时只有2号线程能进入临界区,所以3号线程只能一直处于忙等待状态,浪费了时间。

所以,当线程数量大于计算机能同时运行的线程数时,使用忙等待的方法可能会产生问题。

互斥量

由于处于忙等待的线程会持续地占用CPU资源,所以忙等待并不是一个特别理想的解决临界区问题的方案。我们可以使用互斥量和信号量。

互斥量是互斥锁的简称,是一个特殊类型的变量,它可以保证每次只有一个线程能进入临界区。

互斥量的变量类型是pthread_mutex_t,必须调用 pthread_mutex_init()函数进行初始化。

当线程执行到临界区前面时,应该调用pthread_mutex_lock()函数。如果此时临界区已经被加了锁(即已经有其他线程执行到了这里),则它会被卡在临界区外等待;否则它会进入临界区,并把临界区加锁。

当临界区被执行完,线程要用pthread_mutex_unlock函数把临界区解锁。此时,系统会从其他等待的线程选出一个进入临界区。如果有多个等待的线程,操作系统选择哪个线程是随机的的。

互斥量使用完之后要用pthread_mutex_destroy函数将其销毁。

互斥量的示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/time.h>
int thread_count,n;
double sum;
int flag;
pthread_mutex_t mutex;
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
// printf("%d %d\n",flag,my_rank);
my_sum+=factor/(2*i+1);
}
pthread_mutex_lock(&mutex);
sum+=my_sum;
pthread_mutex_unlock(&mutex);
return NULL;
}

int main(int argc,char** argv){
if(argc!=3)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
n=strtol(argv[2],NULL,10);
pthread_mutex_init(&mutex,NULL);
struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
//printf("Hello from the main thread\n");
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
// for(int i=0;i<m;i++)printf("%f\n",y[i]);
free(thread_handles);
pthread_mutex_destroy(&mutex);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);

return 0;
}

运行结果:

image.png

信号量

信号量(semaphore)可以认为是一种特殊类型的unsigned int,它的类型是sem_t。信号量可以赋值为0,1,……。大多数情况下,我们只用0和1两个值。这种只有0和1值的信号量也称为二元信号量。0对应上了锁的互斥量,1对应没上锁的互斥量。

初始化信号量的方法是调用sem_init(),它的原型是:

1
int sem_init(sem_t* a,int b,int c);

其中第二个参数直接置为0就可以了,第三个参数是初始值。

如果我们要用信号量来解决临界区问题,可以先创建一个全局信号量,初始值为1。在临界区前调用sem_wait()函数,这个函数的意义是:如果信号量为0则阻塞;如果是非0值则减1然后进入临界区。临界区执行完之后,调用sem_post()函数将信号量置为1。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<semaphore.h>//注意要加这个头文件
#include<sys/time.h>
int thread_count,n;
double sum;
int flag;
sem_t my_semaphore;
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
my_sum+=factor/(2*i+1);
}
sem_wait(&my_semaphore);
sum+=my_sum;
printf("Thread %d\n",my_rank);
sem_post(&my_semaphore);
return NULL;
}

int main(int argc,char** argv){
if(argc!=3)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
n=strtol(argv[2],NULL,10);
sem_init(&my_semaphore,0,1);

struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
free(thread_handles);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);
return 0;
}

使用信号量还可以控制线程执行的顺序,如发送消息问题。我们可以给每个线程分配一个信号量,初始化为0,给每个线程一个char*,指向收到的消息。每一个线程把消息放到下一个线程后将下一个线程的信号量置为1,然后等待上一个线程的消息。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<semaphore.h>
#include<sys/time.h>
#define MSG_MAX 100
int thread_count,n;
double sum;
int flag;
pthread_mutex_t mutex;
char** messages;
sem_t* semaphores;
void* Thread_sum(void* rank){
long my_rank=(long)rank;
long dest=(my_rank+1)%thread_count;
char* my_msg=malloc(MSG_MAX*sizeof(char));
sprintf(my_msg,"Hello to %ld from %ld",dest,my_rank);
messages[dest]=my_msg;
sem_post(&semaphores[dest]);
sem_wait(&semaphores[my_rank]);
printf("Thread %ld > %s\n",my_rank,messages[my_rank]);
}

int main(int argc,char** argv){
if(argc!=2)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
semaphores=(sem_t*)malloc(thread_count*sizeof(sem_t));
messages=(char**)malloc(thread_count*sizeof(char*));
struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
//printf("Hello from the main thread\n");
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
// for(int i=0;i<m;i++)printf("%f\n",y[i]);
free(thread_handles);
pthread_mutex_destroy(&mutex);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);

return 0;
}

结果如下所示:
image.png

但如果我们把sem_wait那一行去掉,会出现这样的变化:
image.png
第0号线程没有收到消息的时候就调用printf函数,自然就得不到想要的结果。

像上面这种一个线程需要等待另一个线程执行某种操作的同步方式,有时称为生产者-消费者同步模型