Commit e6809943 authored by Shen Yu's avatar Shen Yu
Browse files

first workable version for record mpi logs with comms

parent bffa2317
# Prerequisites
*.d
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
*.a
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app
// This file collects the local communicator ids from all processes and
// compares their member processes. Local communicators whose member processes
// are the same are considered to be the same communicator: they are assigned
// one global communicator id, which is stored in the local -> global
// communicator id map.
// After all local communicators are collected, the map is saved to a text file.
#ifndef __GLOBALCOMM_H__
#define __GLOBALCOMM_H__
#include <stdlib.h>
#include <math.h>
// Description of one communicator by the global ranks of its member processes.
typedef struct COMM
{
    // number of member processes associated with this communicator
    int n;
    // size of the members array
    int size;
    // global ranks of the communicator's member processes
    int *members;
} COMM;
#ifdef _DEBUG
// Debug helper: print the first c->n entries of c->members on one line,
// each preceded by a space, followed by a newline.
void printCOMMmembers(COMM* c)
{
    const int count = c->n;
    int pos = 0;
    while(pos < count)
    {
        printf(" %d", c->members[pos]);
        ++pos;
    }
    printf("\n");
}
#endif
// Release the member arrays owned by the first n entries of c.
//   n : number of entries of c to clean up
//   c : array of communicators; the array itself is NOT freed here
// Every cleaned entry is reset to an empty state (n = 0, size = 0,
// members = NULL), so calling this again on the same entries is harmless
// instead of a double free.
void freeCOMMs(int n, COMM c[])
{
    for(int i = 0; i < n; ++i)
    {
        free(c[i].members);
        c[i].members = NULL; // do not leave a dangling pointer behind
        c[i].n = 0;
        c[i].size = 0;
    }
}
// Tell whether two communicators have the same member list.
// Returns 1 when both have the same number of members and the members are
// equal position by position, 0 otherwise. The comparison is positional:
// the same ranks stored in a different order do not compare equal.
int compareCOMM(COMM* comm1, COMM* comm2)
{
    const int count = comm1->n;

    if(count != comm2->n)
        return 0;

    for(int pos = 0; pos < count; ++pos)
    {
        if(comm1->members[pos] != comm2->members[pos])
            return 0;
    }
    return 1;
}
// Deep-copy nc communicators from src to dst.
//   nc  : number of communicators to copy
//   src : source communicators
//   dst : destination entries; their previous contents are overwritten
//         without being freed, so they must not own a members array
// Each destination gets its own freshly allocated members array of
// src[ic].size ints, which the caller releases with freeCOMMs().
// If the source has no storage (size <= 0) or the allocation fails, the
// destination is left empty (n = 0, size = 0, members = NULL).
void copyCOMMs(int nc, COMM src[], COMM dst[])
{
    for(int ic = 0; ic < nc; ++ic)
    {
        const int cap = src[ic].size;
        int* members = NULL;

        // members holds ints, so the element size is sizeof(int), not sizeof(COMM)
        if(cap > 0)
            members = (int*) malloc((size_t)cap * sizeof *members);

        if(members == NULL)
        {
            dst[ic].n = 0;
            dst[ic].size = 0;
            dst[ic].members = NULL;
            continue;
        }

        for(int i = 0; i < cap; ++i)
            members[i] = src[ic].members[i];

        dst[ic].n = src[ic].n;
        dst[ic].size = cap;
        dst[ic].members = members;
    }
}
// Growable table of all global communicators found so far.
typedef struct ALLCOMM
{
    // number of global communicators currently stored
    int n;
    // current capacity of the comms array
    int size;
    // the stored communicators; entries [0, n) are in use
    COMM *comms;
} ALLCOMM;
// Initialise an empty global communicator table.
//   NPROC : number of processes, used only to pick the initial capacity
//           (4 * ceil(sqrt(NPROC)))
//   ac    : table to initialise; any previous contents are not freed
// Returns 0 on success, -1 if the initial array could not be allocated
// (ac is then left empty with size 0 and comms == NULL).
int initALLCOMM(int NPROC, ALLCOMM* ac)
{
    // A non-positive NPROC would give sqrt() of a negative number or a zero
    // capacity that can never grow by doubling, so fall back to a small
    // non-zero capacity.
    int capacity = 4;
    if(NPROC > 0)
        capacity = (int)(4 * ceil(sqrt((double)NPROC)));

    ac->n = 0;
    ac->comms = (COMM*) malloc((size_t)capacity * sizeof *ac->comms);
    if(ac->comms == NULL)
    {
        ac->size = 0;
        return -1;
    }
    ac->size = capacity;
    return 0;
}
// Release everything owned by a global communicator table: the member
// arrays of the ac->n stored communicators and the comms array itself.
// The table is left empty (n = 0, size = 0, comms = NULL); ac itself is
// not freed.
void freeALLCOMM(ALLCOMM* ac)
{
    freeCOMMs(ac->n, ac->comms);
    free(ac->comms);
    ac->comms = NULL; // do not leave a dangling pointer behind
    ac->n = 0;
    ac->size = 0;
}
// Look up communicator c among the ac->n communicators stored in ac.
// Returns the index of the first stored communicator for which
// compareCOMM() reports a match, or -1 when there is none.
int inALLCOMM(COMM *c, ALLCOMM *ac)
{
    const int stored = ac->n;

    for(int slot = 0; slot < stored; ++slot)
    {
        if(compareCOMM(c, &ac->comms[slot]) == 1)
            return slot;
    }
    return -1;
}
// Return the global id (index in ac) of communicator c, adding a deep copy
// of c to the table when it is not stored yet.
//   c  : communicator to look up / insert; it is only read
//   ac : global communicator table; grown when full
// Returns the index of c in ac->comms, or -1 if the table had to grow and
// the reallocation failed (the table is left unchanged in that case).
int addALLCOMM(COMM *c, ALLCOMM *ac)
{
    int r = inALLCOMM(c, ac);
    if(r >= 0)
        return r;

    if(ac->n == ac->size)
    {
        // Grow the array in place. The stored COMM entries are moved as
        // they are, so each keeps owning its members array: nothing has to
        // be deep-copied or freed here. (The previous code freed the member
        // arrays of the NEW table and leaked the old one.)
        const int newsize = (ac->size > 0) ? 2 * ac->size : 1;
        COMM *grown = (COMM*) realloc(ac->comms, (size_t)newsize * sizeof *grown);
        if(grown == NULL)
            return -1;
        ac->comms = grown;
        ac->size = newsize;
    }

    copyCOMMs(1, c, &(ac->comms[ac->n]));
    r = ac->n;
    ++ ac->n;
    return r;
}
// Pack the local communicators recorded in CS into one int array.
//   CS          : local communicator set; reads CS->n, CS->commid[i],
//                 CS->commsize[i] and CS->commranks[i][j]
//   group_world : not used by this function
//   p_sendbuf   : receives the newly malloc'ed buffer; the caller frees it
// Returns the number of ints in the buffer (always at least 1).
// NOTE(review): the allocation is not checked; a failure is a NULL
// dereference at the first write, as before.
int buildSendBuf(COMMSET *CS, MPI_Group group_world, int** p_sendbuf)
{
    // The duplicate "const N = CS->n;" declaration was removed: redeclaring
    // N in the same scope does not compile.
    const int N = CS->n; // number of local communicators
    (void)group_world;
#ifdef _DEBUG
    int myid;
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("myid = %d : start buildSendBuf, CS->n = %d\n", myid, CS->n);
#endif
    int total_n_member = 0; // total number of processes in all local communicators
    for(int i = 0; i < N; ++i)
    {
        total_n_member += CS->commsize[i];
    }
#ifdef _DEBUG
    printf("myid = %d : total_n_member = %d\n", myid, total_n_member);
#endif
    // sendbuf is an array containing 4 parts:
    // - length 1: total number of local communicator N
    // - length N: local communicators IDs
    // - length N: number of processes associated with each local communicator
    // - length total_n_member: all processes global ranks of all local communicators
    int sendcount = 1 + N + N + total_n_member;
    int* sendbuf = (int*) malloc(sendcount * sizeof(int));
    sendbuf[0] = N;
    if(N > 0)
    {
        int* localCommID = &sendbuf[1];
        int* size = &sendbuf[1+N];
        int* ranks = &sendbuf[1+N+N];
        int idx = 0; // start of the current communicator's ranks inside ranks[]
        for(int i = 0; i < N; ++i)
        {
            localCommID[i] = CS->commid[i];
            size[i] = CS->commsize[i];
            for(int j = 0; j < size[i]; ++j)
                ranks[idx + j] = CS->commranks[i][j];
            idx += size[i];
        }
    }
    *p_sendbuf = sendbuf;
    return sendcount;
}
// Gather every process's send buffer on rank 0 of MPI_COMM_WORLD.
// This is a collective call: all ranks must call it.
//   sendcount    : number of ints in sendbuf
//   sendbuf      : this process's packed data
//   p_recvcounts : on rank 0 receives a malloc'ed array with the sendcount
//                  of every rank; on other ranks receives NULL
//   p_recvbuf    : on rank 0 receives a malloc'ed array holding all send
//                  buffers back to back in rank order; on other ranks NULL
// Returns the total number of gathered ints on rank 0 and 0 elsewhere.
// The caller frees both returned arrays.
// NOTE(review): allocation failures on rank 0 are not handled; returning
// early here without the matching collective calls would hang the others.
int collectAllCommPara(int sendcount, int sendbuf[], int** p_recvcounts, int** p_recvbuf)
{
    int nprocs;
    int myid;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    // Only rank 0 allocates the receive side. The pointers stay NULL on the
    // other ranks so that nothing uninitialised is indexed or handed back.
    int* recvcounts = NULL;
    int* displs = NULL;
    int* recvbuf = NULL;
    int recvsize = 0;

    // gather size of member list first
    if(myid == 0)
    {
        recvcounts = (int*) malloc(nprocs * sizeof(int));
    }
    MPI_Gather(&sendcount, 1, MPI_INT, recvcounts, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // gather all member list
    if(myid == 0)
    {
        displs = (int*) malloc(nprocs * sizeof(int));
        for(int i = 0; i < nprocs; ++i)
        {
            // Block i starts where blocks 0..i-1 end, i.e. at the running
            // total BEFORE recvcounts[i] is added.
            displs[i] = recvsize;
            recvsize += recvcounts[i];
        }
        recvbuf = (int*) malloc(recvsize * sizeof(int));
    }
    MPI_Gatherv(sendbuf, sendcount, MPI_INT,
                recvbuf, recvcounts, displs,
                MPI_INT, 0, MPI_COMM_WORLD);

    *p_recvcounts = recvcounts;
    *p_recvbuf = recvbuf;
    free(displs);
    return recvsize;
}
// Translate one process's packed communicator data into a list of
// (local id, global id) pairs.
//   buff            : packed data: buff[0] = N, then N local ids, then N
//                     member counts, then the member ranks of all N
//                     communicators back to back
//   p_ac            : global communicator table; unknown communicators are
//                     added to it through addALLCOMM()
//   p_commlist_size : receives the number of valid ints in the list (2 * N),
//                     or 0 when N <= 0 or on allocation failure
//   p_commlist      : in/out list buffer, resized with realloc(); may point
//                     to NULL on entry. Entry 2*i holds the local id and
//                     entry 2*i + 1 the global id of communicator i.
// Returns N, or -1 if the list could not be resized (the buffer behind
// *p_commlist is then still the old, valid one).
int buildCommList(int buff[], ALLCOMM* p_ac, int* p_commlist_size, int** p_commlist)
{
    int N = buff[0]; // number of communicators
#ifdef _DEBUG
    printf("start buildCommList, number of communicators is %d\n", N);
#endif
    if(N <= 0)
    {
        *p_commlist_size = 0; // nothing valid in the list for this process
        return N;
    }

    int* commlist = (int*) realloc(*p_commlist, 2 * (size_t)N * sizeof(int));
    if(commlist == NULL)
    {
        *p_commlist_size = 0;
        return -1;
    }
    // realloc may have moved the block; publish the new address right away
    *p_commlist = commlist;

    int* localCommID = &buff[1];
    int* size = &buff[1 + N];
    int* ranks = &buff[1 + N + N];
    int r_idx = 0; // start of the current communicator's ranks inside ranks[]
    COMM c_tmp;
    for(int i = 0; i < N; ++i)
    {
        // make comm: a view into buff, nothing is copied or owned here
        c_tmp.n = size[i];
        c_tmp.size = size[i];
        c_tmp.members = &ranks[r_idx];
        r_idx += c_tmp.size;
        // find local comm id and global comm id in ac
        commlist[2*i] = localCommID[i];
        commlist[2*i + 1] = addALLCOMM(&c_tmp, p_ac);
#ifdef _DEBUG
        printf("local comm (id = %d) has global id %d\n", localCommID[i], commlist[2*i + 1]);
        printf("ALLCOMM.size %d, ALLCOMM.n %d\n", p_ac->size, p_ac->n);
#endif
    }
    *p_commlist_size = 2 * N;
    return N;
}
// Build the local -> global communicator id map of all processes and write
// it to a text file. This is a collective call over MPI_COMM_WORLD: every
// rank packs its local communicators and rank 0 gathers them, assigns
// global ids and writes the file.
//   CS       : this process's local communicator set
//   FILENAME : path of the output file; only rank 0 opens it
// File format, one line per process in rank order:
//   <rank> <ncomm> <local id> <global id> <local id> <global id> ...
// Returns 0 on success. Rank 0 returns -1 if the file could not be opened
// or closed, or if the global communicator table could not be initialised;
// the other ranks always return 0.
int saveCommMap(COMMSET *CS, char FILENAME[])
{
    // retrieve every comm from commset and find out
    // the member processes global ids and put them
    // in the sendbuf
    int rc = 0;
    int nprocs;
    int myid;
    MPI_Group MPI_GROUP_WORLD;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_group(MPI_COMM_WORLD, &MPI_GROUP_WORLD);
#ifdef _DEBUG
    printf("myid %d : begin save commmap\n", myid);
#endif
    int* sendbuf = NULL;
    int sendcount = buildSendBuf(CS, MPI_GROUP_WORLD, &sendbuf);
    // the group handle was only needed for buildSendBuf; release it
    MPI_Group_free(&MPI_GROUP_WORLD);
#ifdef _DEBUG
    printf("myid %d : sendcount = %d, sendbuf:", myid, sendcount);
    for(int i = 0; i < sendcount; ++i)
    {
        printf(" %d", sendbuf[i]);
    }
    printf("\n");
#endif
    int* recvcounts = NULL;
    int* recvbuf = NULL;
    int recvsize = collectAllCommPara(sendcount, sendbuf, &recvcounts, &recvbuf);
    (void)recvsize; // only reported in debug builds
    if(myid == 0)
    {
#ifdef _DEBUG
        printf("myid %d : recvsize = %d, recvbuf:", myid, recvsize);
        for(int i = 0; i < recvsize; ++i)
        {
            printf(" %d", recvbuf[i]);
        }
        printf("\n");
#endif
        FILE* COMMLISTFILE = fopen(FILENAME, "w");
        if(COMMLISTFILE == NULL)
        {
            rc = -1;
        }
        else
        {
            ALLCOMM ac;
            // initALLCOMM takes the process count first, then the table
            if(initALLCOMM(nprocs, &ac) != 0)
            {
                rc = -1;
            }
            else
            {
                int idx = 0; // start of the current process's data in recvbuf
                int commlist_size = 0;
                int* commlist = NULL;
                for(int i = 0; i < nprocs; ++i)
                {
#ifdef _DEBUG
                    printf("start buildCommList from recvbuf[%d]\n", idx);
                    printf("initial ac.n = %d, ac.size = %d\n", ac.n, ac.size);
                    printf("initial commlist_size = %d\n", commlist_size);
#endif
                    int ncomm = buildCommList(&recvbuf[idx], &ac, &commlist_size, &commlist);
                    idx += recvcounts[i];
#ifdef _DEBUG
                    printf("proc %d has %d comms:\n", i, ncomm);
                    for(int ic = 0; ic < ncomm*2; ++ic)
                    {
                        printf("%d/%d: %d\n", ic, ncomm*2, commlist[ic]);
                    }
#endif
                    fprintf(COMMLISTFILE, "%d %d", i, ncomm);
                    for(int ic = 0; ic < ncomm; ++ic)
                    {
                        fprintf(COMMLISTFILE, " %d %d", commlist[2*ic], commlist[2*ic + 1]);
                    }
                    fprintf(COMMLISTFILE, "\n");
#ifdef _DEBUG
                    printf("proc %d has written to file\n", i);
#endif
                }
                free(commlist);
                freeALLCOMM(&ac);
            }
            // fclose flushes the stream, so a failure here means lost output
            if(fclose(COMMLISTFILE) != 0)
                rc = -1;
        }
        // the gather buffers are only allocated on rank 0
        free(recvcounts);
        free(recvbuf);
    }
    free(sendbuf);
    return rc;
}
#endif
......@@ -35,7 +35,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -114,17 +114,17 @@ int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
if ((MPI_IN_PLACE != sendbuf && 0 == sendcount) ||
(0 == recvcount)) {
return MPI_SUCCESS;
}
}
}
else if ( OMPI_COMM_IS_INTER(comm) ){
/* for inter comunicators, the communication pattern
need not be symmetric. Specifically, one group is
allows to have sendcount=0, while the other has
allows to have sendcount=0, while the other has
a valid sendcount. Thus, the only way not to do
anything is if both sendcount and recvcount are zero. */
if ( 0 == sendcount && 0 == recvcount ) {
return MPI_SUCCESS;
}
if ( 0 == sendcount && 0 == recvcount ) {
return MPI_SUCCESS;
}
}
OPAL_CR_ENTER_LIBRARY();
......@@ -143,7 +143,7 @@ int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
const int LOG_ARGC=4;
int LOG_ARGV[4]={sendcount, sendelemsize, recvcount, recvelemsize};
LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
addComm(comm, &localComm);
addCOMM(comm, &localComm);
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -35,7 +35,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -133,14 +133,14 @@ int MPI_Allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
sum(recvounts) > 0 if there's anything to do. */
if ( OMPI_COMM_IS_INTRA( comm) ) {
for (i = 0; i < ompi_comm_size(comm); ++i) {
if (0 != recvcounts[i]) {
break;
}
}
if (i >= ompi_comm_size(comm)) {
return MPI_SUCCESS;
}
for (i = 0; i < ompi_comm_size(comm); ++i) {
if (0 != recvcounts[i]) {
break;
}
}
if (i >= ompi_comm_size(comm)) {
return MPI_SUCCESS;
}
}
/* There is no rule that can be applied for inter-communicators, since
recvcount(s)=0 only indicates that the processes in the other group
......@@ -160,13 +160,14 @@ int MPI_Allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
// for log messages
const int FUNC_ID=2;
int sendelemsize;
int* recvsizelist=INT_POOL;
resize_INT_POOL(size, &SIZE_LIST);
int* recvsizelist=SIZE_LIST.pool;
int LOGMSG_SENDSIZE=LOGMSG_SIZE(sendtype, sendcount, &sendelemsize);
int LOGMSG_RECVSIZE=LOGMSG_SIZE_V(size, recvtype, recvcounts, recvsizelist);
const int LOG_ARGC=2;
int LOG_ARGV[2]={sendcount, sendelemsize};
LOGMSG2(FUNC_ID, call_time, comm, LOG_ARGC, size, LOG_ARGV, recvsizelist);
addComm(comm, &localComm);
addCOMM(comm, &localComm);
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -34,7 +34,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -82,8 +82,8 @@ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
err = MPI_SUCCESS;
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (ompi_comm_invalid(comm)) {
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM,
if (ompi_comm_invalid(comm)) {
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM,
FUNC_NAME);
} else if (MPI_OP_NULL == op) {
err = MPI_ERR_OP;
......@@ -93,12 +93,12 @@ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
return ret;
} else if ((MPI_IN_PLACE == sendbuf && OMPI_COMM_IS_INTER(comm)) ||
MPI_IN_PLACE == recvbuf ) {
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_BUFFER,
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_BUFFER,
FUNC_NAME);
} else if( (sendbuf == recvbuf) &&
(MPI_BOTTOM != sendbuf) &&
(count > 1) ) {
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_BUFFER,
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_BUFFER,
FUNC_NAME);
} else {
OMPI_CHECK_DATATYPE_FOR_SEND(err, datatype, count);
......@@ -125,11 +125,11 @@ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
OBJ_RELEASE(op);
// for log messages
const int FUNC_ID=3;
const int LOG_ARGC=3;
const int FUNC_ID=3;
const int LOG_ARGC=3;
int LOG_ARGV[3]={count, MPI_Type_c2f(datatype), MPI_Op_c2f(op)};
LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
addComm(comm, &localComm);
addCOMM(comm, &localComm);
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -36,7 +36,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -128,7 +128,7 @@ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
const int LOG_ARGC=4;
int LOG_ARGV[4]={sendcount, sendelemsize, recvcount, recvelemsize};
LOGMSG1(FUNC_ID, call_time, comm, LOG_ARGC, LOG_ARGV);
addComm(comm, &localComm);
addCOMM(comm, &localComm);
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -34,7 +34,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -141,12 +141,13 @@ int MPI_Alltoallv(const void *sendbuf, const int sendcounts[],
// for log messages
const int FUNC_ID=5;
int* sendsizelist=&INT_POOL[0];
int* recvsizelist=&INT_POOL[size];
resize_INT_POOL(size*2, &SIZE_LIST);
int* sendsizelist=&(SIZE_LIST.pool[0]);
int* recvsizelist=&(SIZE_LIST.pool[size]);
int LOGMSG_SENDSIZE=LOGMSG_SIZE_V(size, sendtype, sendcounts, sendsizelist);
int LOGMSG_RECVSIZE=LOGMSG_SIZE_V(size, recvtype, recvcounts, recvsizelist);
LOGMSG2(FUNC_ID, call_time, comm, size, size, sendsizelist, recvsizelist);
addComm(comm, &localComm);
addCOMM(comm, &localComm);
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -34,7 +34,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -136,12 +136,13 @@ int MPI_Alltoallw(const void *sendbuf, const int sendcounts[],
// for log messages
const int FUNC_ID=6;
int* sendsizelist=&INT_POOL[0];
int* recvsizelist=&INT_POOL[size];
resize_INT_POOL(size*2, &SIZE_LIST);
int* sendsizelist=&(SIZE_LIST.pool[0]);
int* recvsizelist=&(SIZE_LIST.pool[size]);
int LOGMSG_SENDSIZE=LOGMSG_SIZE_W(size, sendtypes, sendcounts, sendsizelist);
int LOGMSG_RECVSIZE=LOGMSG_SIZE_W(size, recvtypes, recvcounts, recvsizelist);
LOGMSG2(FUNC_ID, call_time, comm, size, size, sendsizelist, recvsizelist);
addComm(comm, &localComm);
addCOMM(comm, &localComm);
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -28,7 +28,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -44,8 +44,8 @@ static const char FUNC_NAME[] = "MPI_Barrier";
int MPI_Barrier(MPI_Comm comm)
{
// for log messages
struct timeval call_time;
gettimeofday(&call_time, NULL);
struct timeval call_time;
gettimeofday(&call_time, NULL);
// end
int err = MPI_SUCCESS;
......@@ -86,9 +86,12 @@ int MPI_Barrier(MPI_Comm comm)
/* All done */
// for log messages
const int FUNC_ID=26;
LOGMSG0(FUNC_ID, call_time, comm);
addComm(comm, &localComm);
if(LOGTHIS == 1)
{
const int FUNC_ID=26;
LOGMSG0(FUNC_ID, call_time, comm);
addCOMM(comm, &localComm);
}
// end
OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
......
......@@ -29,7 +29,7 @@
#include "ompi/runtime/ompi_spc.h"
// for log messages
#include <msglog.h>
#include "msglog.h"
// end
#if OMPI_BUILD_MPI_PROFILING
......@@ -46,8 +46,11 @@ int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
int root, MPI_Comm comm)
{
// for log messages