0%

MPI学习笔记2

MPI学习笔记2

集合通信

MPI_Send和MPI_Recv可以实现不同进程之间的通信,但如果要跟很多进程发消息,或者要接受很多进程发送的消息,这样就太过麻烦。为此,MPI提供了一系列称为集合通信的函数,它涉及通信子之间的所有进程的通信。

MPI_Reduce

MPI_Reduce用于实现高效的全局运算,如求和,求最大值等。原型如下:

1
2
int MPI_Reduce(void* input_data_p,void* output_data_p,int count,
MPI_Datatype datatype,MPI_Op operator,int dest_process,MPI_Comm comm);

对于每个进程,我们调用MPI_Reduce,然后对目的进程,output_data_p指向的值会与每个进程的input_data_p指向的值进行运算。如果count参数的值大于1,则运算会在数组上进行。
支持的运算有:
|运算符值|含义|
|-|-|
|MPI_MAX|求最大值|
|MPI_MIN|求最小值|
|MPI_SUM|求累加和|
|MPI_PROD|求累乘值|
|MPI_LAND|逻辑与|
|MPI_BAND|按位与|
|MPI_LOR|逻辑或|
|MPI_BOR|按位或|
|MPI_LOR|逻辑异或|
|MPI_BOR|按位异或|
|MPI_MAXLOC|求最大值和最大值所在的位置|
|MPI_MINLOC|求最小值和最小值所在的位置|

一个示例程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
int val=my_rank*my_rank,sum=0;
MPI_Reduce(&val,&sum,1,MPI_INT,MPI_SUM,0,MPI_COMM_WORLD);
if(my_rank==0){cout<<sum<<endl;}
MPI_Finalize();
}

有两点值得注意:
1.第一个参数和第二个参数的指针不能相同,否则会得到非法的结果。
2.对于不是目标进程的进程,第二个参数实际上是没有作用的,可以置为NULL。

MPI_Allreduce

有时候,我们不仅需要全局运算,还需要把结果放到每个进程里,这时候可以调用MPI_Allreduce函数:

1
2
int MPI_Allreduce(void* input_data_p,void* output_data_p,int count,
MPI_Datatype datatype,MPI_OP operator,MPI_Comm comm);

参数的意义基本与MPI_Reduce相同,不再赘述。
用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
int val=my_rank*my_rank,sum=0;
MPI_Allreduce(&val,&sum,1,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
cout<<sum<<endl;
MPI_Finalize();
}

MPI_Bcast

有时候,我们需要将一个进程里的数据发送到通信子中的所有进程,这是可以调用MPI_Bcast函数:

1
int MPI_Bcast(void* data_p,int count,MPI_Datatype datatype,int souce_proc,MPI_Comm comm);

其中,data_p在发送进程里为输入参数,在其他进程里为输出参数,用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);

if(my_rank==0){
char* s="hello world!\n";
MPI_Bcast(s,strlen(s)+1,MPI_CHAR,0,MPI_COMM_WORLD);
}
else{
char *s=(char*)malloc(100*sizeof(char));
MPI_Bcast(s,14,MPI_CHAR,0,MPI_COMM_WORLD);//注意,这里s还是空字符串,不能用strlen确定长度。
printf("In process %d,s is %s",my_rank,s);
free(s);
}
MPI_Finalize();
}

MPI_Scatter

假如在主进程里有一个长为100的数组,要把它分到10个进程里,其中0号进程分到前10个元素,1号进程分到第10个到第19个元素……这时可以调用MPI_Scatter函数来进行分发:

1
2
3
int MPI_Scatter(void* send_buf_p,int send_count,
MPI_Datatype send_type,void* recv_buf_p,int recv_count,
MPI_Datatype recv_type,int src_proc,MPI_Comm comm);

假如数组大小为n,通信子里共有comm_sz个进程,则MPI_Scatter会把数组分为comm_xz份,每份有$local_n=\frac{n}{comm_xz}$个。此时send_count和recv_count应被置为local_n,因为send_count表示的是发送到每个进程的数据量,recv_count表示每个进程接收到的数据量。

一个示例程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
int n=100;
int* b=(int*)malloc(n/comm_sz*sizeof(int));
if(my_rank==0){
int* a=(int*)malloc(n*sizeof(int));
for(int i=0;i<n;i++){
a[i]=i;
}
MPI_Scatter(a,n/comm_sz,MPI_INT,b,n/comm_sz,MPI_INT,0,MPI_COMM_WORLD);
}
else{
MPI_Scatter(NULL,n/comm_sz,MPI_INT,b,n/comm_sz,MPI_INT,0,MPI_COMM_WORLD);
}
for(int i=0;i<n/comm_sz;i++)printf("In process %d, b[%d]=%d\n",my_rank,i,b[i]);
MPI_Finalize();
}

注意,跟MPI_Reduce一样,MPI_Scatter的两个指针参数不能相同,而且非目的进程的send_buff_p可以是NULL。

MPI_Gather

有了把数据发出去的方法,自然会有把数据收集起来的方法,它是MPI_Gather:

1
2
int MPI_Gather(void* send_buf_p,int send_count,MPI_Datatype send_type,void* recv_buf_p,
int recv_count,MPI_Datatype recv_type,int dest_proc,MPI_Comm comm);

用法和注意事项 与MPI_Scatter差不多,不再赘述

MPI_Allgather

MPI_Allgather与 MPI_Gather的关系有点类似MPI_allreduce与MPI_Reduce的关系

