Commit 1358ba8f authored by Shen Yu's avatar Shen Yu
Browse files

modified: openmpi/ompi/mpi/c/finalize.c

	modified:   replayer/Makefile
	deleted:    replayer/include/GlobalComm.h
	modified:   replayer/include/commset.h
	renamed:    replayer/include/_MSGLOG_.h -> replayer/include/msglog.h
	new file:   replayer/src/GlobalComm.c
	deleted:    replayer/src/GlobalComm.cpp
	modified:   replayer/src/commset.c

    rewrite GlobalComm with standard c instead of c++, so it can be used
    inside openmpi
parent ef454e59
......@@ -67,7 +67,7 @@ int MPI_Finalize(void)
fclose(MSGLOGFILE);
free(INT_POOL);
extern char COMMFILE[]; // COMMFILE is initialized in init.c
saveGlobalCommMap(&localComm, COMMFILE);
saveCommMap(&localComm, COMMFILE);
freeCommSet(&localComm);
// end
......
OPT = -D_DEBUG
OPT = -D_DEBUG -fPIC
INC = -Iinclude
MPICXX = mpicxx
MPICC = mpicc
all: bin/mpi_player lib/GlobalComm.a
all: bin/mpi_player lib/GlobalComm.a lib/GlobalComm.so
bin/mpi_player: createMPIComm.o read_run.o mpi_player.o
[ -e bin ] || mkdir bin; $(MPICXX) -o bin/mpi_player createMPIComm.o read_run.o mpi_player.o $(OPT)
$(MPICXX) -o bin/mpi_player createMPIComm.o read_run.o mpi_player.o $(OPT)
lib/GlobalComm.so: GlobalComm.o commset.o
gcc -shared -fPIC -o lib/GlobalComm.so GlobalComm.o commset.o
lib/GlobalComm.a: GlobalComm.o commset.o
[ -e lib ] || mkdir lib; ar rcsv lib/GlobalComm.a GlobalComm.o commset.o
ar rcsv lib/GlobalComm.a GlobalComm.o commset.o
createMPIComm.o: src/createMPIComm.cpp
$(MPICXX) -c src/createMPIComm.cpp $(OPT) $(INC)
......@@ -20,12 +23,12 @@ read_run.o: src/read_run.cpp
mpi_player.o: src/mpi_player.cpp
$(MPICXX) -c src/mpi_player.cpp $(OPT) $(INC)
GlobalComm.o: src/GlobalComm.cpp
$(MPICXX) -c src/GlobalComm.cpp $(OPT) $(INC)
GlobalComm.o: src/GlobalComm.c
$(MPICC) -c src/GlobalComm.c $(OPT) $(INC)
commset.o: src/commset.c
$(MPICC) -c src/commset.c $(OPT) $(INC)
clean:
rm bin/mpi_player lib/GlobalComm.a *.o 2>/dev/null
rm bin/mpi_player lib/GlobalComm.* *.o 2>/dev/null
// Collect all communicators, build a local <-> global communicator ID map,
// and save it to a file.
int saveGlobalCommMap(struct CommSet *CS, char FILENAME[]);
......@@ -5,10 +5,8 @@ struct CommSet
MPI_Comm* comms; // the array containing the comms
};
inline int initCommSet(struct CommSet *CS);
int initCommSet(struct CommSet *CS);
inline int inCommSet(MPI_Comm comm, struct CommSet *CS);
int addComm(MPI_Comm comm, struct CommSet *CS);
inline int addComm(MPI_Comm comm, struct CommSet *CS);
inline void freeCommSet(struct CommSet *CS);
void freeCommSet(struct CommSet *CS);
......@@ -13,7 +13,7 @@ extern int LOGMSGMYID;
extern int* INT_POOL;
extern int REQ_POOL_SIZE;
extern int* REQ_POOL;
extern bool LOGTHIS;
extern int LOGTHIS;
//inline void LOGMSG(const char FUNCNAME[], const struct timeval call_time, const MPI_Comm COMM,
// const int DSTPROC, const int SENDSIZE,
// const int SRCPROC, const int RECVSIZE);
......
// This file collects the local communicator IDs from all processes and
// compares their member processes. Communicators whose member processes are
// the same are treated as one communicator, assigned a global communicator
// ID, and recorded in the local <-> global communicator ID map.
// After all local communicators have been collected, the map is saved to a text file.
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <math.h>
#include <msglog.h>
// Membership record of one communicator. Two records are treated as the same
// communicator by compareCOMM() when their n and first n members[] agree.
typedef struct COMM
{
int n; // number of member processes of the communicator (how many members[] entries compareCOMM() inspects)
int size; // number of ints in the members array (how many entries copyCOMMs() duplicates)
int* members; // global ranks of the member processes
} COMM;
// Release the members arrays owned by the first n entries of c[] and reset
// their counters. The array c itself is not freed.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
void freeCOMMs(int n, COMM c[])
{
    for (int i = 0; i < n; ++i)
    {
        free(c[i].members);
        c[i].members = NULL; // do not leave a dangling pointer behind
        c[i].n = 0;
        c[i].size = 0;
    }
}
// Compare two communicators by membership.
// Returns 1 when both have the same member count n and members[0..n-1] are
// element-wise equal (the comparison is order-sensitive); returns 0 otherwise.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
int compareCOMM(COMM* comm1, COMM* comm2)
{
    if (comm1->n != comm2->n)
        return 0;

    for (int i = 0; i < comm1->n; ++i)
    {
        if (comm1->members[i] != comm2->members[i])
            return 0;
    }
    return 1;
}
// Deep-copy the first n entries of src[] into dst[]: every destination entry
// gets its own freshly allocated members array holding src's `size` ranks.
// Previous contents of dst[] are overwritten without being freed.
// If an entry's members array cannot be allocated (or src's size is not
// positive), that destination entry is left empty (n = 0, size = 0,
// members = NULL) so later code never indexes a NULL array.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
void copyCOMMs(int n, COMM src[], COMM dst[])
{
    for (int ic = 0; ic < n; ++ic)
    {
        const int count = src[ic].size;

        // members holds ints, so the allocation is sized by the element type
        // (the array was previously sized with sizeof(COMM)).
        int *members = NULL;
        if (count > 0)
            members = malloc((size_t)count * sizeof *members);

        if (members == NULL)
        {
            dst[ic].n = 0;
            dst[ic].size = 0;
            dst[ic].members = NULL;
            continue;
        }

        for (int i = 0; i < count; ++i)
            members[i] = src[ic].members[i];

        dst[ic].n = src[ic].n;
        dst[ic].size = count;
        dst[ic].members = members;
    }
}
// Growable list of the distinct communicators found so far. addALLCOMM()
// returns the 1-based count after appending, which is used as the global id.
typedef struct ALLCOMM
{
int n; // number of global communicators currently stored in comms
int size; // current capacity of the comms array, in COMM entries
COMM *comms; // heap-allocated array of communicator records; the first n are in use
} ALLCOMM;
// Initialise *ac as an empty communicator list with an initial capacity of
// 4 * ceil(sqrt(nprocs)) entries, where nprocs is the size of MPI_COMM_WORLD.
// Returns 0 on success, -1 if the array could not be allocated (in which case
// *ac is left empty with capacity 0 and comms == NULL).
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
int initALLCOMM(ALLCOMM* ac)
{
    int nprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    ac->n = 0;
    ac->size = (int)(4 * ceil(sqrt((double)nprocs)));
    ac->comms = malloc((size_t)ac->size * sizeof *ac->comms);
    if (ac->comms == NULL)
    {
        ac->size = 0;
        return -1;
    }
    return 0;
}
// Release everything owned by *ac: the members array of each stored
// communicator and the comms array itself. *ac is left empty.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
void freeALLCOMM(ALLCOMM* ac)
{
    freeCOMMs(ac->n, ac->comms);
    free(ac->comms);
    ac->comms = NULL; // guard against use-after-free / double free
    ac->n = 0;
    ac->size = 0;
}
// Report whether a communicator with the same membership as *c is already
// stored in *ac. Returns 1 if compareCOMM() matches any stored entry, else 0.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
int inALLCOMM(COMM *c, ALLCOMM *ac)
{
    for (int i = 0; i < ac->n; ++i)
    {
        if (compareCOMM(c, &ac->comms[i]) == 1)
            return 1;
    }
    return 0;
}
// Look up communicator *c in *ac, appending a deep copy if it is not there.
// Returns the communicator's global id, i.e. its 1-based position in
// ac->comms, both for an existing entry and for a newly appended one.
// Returns -1 if the list had to grow and the allocation failed.
//
// Fixes relative to the previous version:
//  - an already-known communicator now yields its own id (it used to yield 1
//    for every match, because the 0/1 result of inALLCOMM() was returned);
//  - growing the array no longer frees the members of the NEW array and
//    leaks the old one: entries are moved and only the old array is freed.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
int addALLCOMM(COMM *c, ALLCOMM *ac)
{
    for (int i = 0; i < ac->n; ++i)
    {
        if (compareCOMM(c, &ac->comms[i]) == 1)
            return i + 1;
    }

    if (ac->n == ac->size)
    {
        const int new_size = (ac->size > 0) ? ac->size * 2 : 4;
        COMM *grown = malloc((size_t)new_size * sizeof *grown);
        if (grown == NULL)
            return -1;

        // Shallow move: each entry's members array changes owner from the old
        // array to the new one, so only the old array itself is released.
        for (int i = 0; i < ac->n; ++i)
            grown[i] = ac->comms[i];

        free(ac->comms);
        ac->comms = grown;
        ac->size = new_size;
    }

    copyCOMMs(1, c, &ac->comms[ac->n]);
    ++ac->n;
    return ac->n;
}
// Build this process's description of its local communicators.
//
// On return *p_sendbuf points to a malloc'ed int array (owned by the caller,
// release with free()) laid out as:
//   [0]                     N, the number of communicators in CS
//   [1 .. N]                local communicator ids (MPI_Comm_c2f handles)
//   [1+N .. 2N]             number of processes in each communicator
//   [1+2N .. ]              for each communicator in turn, its members' ranks
//                           translated into group_world
// Returns the total number of ints in the buffer.
// Aborts the process with a diagnostic if memory cannot be allocated.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
int buildSendBuf(struct CommSet *CS, MPI_Group group_world, int** p_sendbuf)
{
    const int N = CS->n; // number of local communicators

    int total_n_member = 0; // total number of processes in all local communicators
    for (int i = 0; i < N; ++i)
    {
        int i_size;
        MPI_Comm_size(CS->comms[i], &i_size);
        total_n_member += i_size;
    }

    const int sendcount = 1 + N + N + total_n_member;
    int *sendbuf = malloc((size_t)sendcount * sizeof *sendbuf);
    if (sendbuf == NULL)
    {
        fprintf(stderr, "buildSendBuf: out of memory\n");
        abort();
    }
    *p_sendbuf = sendbuf;
    sendbuf[0] = N;

    if (N <= 0)
        return sendcount;

    int *localCommID = &sendbuf[1];
    int *size = &sendbuf[1 + N];
    int *ranks = &sendbuf[1 + N + N];

    for (int i = 0; i < N; ++i)
    {
        localCommID[i] = MPI_Comm_c2f(CS->comms[i]);
        MPI_Comm_size(CS->comms[i], &size[i]);
    }

    // Identity list 0..nprocs-1: the ranks inside each communicator that get
    // translated to ranks in group_world.
    int nprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    int *localRank = malloc((size_t)nprocs * sizeof *localRank);
    if (localRank == NULL)
    {
        fprintf(stderr, "buildSendBuf: out of memory\n");
        abort();
    }
    for (int i = 0; i < nprocs; ++i)
        localRank[i] = i;

    int idx = 0; // start of the current communicator's slice inside ranks[]
    for (int i = 0; i < N; ++i)
    {
        MPI_Group igroup;
        MPI_Comm_group(CS->comms[i], &igroup);
        MPI_Group_translate_ranks(igroup, size[i], localRank, group_world, &ranks[idx]);
        MPI_Group_free(&igroup); // MPI_Comm_group hands out a handle that must be released
        idx += size[i];
    }

    free(localRank);
    return sendcount;
}
// Gather every process's send buffer on rank 0 of MPI_COMM_WORLD.
//
// sendcount / sendbuf : this process's contribution.
// *p_recvcounts       : on rank 0, malloc'ed array with one count per process;
//                       NULL on every other rank.
// *p_recvbuf          : on rank 0, malloc'ed concatenation of all contributions
//                       in rank order; NULL on every other rank.
// Both output arrays are owned by the caller (release with free()).
// Returns the total number of ints gathered on rank 0, and 0 elsewhere.
// Aborts the process with a diagnostic if memory cannot be allocated.
//
// Fixes relative to the previous version:
//  - displs[i] is the sum of the counts of ranks 0..i-1 (it used to add
//    recvcounts[i] instead of recvcounts[i-1], misplacing every block after
//    the first);
//  - the pointers are NULL-initialised so non-root ranks no longer pass and
//    return indeterminate values.
//
// Deliberately not declared `inline`: in C99/C11 a plain `inline` definition
// provides no external definition, so any call the compiler does not inline
// (e.g. at -O0) ends up as an undefined reference at link time.
int collectAllCommPara(int sendcount, int sendbuf[], int** p_recvcounts, int** p_recvbuf)
{
    int nprocs;
    int myid;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    int *recvcounts = NULL;
    int *displs = NULL;
    int *recvbuf = NULL;
    int recvsize = 0;

    // Step 1: rank 0 learns how many ints each process will contribute.
    if (myid == 0)
    {
        recvcounts = malloc((size_t)nprocs * sizeof *recvcounts);
        displs = malloc((size_t)nprocs * sizeof *displs);
        if (recvcounts == NULL || displs == NULL)
        {
            fprintf(stderr, "collectAllCommPara: out of memory\n");
            abort();
        }
    }
    MPI_Gather(&sendcount, 1, MPI_INT, recvcounts, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // Step 2: rank 0 lays the contributions out back to back.
    if (myid == 0)
    {
        for (int i = 0; i < nprocs; ++i)
        {
            displs[i] = recvsize; // block i starts where blocks 0..i-1 end
            recvsize += recvcounts[i];
        }
        recvbuf = malloc((size_t)recvsize * sizeof *recvbuf);
        if (recvbuf == NULL)
        {
            fprintf(stderr, "collectAllCommPara: out of memory\n");
            abort();
        }
    }
    MPI_Gatherv(sendbuf, sendcount, MPI_INT,
                recvbuf, recvcounts, displs,
                MPI_INT, 0, MPI_COMM_WORLD);

    free(displs); // NULL on non-root ranks, where free() is a no-op

    *p_recvcounts = recvcounts;
    *p_recvbuf = recvbuf;
    return recvsize;
}
inline int buildCommList(int buff[], ALLCOMM* ac, int* p_commlist_size, int** p_commlist)
{
const int N = buff[0]; // number of communicators
if(N > 0)
{
int* commlist = *p_commlist;
if(*p_commlist_size < 2 * N * sizeof(int))
{
int* old_commlist = commlist;
commlist = malloc(2 * N * sizeof(int));
for(int i = 0; i < *p_commlist_size; ++i)
commlist[i] = old_commlist[i];
p_commlist = &commlist;
free(old_commlist);
}
int* localCommID = &buff[1];
int* size = &buff[1 + N];
int* ranks = &buff[1 + N + N];
int idx = ac->n + 1;
for(int i = 0; i < N; ++i)
{
// make comm
COMM c;
c.n = N;
c.size = size[i];
c.members = ranks;
// find local comm id and global comm id in ac
commlist[2*i] = localCommID[i];
commlist[2*i + 1] = addALLCOMM(&c, ac);
}
}
return N;
}
inline int saveCommMap(struct CommSet *CS, char FILENAME[])
{
// retrieve every comm from commset and find out
// the member processes global ids and put them
// in the sendbuf
int nprocs;
int myid;
MPI_Group MPI_GROUP_WORLD;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_group(MPI_COMM_WORLD, &MPI_GROUP_WORLD);
int** p_sendbuf;
int sendcount = buildSendBuf(&localComm, MPI_GROUP_WORLD, p_sendbuf);
int* recvcounts;
int* recvbuf;
int recvsize = collectAllCommPara(sendcount, *p_sendbuf, &recvcounts, &recvbuf);
if(myid == 0)
{
FILE* COMMLISTFILE = fopen(FILENAME, "w");
int idx = 0;
ALLCOMM ac;
initALLCOMM(&ac);
for(int i = 0; i < nprocs; ++i)
{
fprintf(COMMLISTFILE, "%d", i);
int commlist_size;
int* commlist;
int n = buildCommList(&recvbuf[idx], &ac, &commlist_size, &commlist);
idx += recvcounts[i];
for(int i = 0; i < n; ++i)
{
fprintf(COMMLISTFILE, " %d %d", commlist[2*i], commlist[2*i + 1]);
}
fprintf(COMMLISTFILE, "\n");
}
fclose(COMMLISTFILE);
}
return 0;
}
// Collect all communicators, build a local <-> global communicator ID map,
// and save it to a file.
#include <vector>
#include <set>
#include <map>
#include <fstream>
#include <mpi.h>
extern "C"
{
#include "_MSGLOG_.h"
}
using namespace std;
// Every process builds its communicator member list.
// The list is sent to process 0 and used to build
// the local <-> global communicator ID map,
// i.e. local communicators with the same member
// processes are treated as the same global
// communicator, although they may not be the same
// communicator in the original program.
inline int buildSendBuf(struct CommSet *CS, MPI_Group group_world, vector<int>& sendbuf)
{
int N=CS->n; // number of local comms
int total_n_member=0; // total number of processes in all local comms
for(int i=0; i<N; ++i)
{
int i_size;
MPI_Comm_size(CS->comms[i], &i_size);
total_n_member += i_size;
}
// sendbuf has 4 parts, the first one is a int number which is the
// number of local communicators, the second one is a int array who
// is the local communicators' IDs, the third part is also a int
// array who is the number of the processes associated with each
// communicator, and the final part are the processes global ranks
const int sendcount=1+N+N+total_n_member;
sendbuf.resize(sendcount);
sendbuf[0]=N;
if(N>0)
{
int *localCommID=&sendbuf[1];
int *size=&sendbuf[1+N];
int *ranks=&sendbuf[1+N+N];
for(int i=0; i<N; ++i)
{
int i_pid=MPI_Comm_c2f(CS->comms[i]);
localCommID[i]=i_pid;
MPI_Comm_size(CS->comms[i], &size[i]);
}
// collect local comm members
int nprocs;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
vector<int> localRank(nprocs);
for(int i=0; i<nprocs; ++i) localRank[i]=i;
int idx=0; // index of first communicator's processes list position
for(int i=0; i<N; ++i)
{
MPI_Group igroup;
MPI_Comm_group(CS->comms[i], &igroup);
MPI_Group_translate_ranks(igroup, size[i], &localRank[0], group_world, &ranks[idx]);
idx += size[i]; // move index to the next communicator's position
}
}
return sendcount;
}
// Gather every process's send buffer on rank 0 of MPI_COMM_WORLD.
// On rank 0, recvcounts receives one count per process and recvbuf the
// concatenation of all contributions in rank order; on other ranks both
// vectors are left untouched.
// Returns the total number of ints gathered on rank 0, and 0 elsewhere.
//
// Fixes relative to the previous version: displs[i] is now the sum of the
// counts of ranks 0..i-1 (it used to add recvcounts[i] instead of
// recvcounts[i-1]); buffers are passed via data() so empty vectors on
// non-root ranks are never indexed with operator[].
inline int collectAllCommPara(int sendcount, vector<int>& sendbuf, vector<int>& recvcounts, vector<int>& recvbuf)
{
    int nprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    int myid;
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    // Step 1: rank 0 learns how many ints each process will contribute.
    if (myid == 0)
        recvcounts.resize(nprocs);
    MPI_Gather(&sendcount, 1, MPI_INT, recvcounts.data(), 1, MPI_INT, 0, MPI_COMM_WORLD);

    // Step 2: rank 0 lays the contributions out back to back.
    vector<int> displs;
    int recvsize = 0;
    if (myid == 0)
    {
        displs.resize(nprocs);
        for (int i = 0; i < nprocs; ++i)
        {
            displs[i] = recvsize; // block i starts where blocks 0..i-1 end
            recvsize += recvcounts[i];
        }
        recvbuf.resize(recvsize);
    }
    MPI_Gatherv(sendbuf.data(), sendcount, MPI_INT,
                recvbuf.data(), recvcounts.data(), displs.data(),
                MPI_INT, 0, MPI_COMM_WORLD);
    return recvsize;
}
// Translate one process's gathered buffer into a flat list appended to
// commlist: first the number of communicators, then a (local id, global id)
// pair for each of them.
//
// buff layout: [0] = number of communicators ncomm, [1..ncomm] = local ids,
// [1+ncomm..2*ncomm] = member count of each communicator, followed by the
// member ranks of each communicator in turn.
//
// Global ids are assigned from a function-local static map keyed by the set
// of member ranks, so the same rank set gets the same id across calls; ids
// start at 1 in order of first appearance.
// Returns commlist.size() after appending.
//
// Fix relative to the previous version: the key set is now built fresh for
// every communicator. It used to be declared outside the loop and never
// cleared, so each key also contained the ranks of all earlier communicators
// of the same process.
inline int buildCommMap(int* buff, vector<int>& commlist)
{
    static map<set<int>, int> ranks_gid;

    const int ncomm = buff[0]; // number of communicators
    commlist.push_back(ncomm);

    if (ncomm > 0)
    {
        int* localCommID = &buff[1];
        int* size = &buff[1 + ncomm];
        int* ranks = &buff[1 + ncomm + ncomm];

        int idx = 0; // start of the current communicator's members inside ranks[]
        for (int i = 0; i < ncomm; ++i)
        {
            const int iCommID = localCommID[i];
            const int iSize = size[i];

            // Map key: the set of this communicator's member ranks only.
            set<int> key(ranks + idx, ranks + idx + iSize);
            idx += iSize;

            int gCommID;
            auto v = ranks_gid.find(key);
            if (v != ranks_gid.end()) // this rank set has been seen before
            {
                gCommID = v->second;
            }
            else
            {
                gCommID = static_cast<int>(ranks_gid.size()) + 1;
                ranks_gid[key] = gCommID;
            }

            commlist.push_back(iCommID);
            commlist.push_back(gCommID);
        }
    }
    return static_cast<int>(commlist.size());
}
int saveGlobalCommMap(struct CommSet *CS, char FILENAME[])
{
// retrieve every comm from commset and find out
// the member processes global ids and put them
// in the sendbuf
int nprocs;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
int myid;
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Group MPI_GROUP_WORLD;
MPI_Comm_group(MPI_COMM_WORLD, &MPI_GROUP_WORLD);
vector<int> sendbuf;
int sendcount = buildSendBuf(CS, MPI_GROUP_WORLD, sendbuf);
// process 0 gather all comm member lists from all
// processes and put them int a global comm set
// whose key is the members
vector<int> recvcounts;
vector<int> recvbuf;
collectAllCommPara(sendcount, sendbuf, recvcounts, recvbuf);
// save the local <-> global map to a file
// every line has the map for one process
// the first number is the process id
// the following number pairs are the local/global
// comm id pairs
if(myid == 0)
{
ofstream fCommMap;
fCommMap.open(FILENAME, ios::out);
int idx=0;
for(int iproc=0; iproc<nprocs; ++iproc)
{
int* buff=&recvbuf[idx]; // buffer with received data from process i
vector<int> commlist;
int n=buildCommMap(buff, commlist);
for(int i=0; i<n; ++i)
fCommMap<<commlist[i]<<" ";
fCommMap<<endl;
idx += recvcounts[iproc];
}
fCommMap.close();
}
return 0;
}
......@@ -2,8 +2,7 @@
#include <mpi.h>
#include <commset.h>
inline int initCommSet(struct CommSet *CS)
int initCommSet(struct CommSet *CS)
{
CS->n=0;
CS->maxsize=16;
......@@ -28,7 +27,7 @@ inline int inCommSet(MPI_Comm comm, struct CommSet *CS)
return r;
}
inline int addComm(MPI_Comm comm, struct CommSet *CS)
int addComm(MPI_Comm comm, struct CommSet *CS)
{
if(inCommSet(comm, CS) == 1)
return 0;
......@@ -39,29 +38,29 @@ inline int addComm(MPI_Comm comm, struct CommSet *CS)
}
else
{
int oldmaxsize=CS->maxsize;
MPI_Comm* oldSet=CS->comms;
int oldmaxsize = CS->maxsize;
MPI_Comm* oldSet = CS->comms;
CS->maxsize *= 2;
CS->comms=(MPI_Comm*)malloc(sizeof(MPI_Comm)*CS->maxsize);
CS->comms = (MPI_Comm*)malloc(sizeof(MPI_Comm)*CS->maxsize);
if(CS->comms == NULL)
{
CS->comms=oldSet;
CS->comms = oldSet;
return -1;
}
else
{
for(int i=0; i<oldmaxsize; ++i)
CS->comms[i]=oldSet[i];
for(int i = 0; i < oldmaxsize; ++i)
CS->comms[i] = oldSet[i];
free(oldSet);
CS->comms[CS->n]=comm;
CS->comms[CS->n] = comm;
++(CS->n);
}
}
return 0;
}
inline void freeCommSet(struct CommSet *CS)
void freeCommSet(struct CommSet *CS)
{
free(CS->comms);
CS->comms=NULL;
CS->comms = NULL;
}
Markdown is supported
0% or .
You are about to add 0 p