From 0dfecd1fe5fe1ea764816ba22cdd7ed647e335d9 Mon Sep 17 00:00:00 2001
From: Enrico Guiraud <enrico.guiraud@cern.ch>
Date: Fri, 2 Jun 2017 17:31:44 +0200
Subject: [PATCH] [TDF] Fix Snapshot not handling multiple input files
 correctly

Once per Snapshot call, each thread creates a TBufferMergerFile.
Once per TBB task (lifetime of an input TChain) each thread creates
a new output TTree, assigns branches to it and adds it as a clone
to the input TChain.
---
 tree/treeplayer/inc/ROOT/TDFActionHelpers.hxx | 13 +++---
 tree/treeplayer/inc/ROOT/TDFInterface.hxx     | 45 +++++++++++++++----
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/tree/treeplayer/inc/ROOT/TDFActionHelpers.hxx b/tree/treeplayer/inc/ROOT/TDFActionHelpers.hxx
index 441bb6b514c..ee1b18ab48c 100644
--- a/tree/treeplayer/inc/ROOT/TDFActionHelpers.hxx
+++ b/tree/treeplayer/inc/ROOT/TDFActionHelpers.hxx
@@ -416,22 +416,23 @@ extern template void MeanHelper::Exec(unsigned int, const std::vector<char> &);
 extern template void MeanHelper::Exec(unsigned int, const std::vector<int> &);
 extern template void MeanHelper::Exec(unsigned int, const std::vector<unsigned int> &);
 
-template <typename F>
+template <typename F1, typename F2>
 class SnapshotHelper {
-   F fCallable;
+   F1 fInitFunc;
+   F2 fExecFunc;
 
 public:
-   using BranchTypes_t = typename TRemoveFirst<typename TFunctionTraits<F>::Args_t>::Types_t;
-   SnapshotHelper(F &&f) : fCallable(f) {}
+   using BranchTypes_t = typename TRemoveFirst<typename TFunctionTraits<F2>::Args_t>::Types_t;
+   SnapshotHelper(F1 &&f1, F2 &&f2) : fInitFunc(f1), fExecFunc(f2) {}
 
-   void Init(TTreeReader*) { /* TODO */ }
+   void Init(TTreeReader *r, unsigned int slot) { fInitFunc(r, slot); }
 
    template <typename... Args>
    void Exec(unsigned int slot, Args &&... args)
    {
       // check that the decayed types of Args are the same as the branch types
       static_assert(std::is_same<TTypeList<typename std::decay<Args>::type...>, BranchTypes_t>::value, "");
-      fCallable(slot, std::forward<Args>(args)...);
+      fExecFunc(slot, std::forward<Args>(args)...);
    }
 
    void Finalize() { /* noop */}
diff --git a/tree/treeplayer/inc/ROOT/TDFInterface.hxx b/tree/treeplayer/inc/ROOT/TDFInterface.hxx
index 35179448174..0060aede8ad 100644
--- a/tree/treeplayer/inc/ROOT/TDFInterface.hxx
+++ b/tree/treeplayer/inc/ROOT/TDFInterface.hxx
@@ -26,6 +26,7 @@
 #include "TProfile2D.h" // For Histo actions
 #include "TRegexp.h"
 #include "TROOT.h" // IsImplicitMTEnabled
+#include "TTreeReader.h"
 
 #include <initializer_list>
 #include <memory>
@@ -1090,6 +1091,7 @@ protected:
          TTree t(treenameInt.c_str(), treenameInt.c_str());
 
          bool FirstEvent = true;
+         // TODO move fillTree and initLambda to SnapshotHelper's body
          auto fillTree = [&t, &bnames, &FirstEvent](unsigned int /* unused */, Args &... args) {
             if (FirstEvent) {
                // hack to call TTree::Branch on all variadic template arguments
@@ -1100,9 +1102,17 @@ protected:
             t.Fill();
          };
 
-         using Op_t = TDFInternal::SnapshotHelper<decltype(fillTree)>;
+         auto initLambda = [&t] (TTreeReader *r, unsigned int slot) {
+            if(r) {
+               // not an empty-source TDF
+               auto tree = r->GetTree();
+               tree->AddClone(&t);
+            }
+         };
+
+         using Op_t = TDFInternal::SnapshotHelper<decltype(initLambda), decltype(fillTree)>;
          using DFA_t = TDFInternal::TAction<Op_t, Proxied>;
-         df->Book(std::make_shared<DFA_t>(Op_t(std::move(fillTree)), bnames, *fProxiedPtr));
+         df->Book(std::make_shared<DFA_t>(Op_t(std::move(initLambda), std::move(fillTree)), bnames, *fProxiedPtr));
          fProxiedPtr->IncrChildrenCount();
          df->Run();
          t.Write();
@@ -1110,16 +1120,15 @@ protected:
          unsigned int nSlots = df->GetNSlots();
          TBufferMerger merger(filenameInt.c_str(), "RECREATE");
          std::vector<std::shared_ptr<TBufferMergerFile>> files(nSlots);
-         std::vector<TTree *> trees(nSlots);
+         std::vector<TTree *> trees(nSlots); // ROOT owns/manages these TTrees
+         std::vector<int> isFirstEvent(nSlots, 1); // vector<bool> is evil
 
          auto fillTree = [&](unsigned int slot, Args &... args) {
-            if (!trees[slot]) {
-               files[slot] = merger.GetFile();
-               trees[slot] = new TTree(treenameInt.c_str(), treenameInt.c_str());
-               trees[slot]->ResetBit(kMustCleanup);
+            if (isFirstEvent[slot]) {
                // hack to call TTree::Branch on all variadic template arguments
                std::initializer_list<int> expander = {(trees[slot]->Branch(bnames[S].c_str(), &args), 0)..., 0};
                (void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
+               isFirstEvent[slot] = 0;
             }
             trees[slot]->Fill();
             auto entries = trees[slot]->GetEntries();
@@ -1127,9 +1136,27 @@ protected:
             if ((autoflush > 0) && (entries % autoflush == 0)) files[slot]->Write();
          };
 
-         using Op_t = TDFInternal::SnapshotHelper<decltype(fillTree)>;
+         // called at the beginning of each task
+         auto initLambda = [&trees, &merger, &files, &treenameInt, &isFirstEvent] (TTreeReader *r, unsigned int slot) {
+            if(!trees[slot]) {
+               // first time this thread executes something, let's create a TBufferMerger output directory
+               files[slot] = merger.GetFile();
+            } else {
+               files[slot]->Write();
+            }
+            trees[slot] = new TTree(treenameInt.c_str(), treenameInt.c_str());
+            trees[slot]->ResetBit(kMustCleanup);
+            if(r) {
+               // not an empty-source TDF
+               auto tree = r->GetTree();
+               tree->AddClone(trees[slot]);
+            }
+            isFirstEvent[slot] = 1;
+         };
+
+         using Op_t = TDFInternal::SnapshotHelper<decltype(initLambda), decltype(fillTree)>;
          using DFA_t = TDFInternal::TAction<Op_t, Proxied>;
-         df->Book(std::make_shared<DFA_t>(Op_t(std::move(fillTree)), bnames, *fProxiedPtr));
+         df->Book(std::make_shared<DFA_t>(Op_t(std::move(initLambda), std::move(fillTree)), bnames, *fProxiedPtr));
          fProxiedPtr->IncrChildrenCount();
          df->Run();
          for (auto &&file : files) file->Write();
-- 
GitLab