Skip to content
Snippets Groups Projects
Commit 48915ed6 authored by Gerardo Ganis
Browse files

multicore: reorganize files and name according to ROOT-8513 - Part 3

Implement new naming in the relevant tutorials.
Residual fixes in the new class reorganization
parent 803f0cf9
Branches
Tags
No related merge requests found
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "THashList.h" #include "THashList.h"
#include "TMPClient.h" #include "TMPClient.h"
#include "TMPWorkerTree.h" #include "TMPWorkerTree.h"
#include "TPoolWorker.h"
#include "TSelector.h" #include "TSelector.h"
#include "TTreeReader.h" #include "TTreeReader.h"
#include <algorithm> //std::generate #include <algorithm> //std::generate
......
...@@ -14,9 +14,14 @@ ...@@ -14,9 +14,14 @@
#include "TMPWorker.h" #include "TMPWorker.h"
#include "TFile.h" #include "TFile.h"
#include "TEntryList.h"
#include "TEventList.h"
#include "TH1.h"
#include "TKey.h" #include "TKey.h"
#include "TSelector.h"
#include "TTree.h" #include "TTree.h"
#include "TTreeCache.h" #include "TTreeCache.h"
#include "TTreeReader.h"
#include <memory> //unique_ptr #include <memory> //unique_ptr
#include <string> #include <string>
...@@ -45,7 +50,7 @@ protected: ...@@ -45,7 +50,7 @@ protected:
void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a MP client void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a MP client
void Init(int fd, unsigned workerN); void Init(int fd, unsigned workerN);
TFile *OpenFile(const std::string& fileName); TFile *OpenFile(const std::string& fileName);
virtual void Process(unsigned code, MPCodeBufPair& msg); virtual void Process(unsigned, MPCodeBufPair&) { }
TTree *RetrieveTree(TFile *fp); TTree *RetrieveTree(TFile *fp);
virtual void SendResult() { } virtual void SendResult() { }
void Setup(); void Setup();
...@@ -105,4 +110,161 @@ private: ...@@ -105,4 +110,161 @@ private:
bool fFirstEntry = true; bool fFirstEntry = true;
}; };
//////////////////////////////////////////////////////////////////////////
/// Auxilliary templated functions
/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
/// problem of that object being automatically owned by the current open file.
/// For these three types, we call SetDirectory(nullptr) to detach the returned
/// object from the file we are reading the TTree from.
/// Note: the only sane case in which this should happen is when a TH1F* is
/// returned.
template<class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject*, T>::value>::type* = nullptr>
void DetachRes(T res)
{
auto th1p = dynamic_cast<TH1*>(res);
if(th1p != nullptr) {
th1p->SetDirectory(nullptr);
return;
}
auto ttreep = dynamic_cast<TTree*>(res);
if(ttreep != nullptr) {
ttreep->SetDirectory(nullptr);
return;
}
auto tentrylist = dynamic_cast<TEntryList*>(res);
if(tentrylist != nullptr) {
tentrylist->SetDirectory(nullptr);
return;
}
auto teventlist = dynamic_cast<TEventList*>(res);
if(teventlist != nullptr) {
teventlist->SetDirectory(nullptr);
return;
}
return;
}
//////////////////////////////////////////////////////////////////////////
/// TMPWorkerTreeFunc: implementations of SendResult and Process.

