diff --git a/proof/proofplayer/inc/TPacketizer.h b/proof/proofplayer/inc/TPacketizer.h index 00e07971d36b5d7260c0bed83502e57a3d4353ad..55bf213c02179035d546be306ad53f8bb9e7013e 100644 --- a/proof/proofplayer/inc/TPacketizer.h +++ b/proof/proofplayer/inc/TPacketizer.h @@ -67,6 +67,10 @@ private: // is (#events processed by 1 slave) / fPacketSizeAsAFraction. // It can be set with PROOF_PacketAsAFraction in input list. + // Add workers controls + Bool_t fHeuristicPSiz; // Whether the packet size is calculated heuristically + Bool_t fDefMaxWrkNode; // Whether the default is used for the max workers per node + TPacketizer(); TPacketizer(const TPacketizer&); // no implementation, will generate void operator=(const TPacketizer&); // error on accidental usage @@ -89,6 +93,7 @@ public: TList *input, TProofProgressStatus *st); virtual ~TPacketizer(); + Int_t AddWorkers(TList *workers); TDSetElement *GetNextPacket(TSlave *sl, TMessage *r); Long64_t GetEntriesProcessed(TSlave *sl) const; diff --git a/proof/proofplayer/inc/TPacketizerUnit.h b/proof/proofplayer/inc/TPacketizerUnit.h index 5dcf73d1c85062221877f1fc8a59f6edfbacaa88..fff75c8ccefdd2f8bc7f872add3cb291e4525e10 100644 --- a/proof/proofplayer/inc/TPacketizerUnit.h +++ b/proof/proofplayer/inc/TPacketizerUnit.h @@ -60,7 +60,6 @@ private: Bool_t fFixedNum; // Whether we must assign a fixed number of cycles per worker Long64_t fPacketSeq; // Sequential number of the last packet assigned - TList *fInput; // Input list TPacketizerUnit(); TPacketizerUnit(const TPacketizerUnit&); // no implementation, will generate diff --git a/proof/proofplayer/inc/TVirtualPacketizer.h b/proof/proofplayer/inc/TVirtualPacketizer.h index 8cb3813b73cd9a74fb5642b2088eff716bf10f02..899097cce09b3c3432a8bb4bf61c4b6b2710abe3 100644 --- a/proof/proofplayer/inc/TVirtualPacketizer.h +++ b/proof/proofplayer/inc/TVirtualPacketizer.h @@ -109,6 +109,8 @@ protected: TString fDataSet; // Name of the dataset being processed (for dataset-driven runs) + TList *fInput; // Input list + TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0); TVirtualPacketizer(const TVirtualPacketizer &); // no implementation, will generate void operator=(const TVirtualPacketizer &); // error on accidental usage diff --git a/proof/proofplayer/src/TPacketizer.cxx b/proof/proofplayer/src/TPacketizer.cxx index 2001006103ed9f41663fdfee1325bc8f5334f439..8c85078d5b8452fa4d7e48e1ed5830684f28940b 100644 --- a/proof/proofplayer/src/TPacketizer.cxx +++ b/proof/proofplayer/src/TPacketizer.cxx @@ -289,6 +289,8 @@ TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first, fFileNodes = 0; fMaxPerfIdx = 1; fMaxSlaveCnt = 0; + fHeuristicPSiz = kFALSE; + fDefMaxWrkNode = kTRUE; if (!fProgressStatus) { Error("TPacketizerAdaptive", "No progress status"); @@ -297,23 +299,27 @@ TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first, Long_t maxSlaveCnt = 0; if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) { - if (maxSlaveCnt < 1) { - Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be grater than 0"); + if (maxSlaveCnt < 0) { + Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive"); maxSlaveCnt = 0; } + if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE; } else { // Try also with Int_t (recently supported in TProof::SetParameter) Int_t mxslcnt = -1; if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) { - if (mxslcnt < 1) { - Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be grater than 0"); + if (mxslcnt < 0) { + Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive"); mxslcnt = 0; } maxSlaveCnt = (Long_t) mxslcnt; + if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE; } } - if (!maxSlaveCnt) - maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 4); + if (!maxSlaveCnt) { + maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize()); + if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE; + } if (maxSlaveCnt > 0) { fMaxSlaveCnt = maxSlaveCnt; PDB(kPacketizer,1) @@ -376,13 +382,9 @@ TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first, fSlaveStats = new TMap; fSlaveStats->SetOwner(kFALSE); - TSlave *slave; - TIter si(slaves); - while ((slave = (TSlave*) si.Next())) { - fSlaveStats->Add( slave, new TSlaveStat(slave) ); - fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ? - slave->GetPerfIdx() : fMaxPerfIdx; - } + // Record initial available workers + Int_t nwrks = AddWorkers(slaves); + Info("TPacketizer", "Initial number of workers: %d", nwrks); // Setup file & filenode structure Reset(); @@ -560,6 +562,7 @@ TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first, Info("Process","using alternate packet size: %lld", fPacketSize); } else { // Heuristic for starting packet size + fHeuristicPSiz = kTRUE; Int_t nslaves = fSlaveStats->GetSize(); if (nslaves > 0) { fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves); @@ -593,6 +596,44 @@ TPacketizer::~TPacketizer() SafeDelete(fFileNodes); } +//______________________________________________________________________________ +Int_t TPacketizer::AddWorkers(TList *workers) +{ + // Adds new workers. Returns the number of workers added, or -1 on failure. + + if (!workers) { + Error("AddWorkers", "Null list of new workers!"); + return -1; + } + + Int_t curNumOfWrks = fSlaveStats->GetEntries(); + + TSlave *sl; + TIter next(workers); + while (( sl = dynamic_cast<TSlave*>(next()) )) + if (!fSlaveStats->FindObject(sl)) { + fSlaveStats->Add(sl, new TSlaveStat(sl)); + fMaxPerfIdx = sl->GetPerfIdx() > fMaxPerfIdx ? sl->GetPerfIdx() : fMaxPerfIdx; + } + + // If heuristic (and new workers) set the packet size + Int_t nwrks = fSlaveStats->GetSize(); + if (fHeuristicPSiz && nwrks > curNumOfWrks) { + if (nwrks > 0) { + fPacketSize = fTotalEntries / (fPacketAsAFraction * nwrks); + if (fPacketSize < 1) fPacketSize = 1; + } else { + fPacketSize = 1; + } + } + + // Update the max number that can access one file node if the default is used + if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks; + + // Done + return nwrks; +} + //______________________________________________________________________________ TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node) { @@ -672,13 +713,14 @@ TPacketizer::TFileNode *TPacketizer::NextActiveNode() fActive->Sort(); PDB(kPacketizer,2) { - std::cout << "TPacketizer::NextActiveNode()" << std::endl; + Printf("TPacketizer::NextActiveNode : ----------------------"); fActive->Print(); } TFileNode *fn = (TFileNode*) fActive->First(); if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) { - PDB(kPacketizer,1) Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt); + PDB(kPacketizer,1) + Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt); fn = 0; } @@ -1106,12 +1148,14 @@ TDSetElement *TPacketizer::GetNextPacket(TSlave *sl, TMessage *r) return 0; } - // find slave + // Find worker TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl ); R__ASSERT( slstat != 0 ); + PDB(kPacketizer,1) + Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName()); // update stats & free old element Bool_t firstPacket = kFALSE; diff --git a/proof/proofplayer/src/TVirtualPacketizer.cxx b/proof/proofplayer/src/TVirtualPacketizer.cxx index f85b9001da12d4a62204546885b8b3d24f28f009..b4718221b681d08944339a72bac1ae550acdb7de 100644 --- a/proof/proofplayer/src/TVirtualPacketizer.cxx +++ b/proof/proofplayer/src/TVirtualPacketizer.cxx @@ -66,6 +66,7 @@ TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st) { // Constructor. + fInput = input; // General configuration parameters fMinPacketTime = 3; Double_t minPacketTime = 0;