1
2
int MPI_Gather(void* send_buf_p,int send_count,MPI_Datatype send_type,void* recv_buf_p,
int recv_count,MPI_Datatype recv_type,int dest_proc,MPI_Comm comm);

用于将数据串联起来,存储到每个进程的recv_buf_p参数中。

综合应用

用上述集合通信的方法实现矩阵向量乘法的并行化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

void my_rand(double* vec,int size){
for(int i=0;i<size;i++)vec[i]=(double)rand()/RAND_MAX;
}

void Mat_vec_mul2(double* local_A,double* x,double* local_y,int local_m,int n,int local_n,MPI_Comm comm){
int local_i,j,local_ok=1;
for(local_i=0;local_i<local_m;local_i++){
local_y[local_i]=0.0;
for(j=0;j<n;j++)local_y[local_i]+=local_A[local_i*n+j]*x[j];
}
}
int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);

int m,n,local_m,local_n;
if(argc!=3)abort();
sscanf(argv[1],"%d",&m);
sscanf(argv[2],"%d",&n);
local_m=m/comm_sz,local_n=n/comm_sz;
double* local_A,* local_x,* local_y;
local_A=(double*)malloc(local_m*n*sizeof(double));
local_x=(double*)malloc(local_n*sizeof(double));
local_y=(double*)malloc(local_m*sizeof(double));
double *x=(double*)malloc(n*sizeof(double));
if(my_rank==0){
struct timeval tv,tv2;
gettimeofday(&tv,NULL);

double* A=(double*)malloc(m*n*sizeof(double));
double* y=(double*)malloc(m*sizeof(double)),*y2=(double*)malloc(m*sizeof(double));
my_rand(A,m*n),my_rand(x,n);
MPI_Scatter(A,local_m*n,MPI_DOUBLE,local_A,local_m*n,MPI_DOUBLE,0,MPI_COMM_WORLD);
MPI_Bcast(x,n,MPI_DOUBLE,0,MPI_COMM_WORLD);
Mat_vec_mul2(local_A,x,local_y,local_m,n,local_n,MPI_COMM_WORLD);
MPI_Gather(local_y,local_m,MPI_DOUBLE,y,local_m,MPI_DOUBLE,0,MPI_COMM_WORLD);

gettimeofday(&tv2,NULL);
printf("time: %d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);
// for(int i=0;i<m;i++)cout<<y[i]<<" "<<y2[i]<<endl;
free(A),free(y),free(y2);
}
else{
MPI_Scatter(NULL,local_m*n,MPI_DOUBLE,local_A,local_m*n,MPI_DOUBLE,0,MPI_COMM_WORLD);
MPI_Bcast(x,n,MPI_DOUBLE,0,MPI_COMM_WORLD);
Mat_vec_mul2(local_A,x,local_y,local_m,n,local_n,MPI_COMM_WORLD);
MPI_Gather(local_y,local_m,MPI_DOUBLE,NULL,local_m,MPI_DOUBLE,0,MPI_COMM_WORLD);
}
free(local_A);free(local_y);free(x);
}

虽然这个程序使用了多进程的方式进行了优化,但事实上运行时间远远大于串行的版本:

并行:
image.png

串行:
image.png

MPI派生数据类型

显然,进程之间通信是非常非常慢的。如果主进程要往其他进程广播一个整数,两个浮点数,有没有方法能用一次通信的方法解决呢?
MPI为了解决这个问题,提供了用户自定义的派生数据类型。
我们可以用MPI_Type_create_struct函数创建由不同基本数据类型的元素所组成的派生数据类型:

1
2
int MPI_Type_create_struct(int count,int array_of_clocklengths[],
MPI_Aint array_of_displacements[],MPI_Datatype array_of_types[],MPI_Datatype* new_type_p);

第一个参数表明我们创建的数据类型有多少个元素;第二个参数表示每个元素是单个的数据还是一个数组;第三个参数表示每个元素相对于第一个元素的内存地址偏移量;第四个参数表示每个元素的类型;第5个参数表示创建的类型。

比如我们要创建一种数据类型,包含一个int,两个double,可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
int a;
double b,c;
MPI_Aint a_addr,b_addr,c_addr;
MPI_Get_address(&a,&a_addr);
MPI_Get_address(&b,&b_addr);
MPI_Get_address(&c,&c_addr);
MPI_Aint array_of_displacements[3]={0,b_addr-a_addr,c_addr-a_addr};
int array_of_blocklengths[3]={1,1,1};
MPI_Datatype array_of_types[3]={MPI_INT,MPI_DOUBLE,MPI_DOUBLE};
MPI_Datatype input_mpi_t;
MPI_Type_create_struct(3,array_of_blocklengths,array_of_displacements,array_of_types,&input_mpi_t);
MPI_Type_commit(&input_mpi_t);

MPI_Get_address()用于获得第一个指针参数的绝对地址,把这个地址赋值给第二个参数指向的地址空间。
再调用完MPI_Type_create_struct()函数之后,还需要调用MPI_Type_commit()函数向通信子提交这一数据类型。之后,就可以像使用普通数据类型一样使用input_mpi_t了:

1
2
3
4
5
6
7
8
  if(my_rank==0){
a=1,b=1.1,c=2.2;
MPI_Bcast(&a,1,input_mpi_t,0,MPI_COMM_WORLD);
}
else{
MPI_Bcast(&a,1,input_mpi_t,0,MPI_COMM_WORLD);
printf("In process %d: a=%d b=%f c=%f\n",my_rank,a,b,c);
}