/// Send the result reduced so far (fReducedResult) to the client, tagged
/// with MPCode::kProcResult.
template<class F>
void TMPWorkerTreeFunc<F>::SendResult()
{
   // Ship whatever has been accumulated up to now over the worker socket.
   MPSend(this->GetSocket(), MPCode::kProcResult, this->fReducedResult);
}
/// Serve one processing request: select the tree and the range of entries,
/// run the user function on that range and merge its result into
/// fReducedResult.
///
/// \param code Request code. For MPCode::kProcRange and MPCode::kProcTree the
///             payload is read as the number of entry ranges processed so far;
///             for any other code it is read as an index into fFileNames.
/// \param msg  Message whose second element holds the payload buffer.
///
/// Replies sent on the worker socket:
///  - MPCode::kProcError if the TTreeReader cannot be set to the range;
///  - MPCode::kProcResult (with fReducedResult) once fProcessedEntries
///    equals fMaxNEntries;
///  - MPCode::kIdling otherwise.
/// The function returns without replying when fTree is undefined for
/// kProcTree (an Error is logged) or when OpenFile / RetrieveTree return
/// null (errors are handled inside those functions).
template<class F>
void TMPWorkerTreeFunc<F>::Process(unsigned code, MPCodeBufPair& msg)
{
   // For these two codes the work is split in ranges of entries
   const bool splitInRanges = (code == MPCode::kProcRange || code == MPCode::kProcTree);

   //evaluate the index of the file to process in fFileNames
   //(we actually don't need the parameter if code == kProcTree)
   unsigned fileN = 0;
   unsigned nProcessed = 0;
   if (splitInRanges) {
      if (code == MPCode::kProcTree && !fTree) {
         // This must be defined
         Error("TMPWorkerTreeFunc::Process", "[S]: Process:kProcTree fTree undefined!\n");
         return;
      }
      //retrieve the total number of entries ranges processed so far by TPool
      nProcessed = ReadBuffer<unsigned>(msg.second.get());
      //evaluate the file and the entries range to process
      fileN = nProcessed / fNWorkers;
   } else {
      //evaluate the file and the entries range to process
      fileN = ReadBuffer<unsigned>(msg.second.get());
   }

   std::unique_ptr<TFile> fp;
   TTree *tree = nullptr;
   // File backing fTree; only looked up for kProcTree, where fTree is known to be set
   auto *currentFile = (code == MPCode::kProcTree) ? fTree->GetCurrentFile() : nullptr;
   if (code != MPCode::kProcTree || currentFile) {
      //open file
      if (currentFile) {
         // Single tree from file: we need to reopen, because file descriptor gets invalidated across Fork
         fp.reset(OpenFile(currentFile->GetName()));
      } else {
         fp.reset(OpenFile(fFileNames[fileN]));
      }
      if (fp == nullptr) {
         //errors are handled inside OpenFile
         return;
      }

      //retrieve the TTree with the specified name from file
      //we are not the owner of the TTree object, the file is!
      tree = RetrieveTree(fp.get());
      if (tree == nullptr) {
         //errors are handled inside RetrieveTree
         return;
      }
   } else {
      // Tree in memory: OK
      tree = fTree;
   }

   // Setup the cache, if required
   SetupTreeCache(tree);

   //create entries range
   Long64_t start = 0;
   Long64_t finish = 0;
   if (splitInRanges) {
      //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
      //and this worker must take the rangeN-th range
      // The entry count and the range bounds are kept in 64 bits: storing them
      // in 'unsigned' would truncate trees with more than 2^32-1 entries and
      // let rangeN*nBunch wrap around.
      const Long64_t nEntries = tree->GetEntries();
      const Long64_t nBunch = nEntries / fNWorkers;
      const unsigned rangeN = nProcessed % fNWorkers;
      start = rangeN * nBunch;
      if (rangeN < (fNWorkers - 1))
         finish = (rangeN + 1) * nBunch;
      else
         finish = nEntries; // the last range also takes the remainder
   } else {
      start = 0;
      finish = tree->GetEntries();
   }

   //check if we are going to reach the max of entries
   //change finish accordingly
   if (fMaxNEntries && fProcessedEntries + finish - start > fMaxNEntries) {
      finish = start + fMaxNEntries - fProcessedEntries;
   }

   // create a TTreeReader that reads this range of entries
   TTreeReader reader(tree);
   TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
   if (status != TTreeReader::kEntryValid) {
      std::string reply = "S" + std::to_string(GetNWorker());
      reply += ": could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish);
      MPSend(GetSocket(), MPCode::kProcError, reply.data());
      return;
   }

   //execute function
   auto res = fProcFunc(reader);

   //detach result from file if needed (currently needed for TH1, TTree, TEntryList, TEventList)
   DetachRes(res);

   //update the number of processed entries
   fProcessedEntries += finish - start;

   if (fCanReduce) {
      // Merge the new result with what has been accumulated so far
      PoolUtils::ReduceObjects<TObject *> redfunc;
      fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
   } else {
      // First result: nothing to merge with yet
      fCanReduce = true;
      fReducedResult = res;
   }

   if (fMaxNEntries == fProcessedEntries) {
      //we are done forever
      MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
   } else {
      //we are done for now
      MPSend(GetSocket(), MPCode::kIdling);
   }
}
#endif #endif
...@@ -35,40 +35,6 @@ ...@@ -35,40 +35,6 @@
/// ///
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
/// Auxilliary functions
/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
/// problem of that object being automatically owned by the current open file.
/// For these three types, we call SetDirectory(nullptr) to detach the returned
/// object from the file we are reading the TTree from.
/// Note: the only sane case in which this should happen is when a TH1F* is
/// returned.
template<class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject*, T>::value>::type* = nullptr>
void DetachRes(T res)
{
auto th1p = dynamic_cast<TH1*>(res);
if(th1p != nullptr) {
th1p->SetDirectory(nullptr);
return;
}
auto ttreep = dynamic_cast<TTree*>(res);
if(ttreep != nullptr) {
ttreep->SetDirectory(nullptr);
return;
}
auto tentrylist = dynamic_cast<TEntryList*>(res);
if(tentrylist != nullptr) {
tentrylist->SetDirectory(nullptr);
return;
}
auto teventlist = dynamic_cast<TEventList*>(res);
if(teventlist != nullptr) {
teventlist->SetDirectory(nullptr);
return;
}
return;
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
/// Class constructors. /// Class constructors.
/// Note that this does not set variables like fPid or fS (worker's socket).\n /// Note that this does not set variables like fPid or fS (worker's socket).\n
...@@ -97,7 +63,7 @@ TMPWorkerTree::TMPWorkerTree(const std::vector<std::string>& fileNames, ...@@ -97,7 +63,7 @@ TMPWorkerTree::TMPWorkerTree(const std::vector<std::string>& fileNames,
} }
TMPWorkerTree::TMPWorkerTree(TTree *tree, unsigned nWorkers, ULong64_t maxEntries) TMPWorkerTree::TMPWorkerTree(TTree *tree, unsigned nWorkers, ULong64_t maxEntries)
: TMPWorker(nWorkers, maxEntries), fTree(tree), : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr),
fTreeCache(0), fTreeCacheIsLearning(kFALSE), fTreeCache(0), fTreeCacheIsLearning(kFALSE),
fUseTreeCache(kTRUE), fCacheSize(-1) fUseTreeCache(kTRUE), fCacheSize(-1)
{ {
...@@ -259,128 +225,6 @@ void TMPWorkerTree::HandleInput(MPCodeBufPair& msg) ...@@ -259,128 +225,6 @@ void TMPWorkerTree::HandleInput(MPCodeBufPair& msg)
} }
} }
//////////////////////////////////////////////////////////////////////////
/// TMPWorkerTreeFunc: implementations of SendResult and Process.

