Commit bffa2317 authored by Shen Yu's avatar Shen Yu
Browse files

modified: ../../openmpi/ompi/mpi/c/bcast.c

	modified:   ../../openmpi/ompi/mpi/c/finalize.c
	modified:   ../../openmpi/ompi/mpi/c/gather.c
	modified:   ../../openmpi/ompi/mpi/c/gatherv.c
	modified:   ../../openmpi/ompi/mpi/c/init.c
	modified:   ../../openmpi/ompi/mpi/c/irecv.c
	modified:   ../../openmpi/ompi/mpi/c/isend.c
	modified:   ../Makefile
	modified:   commset.h
	new file:   commset.inc
	modified:   msglog.h
	deleted:    ../src/GlobalComm.c
	deleted:    ../src/commset.c

    1. Reorganize the program's structure
    2. Fix the bug in commset
parent 045a1ae4
......@@ -131,7 +131,7 @@ int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
int LOG_ARGV[3]={root, count, elemsize};
// LOGTHIS is used to avoid logging the communication caused by the log itself
// in init.c
if(LOGTHIS) LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
if(LOGTHIS == 1) LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
addComm(comm, &localComm);
// end
......
......@@ -68,7 +68,7 @@ int MPI_Finalize(void)
free(INT_POOL);
extern char COMMFILE[]; // COMMFILE is initialized in init.c
saveCommMap(&localComm, COMMFILE);
freeCommSet(&localComm);
freeCOMMSET(&localComm);
// end
return ompi_mpi_finalize();
......
......@@ -200,7 +200,7 @@ int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
int LOGMSG_RECVSIZE=LOGMSG_SIZE(recvtype, recvcount, &recvelemsize);
const int LOG_ARGC=5;
int LOG_ARGV[5]={root, sendcount, recvcount, sendelemsize, recvelemsize};
if(LOGTHIS) LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
if(LOGTHIS == 1) LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
addComm(comm, &localComm);
// end
......
......@@ -215,7 +215,7 @@ int MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
int LOGMSG_RECVSIZE=LOGMSG_SIZE_V(size, recvtype, recvcounts, recvelemsizelist);
const int LOG_ARGC=3;
int LOG_ARGV[3]={root, sendcount, sendelemsize};
if(LOGTHIS) LOGMSG2(FUNC_ID, call_time, comm, LOG_ARGC, size, LOG_ARGV, recvelemsizelist);
if(LOGTHIS == 1) LOGMSG2(FUNC_ID, call_time, comm, LOG_ARGC, size, LOG_ARGV, recvelemsizelist);
addComm(comm, &localComm);
// end
......
......@@ -37,7 +37,8 @@
#include <unistd.h>
#include <string.h>
#include <msglog.h>
struct CommSet localComm;
#include <commset.inc>
COMMSET localComm;
struct timeval START_TIME;
FILE* MSGLOGFILE;
char COMMFILE[4096];
......@@ -45,7 +46,7 @@ int LOGMSGMYID;
int* INT_POOL;
int REQ_POOL_SIZE;
int* REQ_POOL;
bool LOGTHIS;
int LOGTHIS;
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -132,9 +133,9 @@ int MPI_Init(int *argc, char ***argv)
printf("logfilepath=%s\n",logfilepath);
sprintf(COMMFILE, "%s/COMM.txt", logfilepath);
}
LOGTHIS=false;
LOGTHIS=0;
MPI_Bcast(logfilepath, 4000, MPI_CHAR, 0, MPI_COMM_WORLD);
LOGTHIS=true;
LOGTHIS=1;
// every process sets its own file name with hostname and pid
char logfilename[4096];
......@@ -154,7 +155,7 @@ int MPI_Init(int *argc, char ***argv)
INT_POOL=malloc(total_nproc*2*sizeof(int));
REQ_POOL_SIZE=1024;
REQ_POOL=malloc(1024*sizeof(int));
initCommSet(&localComm);
initCOMMSET(&localComm);
addComm(MPI_COMM_WORLD, &localComm);
// end
......
......@@ -95,7 +95,7 @@ int MPI_Irecv(void *buf, int count, MPI_Datatype type, int source,
const int FUNC_ID=14;
int elemsize;
int MSG_SIZE=LOGMSG_SIZE(type, count, &elemsize);
const int request_idx=MPI_Request_c2f(request);
const int request_idx=MPI_Request_c2f(*request);
const int LOG_ARGC=5;
int LOG_ARGV[5]={source, count, elemsize, tag, request_idx};
LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
......
......@@ -108,7 +108,7 @@ int MPI_Isend(const void *buf, int count, MPI_Datatype type, int dest,
const int FUNC_ID=15;
int elemsize;
int MSG_SIZE=LOGMSG_SIZE(type, count, &elemsize);
const int request_idx=MPI_Request_c2f(request);
const int request_idx=MPI_Request_c2f(*request);
const int LOG_ARGC=5;
int LOG_ARGV[5]={dest, count, elemsize, tag, request_idx};
LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
......
OPT = -D_DEBUG -fPIC
OPT = -D_DEBUG
INC = -Iinclude
MPICXX = mpicxx
MPICC = mpicc
all: bin/mpi_player lib/GlobalComm.a lib/GlobalComm.so
all: bin/mpi_player
bin/mpi_player: createMPIComm.o read_run.o mpi_player.o
$(MPICXX) -o bin/mpi_player createMPIComm.o read_run.o mpi_player.o $(OPT)
lib/GlobalComm.so: GlobalComm.o commset.o
gcc -shared -fPIC -o lib/GlobalComm.so GlobalComm.o commset.o
lib/GlobalComm.a: GlobalComm.o commset.o
ar rcsv lib/GlobalComm.a GlobalComm.o commset.o
createMPIComm.o: src/createMPIComm.cpp
$(MPICXX) -c src/createMPIComm.cpp $(OPT) $(INC)
......@@ -30,5 +24,5 @@ commset.o: src/commset.c
$(MPICC) -c src/commset.c $(OPT) $(INC)
clean:
rm bin/mpi_player lib/GlobalComm.* *.o 2>/dev/null
rm bin/mpi_player lib/libGlobalComm.* *.o 2>/dev/null
struct CommSet
#ifndef __COMMSET_H__
#define __COMMSET_H__
typedef struct COMMSET
{
int n; // number of comms in the set
int maxsize; // set maxsize
MPI_Comm* comms; // the array containing the comms
};
} COMMSET;
int initCommSet(struct CommSet *CS);
int addComm(MPI_Comm comm, struct CommSet *CS);
void freeCommSet(struct CommSet *CS);
int initCOMMSET(COMMSET *CS);
int inCOMMSET(MPI_Comm comm, COMMSET *CS);
int addComm(MPI_Comm comm, COMMSET *CS);
void freeCOMMSET(COMMSET *CS);
#endif
\ No newline at end of file
#ifndef __COMMSET_INC__
#define __COMMSET_INC__
#include <stdlib.h>
// Reset CS to an empty set with an initial capacity of 16 communicators.
// Returns 0 when the backing array was allocated and -1 when malloc failed.
int initCOMMSET(COMMSET *CS)
{
    CS->n = 0;
    CS->maxsize = 16;
    CS->comms = (MPI_Comm*)malloc(sizeof(MPI_Comm) * CS->maxsize);
    return (CS->comms != NULL) ? 0 : -1;
}
// Linear membership test: returns 1 if comm is already stored in CS, 0 otherwise.
int inCOMMSET(MPI_Comm comm, COMMSET *CS)
{
    int pos = 0;
    while(pos < CS->n)
    {
        if(CS->comms[pos] == comm)
            return 1;
        ++pos;
    }
    return 0;
}
// Insert comm into CS unless it is already present.
//
// The backing array is doubled when it is full. The grown array and the new
// capacity are committed only after realloc succeeds, so on failure the set is
// left exactly as it was (no leak, no stale capacity).
//
// Returns 0 when comm is in the set on exit (inserted or already present),
// -1 when the array could not be grown.
int addComm(MPI_Comm comm, COMMSET *CS)
{
    if(inCOMMSET(comm, CS) == 1)
        return 0;

    if(CS->n >= CS->maxsize)
    {
        int newsize = CS->maxsize * 2;
        // Keep the old pointer until realloc is known to have succeeded.
        MPI_Comm *grown = (MPI_Comm*)realloc(CS->comms, sizeof(MPI_Comm) * newsize);
        if(grown == NULL)
        {
            return -1;
        }
        CS->comms = grown;
        CS->maxsize = newsize;
    }

    CS->comms[CS->n] = comm;
    ++(CS->n);
    return 0;
}
// Release the array owned by CS and mark the set as unusable.
// The pointer is cleared after free so that a repeated call (or any later
// inspection of CS->comms) cannot touch freed memory.
void freeCOMMSET(COMMSET *CS)
{
    free(CS->comms);   // free(NULL) is a no-op, no guard needed
    CS->comms = NULL;
    CS->n = 0;
    CS->maxsize = -1;
}
#endif
......@@ -3,9 +3,9 @@
#define __MSGLOG_H__
#include <stdio.h>
#include <sys/time.h>
#include "commset.h"
#include <commset.h>
extern struct CommSet localComm;
extern COMMSET localComm;
extern FILE* MSGLOGFILE;
extern struct timeval START_TIME;
......
// This file collects all local communicator ids from all processes and
// compares their member processes. Communicators whose member processes are the
// same are considered the same communicator and are assigned a global communicator
// id, which is stored in the local -- global communicator id map.
// After all local communicators are collected, the map is saved to a text file.
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <math.h>
#include <msglog.h>
// One communicator described by the list of its member processes.
typedef struct COMM
{
int n; // number of member processes; compareCOMM compares this many entries of members
int size; // number of entries in the members array; copyCOMMs allocates and copies this many
int* members; // ranks of the member processes (filled from ranks translated into the world group)
} COMM;
// Release the member array of each of the first n entries of c and zero
// their counters. The array c itself is not freed.
inline void freeCOMMs(int n, COMM c[])
{
    int k = 0;
    while(k < n)
    {
        COMM *entry = &c[k];
        entry->n = 0;
        entry->size = 0;
        free(entry->members);
        ++k;
    }
}
// Returns 1 when both communicators have the same member count and the same
// member ranks in the same order, 0 otherwise.
inline int compareCOMM(COMM* comm1, COMM* comm2)
{
    if(comm1->n != comm2->n)
        return 0;

    for(int k = 0; k < comm1->n; ++k)
    {
        if(comm1->members[k] != comm2->members[k])
            return 0;
    }
    return 1;
}
// Deep-copy the first n entries of src into dst.
//
// Each destination entry gets its own freshly allocated members array of
// src[ic].size ints (the previous value of dst[ic].members is overwritten, not
// freed). The allocation is sized by the element type of members (int), not by
// sizeof(COMM).
//
// If an allocation fails the destination entry is left empty (n = 0, size = 0,
// members = NULL) instead of being written through a NULL pointer.
inline void copyCOMMs(int n, COMM src[], COMM dst[])
{
    for(int ic = 0; ic < n; ++ic)
    {
        dst[ic].n = src[ic].n;
        dst[ic].size = src[ic].size;
        dst[ic].members = malloc(dst[ic].size * sizeof *dst[ic].members);
        if(dst[ic].members == NULL)
        {
            dst[ic].n = 0;
            dst[ic].size = 0;
            continue;
        }
        for(int i = 0; i < src[ic].size; ++i)
            dst[ic].members[i] = src[ic].members[i];
    }
}
// Growable list of the distinct communicators seen so far.
typedef struct ALLCOMM
{
int n; // number of communicators currently stored in comms
int size; // allocated capacity of the comms array, in entries
COMM *comms; // the stored communicators; addALLCOMM returns position + 1 of a new entry
} ALLCOMM;
// Initialise ac as an empty list with an initial capacity of
// 4 * ceil(sqrt(number of processes in MPI_COMM_WORLD)) entries.
//
// Returns 0 on success and -1 when the array could not be allocated; in the
// failure case ac is left empty with capacity 0 and comms == NULL.
inline int initALLCOMM(ALLCOMM* ac)
{
    int nprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    ac->n = 0;
    ac->size = (int)(4*ceil(sqrt(nprocs)));
    ac->comms = (COMM*) malloc(ac->size * sizeof(COMM));
    if(ac->comms == NULL)
    {
        ac->size = 0;
        return -1;
    }
    return 0;
}
// Release every stored communicator's member array, then the array of
// communicators itself, and reset the counters to zero.
inline void freeALLCOMM(ALLCOMM* ac)
{
    COMM *stored = ac->comms;

    freeCOMMs(ac->n, stored);
    free(stored);

    ac->size = 0;
    ac->n = 0;
}
// Returns 1 if a communicator equal to c (according to compareCOMM) is
// already stored in ac, 0 otherwise.
inline int inALLCOMM(COMM *c, ALLCOMM *ac)
{
    int pos = 0;
    while(pos < ac->n)
    {
        if(compareCOMM(c, &ac->comms[pos]) == 1)
            return 1;
        ++pos;
    }
    return 0;
}
// Look c up in ac and add a deep copy of it when it is not there yet.
//
// Returns the 1-based position of the communicator in ac, which serves as its
// global communicator id: the existing position when an equal communicator is
// already stored, or the new position after appending. Returns -1 when the
// list is full and cannot be grown.
//
// Fixes relative to the previous version:
//  - an already-known communicator now yields its own id instead of always 1;
//  - growing the list no longer frees the members of the new array while
//    leaking the old one. realloc moves the COMM entries (including their
//    members pointers) so no per-entry copy/free is needed.
inline int addALLCOMM(COMM *c, ALLCOMM *ac)
{
    for(int i = 0; i < ac->n; ++i)
    {
        if(compareCOMM(c, &ac->comms[i]) == 1)
            return i + 1;
    }

    if(ac->n == ac->size)
    {
        int newsize = (ac->size > 0) ? ac->size * 2 : 4;
        // Commit the new array only once realloc has succeeded.
        COMM *grown = (COMM*) realloc(ac->comms, newsize * sizeof(COMM));
        if(grown == NULL)
            return -1;
        ac->comms = grown;
        ac->size = newsize;
    }

    copyCOMMs(1, c, &(ac->comms[ac->n]));
    ++ ac->n;
    return ac->n;
}
// Serialise the communicators of CS into a newly allocated int array.
//
// Layout of the array stored in *p_sendbuf:
//   - length 1: number of local communicators N
//   - length N: local communicator ids (MPI_Comm_c2f of each communicator)
//   - length N: number of processes in each local communicator
//   - length total_n_member: for every communicator in turn, the ranks of its
//     members translated into group_world
//
// The caller owns *p_sendbuf and must free() it.
//
// Returns the number of ints in the buffer. If the buffer cannot be allocated,
// *p_sendbuf is NULL and 0 is returned. If the scratch rank table cannot be
// allocated, the buffer is reduced to the single entry N = 0 and 1 is returned.
inline int buildSendBuf(struct CommSet *CS, MPI_Group group_world, int** p_sendbuf)
{
    int N = CS->n; // number of local communicators
    int total_n_member = 0; // total number of processes in all local communicators
    for(int i = 0; i < N; ++i)
    {
        int i_size;
        MPI_Comm_size(CS->comms[i], &i_size);
        total_n_member += i_size;
    }

    int sendcount = 1 + N + N + total_n_member;
    int* sendbuf = malloc(sendcount * sizeof(int));
    *p_sendbuf = sendbuf;
    if(sendbuf == NULL)
        return 0;

    sendbuf[0] = N;
    if(N > 0)
    {
        int* localCommID = &sendbuf[1];
        int* size = &sendbuf[1+N];
        int* ranks = &sendbuf[1+N+N];
        for(int i = 0; i < N; ++i)
        {
            localCommID[i] = MPI_Comm_c2f(CS->comms[i]);
            MPI_Comm_size(CS->comms[i], &size[i]);
        }

        // localRank = {0, 1, ..., nprocs-1}: the ranks inside each communicator
        // that are translated into ranks of group_world below.
        int nprocs;
        MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
        int *localRank = malloc(nprocs * sizeof(int));
        if(localRank == NULL)
        {
            sendbuf[0] = 0;
            return 1;
        }
        for(int i = 0; i < nprocs; ++i)
            localRank[i] = i;

        int idx = 0; // start of the current communicator's slice in ranks
        for(int i = 0; i < N; ++i)
        {
            MPI_Group igroup;
            MPI_Comm_group(CS->comms[i], &igroup);
            MPI_Group_translate_ranks(igroup, size[i], &localRank[0], group_world, &ranks[idx]);
            // The group handle returned by MPI_Comm_group must be released.
            MPI_Group_free(&igroup);
            idx += size[i]; // move index to the next communicator's position
        }
        free(localRank);
    }
    return sendcount;
}
// Gather every process's send buffer on rank 0 of MPI_COMM_WORLD.
//
// First the per-process buffer lengths are gathered, then the buffers
// themselves with MPI_Gatherv.
//
// On rank 0, *p_recvcounts receives an array of nprocs lengths and *p_recvbuf
// the concatenated buffers; the caller owns both and must free() them. On all
// other ranks both are set to NULL.
//
// Returns the total number of ints received on rank 0, and 0 elsewhere.
//
// Fixes relative to the previous version:
//  - displs[i] is the sum of the counts of ranks 0..i-1 (it used to add
//    recvcounts[i], shifting every block after the first);
//  - the receive-side pointers are initialised so non-root ranks no longer
//    pass or return indeterminate pointers.
inline int collectAllCommPara(int sendcount, int sendbuf[], int** p_recvcounts, int** p_recvbuf)
{
    int nprocs;
    int myid;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    // gather size of member list first
    int* recvcounts = NULL;
    if(myid == 0)
    {
        recvcounts = malloc(nprocs * sizeof(int));
    }
    MPI_Gather(&sendcount, 1, MPI_INT, recvcounts, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // gather all member list
    int* displs = NULL;
    int* recvbuf = NULL;
    int recvsize = 0;
    if(myid == 0)
    {
        displs = malloc(nprocs * sizeof(int));
        for(int i = 0; i < nprocs; ++i)
        {
            displs[i] = recvsize;      // offset = total of all earlier counts
            recvsize += recvcounts[i];
        }
        recvbuf = malloc(recvsize * sizeof(int));
    }
    MPI_Gatherv(sendbuf, sendcount, MPI_INT,
                recvbuf, recvcounts, displs,
                MPI_INT, 0, MPI_COMM_WORLD);

    *p_recvcounts = recvcounts;
    *p_recvbuf = recvbuf;
    free(displs);
    return recvsize;
}
// Decode one process's block (as laid out by buildSendBuf) and produce its
// list of (local communicator id, global communicator id) pairs.
//
// buff            : [N, N local ids, N member counts, member ranks...]
// ac              : list of all communicators seen so far; new ones are added
// p_commlist_size : in/out, capacity of *p_commlist in ints
// p_commlist      : in/out, heap array (may be NULL on entry when the capacity
//                   is 0); grown with realloc when it holds fewer than 2*N ints.
//                   On return entries 2*i and 2*i+1 hold the local and global
//                   id of communicator i. The caller owns it and must free() it.
//
// Returns N, the number of communicators in the block, or -1 if the list could
// not be grown (in which case *p_commlist is left untouched).
//
// Fixes relative to the previous version: the capacity is compared in ints
// rather than bytes; the grown array and its capacity are written back to the
// caller; each communicator uses its own member count and its own slice of the
// rank list.
inline int buildCommList(int buff[], ALLCOMM* ac, int* p_commlist_size, int** p_commlist)
{
    const int N = buff[0]; // number of communicators
    if(N > 0)
    {
        const int needed = 2 * N;
        if(*p_commlist == NULL || *p_commlist_size < needed)
        {
            int* grown = realloc(*p_commlist, needed * sizeof(int));
            if(grown == NULL)
                return -1;
            *p_commlist = grown;
            *p_commlist_size = needed;
        }
        int* commlist = *p_commlist;

        int* localCommID = &buff[1];
        int* size = &buff[1 + N];
        int* ranks = &buff[1 + N + N];
        for(int i = 0; i < N; ++i)
        {
            // make comm: a view onto this communicator's slice of the rank list
            COMM c;
            c.n = size[i];
            c.size = size[i];
            c.members = ranks;

            // find local comm id and global comm id in ac
            commlist[2*i] = localCommID[i];
            commlist[2*i + 1] = addALLCOMM(&c, ac);

            ranks += size[i]; // advance to the next communicator's members
        }
    }
    return N;
}
// Collect the communicators of every process on rank 0 and write the
// local -> global communicator id map to FILENAME.
//
// Every process serialises the communicators in CS (buildSendBuf) and the
// buffers are gathered on rank 0 (collectAllCommPara). Rank 0 then writes one
// line per process: the process rank followed by " <local id> <global id>"
// for each of that process's communicators.
//
// This is a collective operation over MPI_COMM_WORLD; all ranks must call it.
//
// Returns 0 on success, -1 on rank 0 if the file could not be opened or the
// global communicator list could not be initialised.
//
// Fixes relative to the previous version: the send buffer pointer is a real
// variable instead of an uninitialised int**; the CS argument is used instead
// of being ignored; the comm list and its capacity start out as NULL/0; fopen
// is checked; all buffers, the ALLCOMM and the world group are released.
inline int saveCommMap(struct CommSet *CS, char FILENAME[])
{
    int nprocs;
    int myid;
    MPI_Group MPI_GROUP_WORLD;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_group(MPI_COMM_WORLD, &MPI_GROUP_WORLD);

    // retrieve every comm from commset, find the member processes' global
    // ranks and put them in the sendbuf
    int* sendbuf = NULL;
    int sendcount = buildSendBuf(CS, MPI_GROUP_WORLD, &sendbuf);
    MPI_Group_free(&MPI_GROUP_WORLD);

    int* recvcounts = NULL;
    int* recvbuf = NULL;
    collectAllCommPara(sendcount, sendbuf, &recvcounts, &recvbuf);
    free(sendbuf);

    int status = 0;
    if(myid == 0)
    {
        FILE* COMMLISTFILE = fopen(FILENAME, "w");
        ALLCOMM ac;
        if(COMMLISTFILE == NULL)
        {
            status = -1;
        }
        else if(initALLCOMM(&ac) != 0)
        {
            fclose(COMMLISTFILE);
            status = -1;
        }
        else
        {
            int idx = 0;            // start of the current process's block in recvbuf
            int commlist_size = 0;  // capacity of commlist, reused across processes
            int* commlist = NULL;
            for(int i = 0; i < nprocs; ++i)
            {
                fprintf(COMMLISTFILE, "%d", i);
                int n = buildCommList(&recvbuf[idx], &ac, &commlist_size, &commlist);
                idx += recvcounts[i];
                // Skip the pairs if no list was produced for this process.
                for(int j = 0; commlist != NULL && j < n; ++j)
                {
                    fprintf(COMMLISTFILE, " %d %d", commlist[2*j], commlist[2*j + 1]);
                }
                fprintf(COMMLISTFILE, "\n");
            }
            free(commlist);
            freeALLCOMM(&ac);
            fclose(COMMLISTFILE);
        }
        // Only rank 0 receives allocated buffers from collectAllCommPara.
        free(recvcounts);
        free(recvbuf);
    }
    return status;
}
#include <stdlib.h>
#include <mpi.h>
#include <commset.h>
// Reset CS to an empty set with an initial capacity of 16 communicators.
// Returns 0 when the backing array was allocated and -1 when malloc failed.
int initCommSet(struct CommSet *CS)
{
    CS->n = 0;
    CS->maxsize = 16;
    CS->comms = (MPI_Comm*)malloc(sizeof(MPI_Comm)*CS->maxsize);
    return (CS->comms != NULL) ? 0 : -1;
}
// Linear membership test: returns 1 if comm is already stored in CS, 0 otherwise.
inline int inCommSet(MPI_Comm comm, struct CommSet *CS)
{
    int pos = 0;
    while(pos < CS->n)
    {
        if(CS->comms[pos] == comm)
            return 1;
        ++pos;
    }
    return 0;
}
// Insert comm into CS unless it is already present.
//
// When the backing array is full it is doubled. The new array and the new
// capacity are committed only after the allocation succeeds; previously
// maxsize stayed doubled after a failed allocation, so the next call would
// write past the end of the old, smaller array.
//
// Returns 0 when comm is in the set on exit (inserted or already present),
// -1 when the array could not be grown (the set is left unchanged).
int addComm(MPI_Comm comm, struct CommSet *CS)
{
    if(inCommSet(comm, CS) == 1)
        return 0;

    if(CS->n >= CS->maxsize)
    {
        int newsize = CS->maxsize * 2;
        // realloc keeps the existing entries; the old pointer stays valid on failure.
        MPI_Comm* grown = (MPI_Comm*)realloc(CS->comms, sizeof(MPI_Comm)*newsize);
        if(grown == NULL)
            return -1;
        CS->comms = grown;
        CS->maxsize = newsize;
    }

    CS->comms[CS->n] = comm;
    ++(CS->n);
    return 0;
}
void freeCommSet(struct CommSet *CS)
{
free(CS->comms);
CS->comms = NULL;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment