题目
MPI实现π值的计算和PSRS排序
算法设计
π值的计算
基本的串行算法
![http://img.blog.csdn.net/20150416
160118625](lab2/clip_image003.jpg)
利用公式arctan(1)=π/4以及(arctan(x))’=1/(1+x^2),在[0, 1]上对f(x)= 4/(1+x^2)求积分,使用积分的定义离散化近似求解。将[0, 1]分成大量的小区间,使用for循环在每个小区间上计算y_i的值,最后求和。
MPI并行算法
在基本的串行算法上,main函数中开始时用MPI_Init初始化,则所有进程都将执行接下来的代码。用MPI_Comm_size获取总进程数,用MPI_Comm_rank获取当前进程id。将y_i的计算并行化,显式地分配for循环,将紧挨着的p个区间的y_i的值分配给p个进程分别计算(而不是将全部区间直接划分给p个处理器,这样可能导致各线程负载不均衡),换言之,当前进程只计算每连续p个区间中的1个。最后用MPI_Reduce将每个进程计算的局部值归约求和,结果在主进程(0号进程)上。
PSRS排序
给定n个元素的数组A[0..n - 1]
,执行以下步骤
(1)均匀划分:将A[0..n - 1]
均匀划分成p段,每个进程pi处理A[(i*n/p..(i+1)*n/p - 1]
(2)局部排序:每个进程pi调用串行排序算法对A[(i*n/p..(i+1)*n/p - 1]
排序
(3)选取样本:每个进程pi从自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]
中等间距选取p个样本元素,存放在samples数组中
(4)样本排序:用主进程global_samples数组收集(MPI_Gather)各个进程的samples数组,对总共的p2个样本元素进行串行排序
(5)选择主元:用主进程从排好序的p2个样本中选取p - 1个主元,并广播(MPI_Bcast)给其他pi
(6)主元划分:每个进程pi按p - 1个主元将自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]
划分成p段(每段长度可能不同),此时,不需要新申请一个二维数组存放p段数据,只需要一个一维数组sizes记录每段的长度和一个一维数组记录每段的起始位置offsets(相对偏移,通过累加前面的段的长度得到)
(7)全局交换:每个进程pi将自己的有序子数组中的每段按段号交换到对应id的进程中,可以利用MPI_Alltoallv函数实现全局交换,之前的长度数组sizes和偏移数组作为参数offsets,另外需要三个数组newsizes、newoffsets和newdatas用于接收数据。
MPI_Alltoallv函数定义如下
int MPI_Alltoallv(const void *sendbuf, 发送缓冲区的起始地址
const int *sendcounts, 数组:每个发送数据的长度
const int *sdispls, 数组:每个发送数据相对于发送缓冲区起始地址的位移量
MPI_Datatype sendtype, 发送的数据类型
void *recvbuf, 接收缓冲区的起始地址
const int *recvcounts, 数组:每个接收数据的长度
const int *rdispls, 数组:每个接收数据相对于接收缓冲区起始地址的位移量
MPI_Datatype recvtype, 接收的数据类型
MPI_Comm comm) 通信域
该函数实现各个进程向各个进程交换不定长的数据
该函数执行效果举例如下,每个数据的长度不定
交换前
A0 A1 A2 A3 A4 A5 --进程0的sizes数组
B0 B1 B2 B3 B4 B5 --进程1的sizes数组
C0 C1 C2 C3 C4 C5 --进程2的sizes数组
D0 D1 D2 D3 D4 D5 --进程3的sizes数组
E0 E1 E2 E3 E4 E5 --进程4的sizes数组
F0 F1 F2 F3 F4 F5 --进程5的sizes数组
交换后
A0 B0 C0 D0 E0 F0 --进程0的newsizes数组
A1 B1 C1 D1 E1 F1 --进程1的newsizes数组
A2 B2 C2 D2 E2 F2 --进程2的newsizes数组
A3 B3 C3 D3 E3 F3 --进程3的newsizes数组
A4 B4 C4 D4 E4 F4 --进程4的newsizes数组
A5 B5 C5 D5 E5 F5 --进程5的newsizes数组
(8)归并排序:每个进程pi对接收到的元素进行归并排序
(9)用主进程收集(MPI_Gatherv)各个进程的新数据,写回A,此时,原数组A已有序
MPI_Gatherv函数定义如下
int MPI_Gatherv(void* sendbuf, 发送缓冲区的起始地址
int sendcount, 发送数据的长度
MPI_Datatype sendtype, 发送的数据类型
void* recvbuf, 接收缓冲区的起始地址
int *recvcounts, 数组:每个接收数据的长度
int *displs, 数组:每个接收数据相对于接收缓冲区起始地址的位移量
MPI_Datatype recvtype, 接收的数据类型
int root, 根进程的id
MPI_Comm comm) 通信域
该函数实现各个进程向根进程汇集不定长的数据
算法分析
π值的计算
设n为[0, 1]之间划分的区间数
对于串行算法,时间复杂度为O(n)
对于4种并行算法,时间复杂度为O(n/p),其中p为处理器数
PSRS排序
(参考文献 Hanmao Shi , Jonathan Schaeffer. Parallel Sorting by Regular Sampling.)
设n为A的元素个数,p为处理器数,w = n/p
则时间复杂度为各阶段时间复杂度之和
若, 近似为
另外,算法第六步每个处理器的w个数据根据p – 1个主元划分,每段的长度可能不相等,因此数组低维的长度不等,无法实现确定,但文献中证明了每段的长度最长不超过2w个元素。
Theorem 1: In phase 3 of PSRS, each processor merges less than 2w elements.
算法源代码
π值的计算
基本的串行算法
#include <stdio.h>
static long num_steps = 100000;//越大值越精确
double step;
void main(){
int i;
double x, pi, sum = 0.0;
step = 1.0 / (double)num_steps;
for(i = 1; i <= num_steps; i++){
x = (i - 0.5) * step;
sum = sum + 4.0/(1.0 + x * x);
}
pi = step * sum;
printf("%lf\n", pi);
}
使用并行域并行化的程序
#include "mpi.h"
#include <stdio.h>
#include <math.h>
static long num_steps = 100000;
int main(int argc, char *argv[])
{
int id, num_processes;
double local_pi, pi, local_sum = 0.0, x;
double step = 1.0 / (double)num_steps;
double t1, t2;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
MPI_Comm_rank(MPI_COMM_WORLD, &id);
if(id == 0) //主进程
t1 = MPI_Wtime();
for(int i = id + 1; i <= num_steps; i += num_processes){ //各线程计算自己部分的面积local_pi
x = step * (i - 0.5);
local_sum = local_sum + 4.0 / (1.0 + x * x);
}
local_pi = local_sum * step;
MPI_Reduce(&local_pi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); //归约local_pi到pi
if(id == 0){
t2 = MPI_Wtime();
printf("time: %fs\n", t2 - t1);
printf("PI is approximately %.16f\n", pi);
}
MPI_Finalize();
return 0;
}
PSRS排序
#include <stdlib.h>
#include <stdio.h>
#include "mpi.h"
#define INF 2147483647
/**************************************************
* 合并两个已排好序的子数组A[l : m], A[m + 1 : r], 写回A[l : r]
***************************************************/
void Merge(int *A, int l, int m, int r){
int i, j, k, n1 = m - l + 1, n2 = r - m;
int *L = (int *)malloc((n1 + 1) * sizeof(int));
int *R = (int *)malloc((n2 + 1) * sizeof(int));
for(i = 0; i < n1; i++) L[i] = A[l + i];
for(j = 0; j < n2; j++) R[j] = A[m + 1 + j];
L[i] = R[j] = INF;
i = j = 0;
for(k = l; k <= r; k++) if(L[i] <= R[j]) A[k] = L[i++]; else A[k] = R[j++];
free(L); free(R);
}
/**************************************************
* 对A[l : r]进行归并排序
***************************************************/
void MergeSort(int *A, int l, int r){
if(l < r){
int m = (l + r) / 2;
MergeSort(A, l, m);
MergeSort(A, m + 1, r);
Merge(A, l, m, r);
}
}
/**************************************************
* 对A[0 : n - 1]进行PSRS排序
***************************************************/
void PSRS(int *A, int n, int id, int num_processes){ //每个进程都会执行这个函数,这里面的变量每个进程都有一份,因此都是局部的(对于当前进程而言)
int per;
int *samples, *global_samples; //global表示这个变量是主进程会使用的,但事实上每个进程都声明了
int *pivots;
int *sizes, *newsizes;
int *offsets, *newoffsets;
int *newdatas;
int newdatassize;
int *global_sizes;
int *global_offsets;
//-------------------------------------------------------------------------------------------------------------------
per = n / num_processes; //A的n个元素划分为num_processes段,每个进程处理per个元素
samples = (int *)malloc(num_processes * sizeof(int)); //当前进程的采样数组
pivots = (int *)malloc(num_processes * sizeof(int)); //num_processes - 1 个主元,最后一个设为INF,作为哨兵
if(id == 0){ //主进程申请全局采样数组
global_samples = (int *)malloc(num_processes * num_processes * sizeof(int)); //正则采样数为 num_processes * num_processes
}
MPI_Barrier(MPI_COMM_WORLD);//设置路障,同步所有进程
//-------------------------------------------------------------------------------------------------------------------
//(1)均匀划分,当前进程对A中属于自己的部分进行串行归并排序
MergeSort(A, id * per, (id + 1) * per - 1); //这里没有把A中对应当前进程的数据复制到当前进程,而是直接对A部分排序
//(2)正则采样,当前进程选出 num_processes 个样本放在local_sample中
for(int k = 0; k < num_processes; k++)
samples[k] = A[id * per + k * per / num_processes];
//主进程的sample收集各进程的local_sample,共 num_processes * num_processes 个
MPI_Gather(samples, num_processes, MPI_INT, global_samples, num_processes, MPI_INT, 0, MPI_COMM_WORLD);
//-------------------------------------------------------------------------------------------------------------------
//(3)采样排序 (4)选择主元
if(id == 0){ //主进程
MergeSort(global_samples, 0, num_processes * num_processes - 1); //对采样的num_processes * num_processes个样本进行排序
for(int k = 0; k < num_processes - 1; k++) //选出num_processes - 1个主元
pivots[k] = global_samples[(k + 1) * num_processes];
pivots[num_processes - 1] = INF; //哨兵
}
//主进程向各个进程广播,所有进程拥有一样的pivots数组
MPI_Bcast(pivots, num_processes, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
//-------------------------------------------------------------------------------------------------------------------
sizes = (int *)calloc(num_processes, sizeof(int)); //当前进程的per个元素根据主元划分之后的每段的长度,用calloc分配后自动初始化为0
offsets = (int *)calloc(num_processes, sizeof(int)); //当前进程的per个元素根据主元划分之后的每段的起始位置,用calloc分配后自动初始化为0
newsizes = (int *)calloc(num_processes, sizeof(int)); //当前进程在进行全局交换之后的每段的长度,用calloc分配后自动初始化为0
newoffsets = (int *)calloc(num_processes, sizeof(int)); //当前进程在进行全局交换之后的每段的起始位置,用calloc分配后自动初始化为0
//(5)主元划分
for(int k = 0, j = id * per; j < id * per + per; j++){ //当前进程对自己的per个元素按主元划分为num_processes段,此处不处理数据,只计算每段大小
if(A[j] < pivots[k]) //如果之前不用哨兵,最后一段要单独考虑
sizes[k]++;
else
sizes[++k]++;
}
//(6)全局交换
//多对多全局交换消息,首先每个进程向每个接收者发送接收者对应的【段的大小】
MPI_Alltoall(sizes, 1, MPI_INT, newsizes, 1, MPI_INT, MPI_COMM_WORLD);
//计算原来的段偏移数组,新的段偏移数组,新的数据大小
newdatassize = newsizes[0];
for(int k = 1; k < num_processes; k++){
offsets[k] = offsets[k - 1] + sizes[k - 1];
newoffsets[k] = newoffsets[k - 1] + newsizes[k - 1];
newdatassize += newsizes[k];
}
//申请当前进程新的数据空间
newdatas = (int *)malloc(newdatassize * sizeof(int)); //当前进程在进行全局交换之后的数据,是由交换后的各段组合而成的
//多对多全局交换消息,每个进程向每个接收者发送接收者对应的【段】
MPI_Alltoallv(&(A[id * per]), sizes, offsets, MPI_INT, newdatas, newsizes, newoffsets, MPI_INT, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
//-------------------------------------------------------------------------------------------------------------------
//(7)当前进程归并排序自己的新数据
MergeSort(newdatas, 0, newdatassize - 1);
MPI_Barrier(MPI_COMM_WORLD);
//(8)主进程收集各个进程的数据,写回A
//首先收集各进程新数据的大小
if(id == 0)
global_sizes = (int *)calloc(num_processes, sizeof(int));
MPI_Gather(&newdatassize, 1, MPI_INT, global_sizes, 1, MPI_INT, 0, MPI_COMM_WORLD);
//主进程计算即将搜集的各进程数据的起始位置
if(id == 0){
global_offsets = (int *)calloc(num_processes, sizeof(int));
for(int k = 1; k < num_processes; k++)
global_offsets[k] = global_offsets[k - 1] + global_sizes[k - 1];
}
//主进程收集各个进程的数据
MPI_Gatherv(newdatas, newdatassize, MPI_INT, A, global_sizes, global_offsets, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
//-------------------------------------------------------------------------------------------------------------------
//销毁动态数组
free(samples); samples = NULL;
free(pivots); pivots = NULL;
free(sizes); sizes = NULL;
free(offsets); offsets = NULL;
free(newdatas); newdatas = NULL;
free(newsizes); newsizes = NULL;
free(newoffsets); newoffsets = NULL;
if(id == 0){
free(global_samples); global_samples = NULL;
free(global_sizes); global_sizes = NULL;
free(global_offsets); global_offsets = NULL;
}
}
int main(int argc, char *argv[]){
int A[27] = {15, 46, 48, 93, 39, 6, 72, 91, 14, 36, 69, 40, 89, 61, 97, 12, 21, 54, 53, 97, 84, 58, 32, 27, 33, 72, 20};
double t1, t2;
int id, num_processes;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &num_processes); //获取进程数
MPI_Comm_rank(MPI_COMM_WORLD, &id); //获取当前进程id
if(id == 0)
t1 = MPI_Wtime();
PSRS(A, 27, id, num_processes);
if(id == 0){
t2 = MPI_Wtime();
printf("time: %lfs\n", t2 - t1);
for(int i = 0; i < 27; i++)
printf("%d ", A[i]);
printf("\n");
}
MPI_Finalize();
return 0;
}