/// Send the result reduced so far (fReducedResult) to the client, tagged
/// with MPCode::kProcResult.
template<class F>
void TMPWorkerTreeFunc<F>::SendResult()
{
   // Ship whatever has been accumulated up to now over the worker socket.
   MPSend(this->GetSocket(), MPCode::kProcResult, this->fReducedResult);
}
/// Serve one processing request: select the tree and the range of entries,
/// run the user function on that range and merge its result into
/// fReducedResult.
///
/// \param code Request code. For MPCode::kProcRange and MPCode::kProcTree the
///             payload is read as the number of entry ranges processed so far;
///             for any other code it is read as an index into fFileNames.
/// \param msg  Message whose second element holds the payload buffer.
///
/// Replies sent on the worker socket:
///  - MPCode::kProcError if the TTreeReader cannot be set to the range;
///  - MPCode::kProcResult (with fReducedResult) once fProcessedEntries
///    equals fMaxNEntries;
///  - MPCode::kIdling otherwise.
/// The function returns without replying when fTree is undefined for
/// kProcTree (an Error is logged) or when OpenFile / RetrieveTree return
/// null (errors are handled inside those functions).
template<class F>
void TMPWorkerTreeFunc<F>::Process(unsigned code, MPCodeBufPair& msg)
{
   // For these two codes the work is split in ranges of entries
   const bool splitInRanges = (code == MPCode::kProcRange || code == MPCode::kProcTree);

   //evaluate the index of the file to process in fFileNames
   //(we actually don't need the parameter if code == kProcTree)
   unsigned fileN = 0;
   unsigned nProcessed = 0;
   if (splitInRanges) {
      if (code == MPCode::kProcTree && !fTree) {
         // This must be defined
         Error("TMPWorkerTreeFunc::Process", "[S]: Process:kProcTree fTree undefined!\n");
         return;
      }
      //retrieve the total number of entries ranges processed so far by TPool
      nProcessed = ReadBuffer<unsigned>(msg.second.get());
      //evaluate the file and the entries range to process
      fileN = nProcessed / fNWorkers;
   } else {
      //evaluate the file and the entries range to process
      fileN = ReadBuffer<unsigned>(msg.second.get());
   }

   std::unique_ptr<TFile> fp;
   TTree *tree = nullptr;
   // File backing fTree; only looked up for kProcTree, where fTree is known to be set
   auto *currentFile = (code == MPCode::kProcTree) ? fTree->GetCurrentFile() : nullptr;
   if (code != MPCode::kProcTree || currentFile) {
      //open file
      if (currentFile) {
         // Single tree from file: we need to reopen, because file descriptor gets invalidated across Fork
         fp.reset(OpenFile(currentFile->GetName()));
      } else {
         fp.reset(OpenFile(fFileNames[fileN]));
      }
      if (fp == nullptr) {
         //errors are handled inside OpenFile
         return;
      }

      //retrieve the TTree with the specified name from file
      //we are not the owner of the TTree object, the file is!
      tree = RetrieveTree(fp.get());
      if (tree == nullptr) {
         //errors are handled inside RetrieveTree
         return;
      }
   } else {
      // Tree in memory: OK
      tree = fTree;
   }

   // Setup the cache, if required
   SetupTreeCache(tree);

   //create entries range
   Long64_t start = 0;
   Long64_t finish = 0;
   if (splitInRanges) {
      //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
      //and this worker must take the rangeN-th range
      // The entry count and the range bounds are kept in 64 bits: storing them
      // in 'unsigned' would truncate trees with more than 2^32-1 entries and
      // let rangeN*nBunch wrap around.
      const Long64_t nEntries = tree->GetEntries();
      const Long64_t nBunch = nEntries / fNWorkers;
      const unsigned rangeN = nProcessed % fNWorkers;
      start = rangeN * nBunch;
      if (rangeN < (fNWorkers - 1))
         finish = (rangeN + 1) * nBunch;
      else
         finish = nEntries; // the last range also takes the remainder
   } else {
      start = 0;
      finish = tree->GetEntries();
   }

   //check if we are going to reach the max of entries
   //change finish accordingly
   if (fMaxNEntries && fProcessedEntries + finish - start > fMaxNEntries) {
      finish = start + fMaxNEntries - fProcessedEntries;
   }

   // create a TTreeReader that reads this range of entries
   TTreeReader reader(tree);
   TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
   if (status != TTreeReader::kEntryValid) {
      std::string reply = "S" + std::to_string(GetNWorker());
      reply += ": could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish);
      MPSend(GetSocket(), MPCode::kProcError, reply.data());
      return;
   }

   //execute function
   auto res = fProcFunc(reader);

   //detach result from file if needed (currently needed for TH1, TTree, TEntryList, TEventList)
   DetachRes(res);

   //update the number of processed entries
   fProcessedEntries += finish - start;

   if (fCanReduce) {
      // Merge the new result with what has been accumulated so far
      PoolUtils::ReduceObjects<TObject *> redfunc;
      fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
   } else {
      // First result: nothing to merge with yet
      fCanReduce = true;
      fReducedResult = res;
   }

   if (fMaxNEntries == fProcessedEntries) {
      //we are done forever
      MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
   } else {
      //we are done for now
      MPSend(GetSocket(), MPCode::kIdling);
   }
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
...@@ -414,9 +258,6 @@ void TMPWorkerTreeSel::Process(unsigned int code, MPCodeBufPair& msg) ...@@ -414,9 +258,6 @@ void TMPWorkerTreeSel::Process(unsigned int code, MPCodeBufPair& msg)
return; return;
} }
//evaluate the index of the file to process in fFileNames
//(we actually don't need the parameter if code == kProcTree)
unsigned nProcessed = 0;
//retrieve the total number of entries ranges processed so far by TPool //retrieve the total number of entries ranges processed so far by TPool
nProcessed = ReadBuffer<unsigned>(msg.second.get()); nProcessed = ReadBuffer<unsigned>(msg.second.get());
...@@ -483,7 +324,7 @@ void TMPWorkerTreeSel::Process(unsigned int code, MPCodeBufPair& msg) ...@@ -483,7 +324,7 @@ void TMPWorkerTreeSel::Process(unsigned int code, MPCodeBufPair& msg)
} }
// Prepare to setup the cache, if required // Prepare to setup the cache, if required
setupcache = (tree != fTree) : true : false; setupcache = (tree != fTree) ? true : false;
// Store as reference // Store as reference
fTree = tree; fTree = tree;
......
/// \file /// \file
/// \ingroup tutorial_multicore /// \ingroup tutorial_multicore
/// Illustrate the usage of the TTreeProcessor::Process method. /// Illustrate the usage of the TTreeProcessorMT::Process method.
/// Such method provides an implicit parallelisation of the reading and processing of a TTree. /// Such method provides an implicit parallelisation of the reading and processing of a TTree.
/// In particular, when invoking Process, the user provides a function that iterates on a subrange /// In particular, when invoking Process, the user provides a function that iterates on a subrange
/// of the tree via a TTreeReader. Multiple tasks will be spawned, one for each sub-range, so that /// of the tree via a TTreeReader. Multiple tasks will be spawned, one for each sub-range, so that
...@@ -28,9 +28,9 @@ int imt101_parTreeProcessing() ...@@ -28,9 +28,9 @@ int imt101_parTreeProcessing()
ROOT::TThreadedObject<TH1F> pzHist("pz_dist","p_{Z} Distribution;p_{Z};dN/dp_{Z}", 100, 0, 5); ROOT::TThreadedObject<TH1F> pzHist("pz_dist","p_{Z} Distribution;p_{Z};dN/dp_{Z}", 100, 0, 5);
ROOT::TThreadedObject<TH2F> pxpyHist("px_py","p_{X} vs p_{Y} Distribution;p_{X};p_{Y}", 100, -5., 5., 100, -5., 5.); ROOT::TThreadedObject<TH2F> pxpyHist("px_py","p_{X} vs p_{Y} Distribution;p_{X};p_{Y}", 100, -5., 5., 100, -5., 5.);
// Create a TTreeProcessor: specify the file and the tree in it // Create a TTreeProcessorMT: specify the file and the tree in it
ROOT::TTreeProcessor tp("http://root.cern.ch/files/tp_process_imt.root", ROOT::TTreeProcessorMT tp("http://root.cern.ch/files/tp_process_imt.root",
"events"); "events");
// Define the function that will process a subrange of the tree. // Define the function that will process a subrange of the tree.
// The function must receive only one parameter, a TTreeReader, // The function must receive only one parameter, a TTreeReader,
......
...@@ -44,10 +44,10 @@ Int_t mp102_readNtuplesFillHistosAndFit() ...@@ -44,10 +44,10 @@ Int_t mp102_readNtuplesFillHistosAndFit()
}; };
// Create the pool of processes // Create the pool of processes
ROOT::TProcessExecutor workers(nFiles); ROOT::TTreeProcessorMP workers(nFiles);
// Process the TChain // Process the TChain
auto sumHistogram = workers.ProcTree(inputChain, workItem, "multiCore"); auto sumHistogram = workers.Process(inputChain, workItem, "multiCore");
sumHistogram->Fit("gaus", 0); sumHistogram->Fit("gaus", 0);
return 0; return 0;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include "TTree.h" #include "TTree.h"
#include "TH1F.h" #include "TH1F.h"
#include "TTreeReader.h" #include "TTreeReader.h"
#include "ROOT/TProcessExecutor.hxx" #include "ROOT/TTreeProcessorMP.hxx"
const char *fh1[] = {"http://root.cern.ch/files/h1/dstarmb.root", const char *fh1[] = {"http://root.cern.ch/files/h1/dstarmb.root",
"http://root.cern.ch/files/h1/dstarp1a.root", "http://root.cern.ch/files/h1/dstarp1a.root",
...@@ -42,18 +42,18 @@ int mp103_processSelector(){ ...@@ -42,18 +42,18 @@ int mp103_processSelector(){
TTree *tree = (TTree *) fp->Get("h42"); TTree *tree = (TTree *) fp->Get("h42");
#endif #endif
ROOT::TProcessExecutor pool(3); ROOT::TTreeProcessorMP pool(3);
TList* out = 0; TList* out = 0;
#if defined(__reproduce_davix) #if defined(__reproduce_davix)
//TProcessExecutor::Process with a single tree //TTreeProcessorMP::Process with a single tree
out = pool.ProcTree(*tree, *sel);; out = pool.Process(*tree, *sel);;
sel->GetOutputList()->Delete(); sel->GetOutputList()->Delete();
#endif #endif
//TProcessExecutor::Process with single file name and tree name //TTreeProcessorMP::Process with single file name and tree name
//Note: we have less files than workers here //Note: we have less files than workers here
out = pool.ProcTree(fh1[0], *sel, "h1"); out = pool.Process(fh1[0], *sel, "h42");
sel->GetOutputList()->Delete(); sel->GetOutputList()->Delete();
// Prepare datasets: vector of files, TFileCollection // Prepare datasets: vector of files, TFileCollection
...@@ -64,13 +64,13 @@ int mp103_processSelector(){ ...@@ -64,13 +64,13 @@ int mp103_processSelector(){
fc.Add(new TFileInfo(fh1[i])); fc.Add(new TFileInfo(fh1[i]));
} }
//TProcessExecutor::Process with vector of files and tree name //TTreeProcessorMP::Process with vector of files and tree name
//Note: we have more files than workers here (different behaviour) //Note: we have more files than workers here (different behaviour)
out = pool.ProcTree(files, *sel, "h1"); out = pool.Process(files, *sel, "h42");
sel->GetOutputList()->Delete(); sel->GetOutputList()->Delete();
//TProcessExecutor::Process with TFileCollection, no tree name //TTreeProcessorMP::Process with TFileCollection, no tree name
out = pool.ProcTree(fc, *sel); out = pool.Process(fc, *sel);
sel->GetOutputList()->Delete(); sel->GetOutputList()->Delete();
return 0; return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment