Skip to content
Snippets Groups Projects
Commit 0dfecd1f authored by Enrico Guiraud's avatar Enrico Guiraud Committed by Danilo Piparo
Browse files

[TDF] Fix Snapshot not handling multiple input files correctly

Once per Snapshot call, each thread creates a TBufferMergerFile.
Once per TBB task (lifetime of an input TChain) each thread creates
a new output TTree, assigns branches to it and adds it as a clone
to the input TChain.
parent 3175f3e3
Branches
Tags
No related merge requests found
...@@ -416,22 +416,23 @@ extern template void MeanHelper::Exec(unsigned int, const std::vector<char> &); ...@@ -416,22 +416,23 @@ extern template void MeanHelper::Exec(unsigned int, const std::vector<char> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<int> &); extern template void MeanHelper::Exec(unsigned int, const std::vector<int> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<unsigned int> &); extern template void MeanHelper::Exec(unsigned int, const std::vector<unsigned int> &);
template <typename F> template <typename F1, typename F2>
class SnapshotHelper { class SnapshotHelper {
F fCallable; F1 fInitFunc;
F2 fExecFunc;
public: public:
using BranchTypes_t = typename TRemoveFirst<typename TFunctionTraits<F>::Args_t>::Types_t; using BranchTypes_t = typename TRemoveFirst<typename TFunctionTraits<F2>::Args_t>::Types_t;
SnapshotHelper(F &&f) : fCallable(f) {} SnapshotHelper(F1 &&f1, F2 &&f2) : fInitFunc(f1), fExecFunc(f2) {}
void Init(TTreeReader*) { /* TODO */ } void Init(TTreeReader *r, unsigned int slot) { fInitFunc(r, slot); }
template <typename... Args> template <typename... Args>
void Exec(unsigned int slot, Args &&... args) void Exec(unsigned int slot, Args &&... args)
{ {
// check that the decayed types of Args are the same as the branch types // check that the decayed types of Args are the same as the branch types
static_assert(std::is_same<TTypeList<typename std::decay<Args>::type...>, BranchTypes_t>::value, ""); static_assert(std::is_same<TTypeList<typename std::decay<Args>::type...>, BranchTypes_t>::value, "");
fCallable(slot, std::forward<Args>(args)...); fExecFunc(slot, std::forward<Args>(args)...);
} }
void Finalize() { /* noop */} void Finalize() { /* noop */}
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "TProfile2D.h" // For Histo actions #include "TProfile2D.h" // For Histo actions
#include "TRegexp.h" #include "TRegexp.h"
#include "TROOT.h" // IsImplicitMTEnabled #include "TROOT.h" // IsImplicitMTEnabled
#include "TTreeReader.h"
#include <initializer_list> #include <initializer_list>
#include <memory> #include <memory>
...@@ -1090,6 +1091,7 @@ protected: ...@@ -1090,6 +1091,7 @@ protected:
TTree t(treenameInt.c_str(), treenameInt.c_str()); TTree t(treenameInt.c_str(), treenameInt.c_str());
bool FirstEvent = true; bool FirstEvent = true;
// TODO move fillTree and initLambda to SnapshotHelper's body
auto fillTree = [&t, &bnames, &FirstEvent](unsigned int /* unused */, Args &... args) { auto fillTree = [&t, &bnames, &FirstEvent](unsigned int /* unused */, Args &... args) {
if (FirstEvent) { if (FirstEvent) {
// hack to call TTree::Branch on all variadic template arguments // hack to call TTree::Branch on all variadic template arguments
...@@ -1100,9 +1102,17 @@ protected: ...@@ -1100,9 +1102,17 @@ protected:
t.Fill(); t.Fill();
}; };
using Op_t = TDFInternal::SnapshotHelper<decltype(fillTree)>; auto initLambda = [&t] (TTreeReader *r, unsigned int slot) {
if(r) {
// not an empty-source TDF
auto tree = r->GetTree();
tree->AddClone(&t);
}
};
using Op_t = TDFInternal::SnapshotHelper<decltype(initLambda), decltype(fillTree)>;
using DFA_t = TDFInternal::TAction<Op_t, Proxied>; using DFA_t = TDFInternal::TAction<Op_t, Proxied>;
df->Book(std::make_shared<DFA_t>(Op_t(std::move(fillTree)), bnames, *fProxiedPtr)); df->Book(std::make_shared<DFA_t>(Op_t(std::move(initLambda), std::move(fillTree)), bnames, *fProxiedPtr));
fProxiedPtr->IncrChildrenCount(); fProxiedPtr->IncrChildrenCount();
df->Run(); df->Run();
t.Write(); t.Write();
...@@ -1110,16 +1120,15 @@ protected: ...@@ -1110,16 +1120,15 @@ protected:
unsigned int nSlots = df->GetNSlots(); unsigned int nSlots = df->GetNSlots();
TBufferMerger merger(filenameInt.c_str(), "RECREATE"); TBufferMerger merger(filenameInt.c_str(), "RECREATE");
std::vector<std::shared_ptr<TBufferMergerFile>> files(nSlots); std::vector<std::shared_ptr<TBufferMergerFile>> files(nSlots);
std::vector<TTree *> trees(nSlots); std::vector<TTree *> trees(nSlots); // ROOT owns/manages these TTrees
std::vector<int> isFirstEvent(nSlots, 1); // vector<bool> is evil
auto fillTree = [&](unsigned int slot, Args &... args) { auto fillTree = [&](unsigned int slot, Args &... args) {
if (!trees[slot]) { if (isFirstEvent[slot]) {
files[slot] = merger.GetFile();
trees[slot] = new TTree(treenameInt.c_str(), treenameInt.c_str());
trees[slot]->ResetBit(kMustCleanup);
// hack to call TTree::Branch on all variadic template arguments // hack to call TTree::Branch on all variadic template arguments
std::initializer_list<int> expander = {(trees[slot]->Branch(bnames[S].c_str(), &args), 0)..., 0}; std::initializer_list<int> expander = {(trees[slot]->Branch(bnames[S].c_str(), &args), 0)..., 0};
(void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9 (void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
isFirstEvent[slot] = 0;
} }
trees[slot]->Fill(); trees[slot]->Fill();
auto entries = trees[slot]->GetEntries(); auto entries = trees[slot]->GetEntries();
...@@ -1127,9 +1136,27 @@ protected: ...@@ -1127,9 +1136,27 @@ protected:
if ((autoflush > 0) && (entries % autoflush == 0)) files[slot]->Write(); if ((autoflush > 0) && (entries % autoflush == 0)) files[slot]->Write();
}; };
using Op_t = TDFInternal::SnapshotHelper<decltype(fillTree)>; // called at the beginning of each task
auto initLambda = [&trees, &merger, &files, &treenameInt, &isFirstEvent] (TTreeReader *r, unsigned int slot) {
if(!trees[slot]) {
// first time this thread executes something, let's create a TBufferMerger output directory
files[slot] = merger.GetFile();
} else {
files[slot]->Write();
}
trees[slot] = new TTree(treenameInt.c_str(), treenameInt.c_str());
trees[slot]->ResetBit(kMustCleanup);
if(r) {
// not an empty-source TDF
auto tree = r->GetTree();
tree->AddClone(trees[slot]);
}
isFirstEvent[slot] = 1;
};
using Op_t = TDFInternal::SnapshotHelper<decltype(initLambda), decltype(fillTree)>;
using DFA_t = TDFInternal::TAction<Op_t, Proxied>; using DFA_t = TDFInternal::TAction<Op_t, Proxied>;
df->Book(std::make_shared<DFA_t>(Op_t(std::move(fillTree)), bnames, *fProxiedPtr)); df->Book(std::make_shared<DFA_t>(Op_t(std::move(initLambda), std::move(fillTree)), bnames, *fProxiedPtr));
fProxiedPtr->IncrChildrenCount(); fProxiedPtr->IncrChildrenCount();
df->Run(); df->Run();
for (auto &&file : files) file->Write(); for (auto &&file : files) file->Write();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment