From e1d0fce9450479dc1af872794f5e3c8664a705a4 Mon Sep 17 00:00:00 2001
From: Enrico Guiraud <enrico.guiraud@cern.ch>
Date: Fri, 4 Aug 2017 17:31:28 +0200
Subject: [PATCH] [TDF] Let TColumnValue support interleaved task execution

Each TDF node keeps a thread-local tuple of TColumnValues. TColumnValues
read the values of a column for a certain entry, but entry ranges and
TTreeReader-related objects are _task_ local.

Before this change, in case of interleaved task execution, the second
task starting in a thread would re-initialize the TColumnValues and
 overwrite the previous task's settings.

Now TColumnValues deal with multiple initializations by keeping their
internal state in a stack: when a new task initializes a TColumnValue
it pushes the required values in the stack, when a task completes
execution it pops its values from the stack.

Note that interleaved TBB tasks always follow a last-to-start,
first-to-finish pattern, which allows us to use a LIFO structure for
TColumnValue.
---
 tree/treeplayer/inc/ROOT/TDFNodes.hxx | 64 ++++++++++++++++-----------
 1 file changed, 37 insertions(+), 27 deletions(-)

diff --git a/tree/treeplayer/inc/ROOT/TDFNodes.hxx b/tree/treeplayer/inc/ROOT/TDFNodes.hxx
index 011e7e0e85d..7a2e6bff9e8 100644
--- a/tree/treeplayer/inc/ROOT/TDFNodes.hxx
+++ b/tree/treeplayer/inc/ROOT/TDFNodes.hxx
@@ -149,14 +149,21 @@ class TColumnValue {
    // ReaderValueOrArray_t is a TTreeReaderValue<T> unless T is array_view<U>
    using ProxyParam_t = typename std::conditional<std::is_same<ReaderValueOrArray_t<T>, TTreeReaderValue<T>>::value, T,
                                                   TakeFirstParameter_t<T>>::type;
-   std::unique_ptr<TTreeReaderValue<T>> fReaderValue{nullptr}; //< Owning ptr to a TTreeReaderValue. Used for
-                                                               /// non-temporary columns and T != std::array_view<U>
-   std::unique_ptr<TTreeReaderArray<ProxyParam_t>> fReaderArray{nullptr}; //< Owning ptr to a TTreeReaderArray. Used for
-                                                                          /// non-temporary columsn and
-                                                                          /// T == std::array_view<U>.
-   T *fValuePtr{nullptr};                  //< Non-owning ptr to the value of a temporary column.
-   TCustomColumnBase *fTmpColumn{nullptr}; //< Non-owning ptr to the node responsible for the temporary column.
-   unsigned int fSlot{0}; //< The slot this value belongs to. Only used for temporary columns, not for real branches.
+
+   // Each element of the following data members will be in use by a _single task_.
+   // The vectors are used as very small stacks (1-2 elements typically) that fill in case of interleaved task execution
+   // i.e. when more than one task needs readers in this worker thread.
+
+   /// Owning ptrs to a TTreeReaderValue. Used for non-temporary columns when T != std::array_view<U>
+   std::vector<std::unique_ptr<TTreeReaderValue<T>>> fReaderValues;
+   /// Owning ptrs to a TTreeReaderArray. Used for non-temporary columns when T == std::array_view<U>.
+   std::vector<std::unique_ptr<TTreeReaderArray<ProxyParam_t>>> fReaderArrays;
+   /// Non-owning ptrs to the value of a custom column.
+   std::vector<T *> fCustomValuePtrs;
+   /// Non-owning ptrs to the node responsible for the custom column. Needed when querying custom values.
+   std::vector<TCustomColumnBase *> fCustomColumns;
+   /// The slot this value belongs to. Needed when querying custom column values.
+   unsigned int fSlot;
 
 public:
    TColumnValue() = default;
@@ -167,9 +174,9 @@ public:
    {
       bool useReaderValue = std::is_same<ProxyParam_t, T>::value;
       if (useReaderValue)
-         fReaderValue.reset(new TTreeReaderValue<T>(*r, bn.c_str()));
+         fReaderValues.emplace_back(new TTreeReaderValue<T>(*r, bn.c_str()));
       else
-         fReaderArray.reset(new TTreeReaderArray<ProxyParam_t>(*r, bn.c_str()));
+         fReaderArrays.emplace_back(new TTreeReaderArray<ProxyParam_t>(*r, bn.c_str()));
    }
 
    template <typename U = T,
@@ -179,25 +186,28 @@ public:
    template <typename U = T, typename std::enable_if<!std::is_same<ProxyParam_t, U>::value, int>::type = 0>
    std::array_view<ProxyParam_t> Get(Long64_t)
    {
-      auto &readerArray = *fReaderArray;
+      auto &readerArray = *fReaderArrays.back();
       if (readerArray.GetSize() > 1 && 1 != (&readerArray[1] - &readerArray[0])) {
          std::string exceptionText = "Branch ";
-         exceptionText += fReaderArray->GetBranchName();
+         exceptionText += readerArray.GetBranchName();
          exceptionText += " hangs from a non-split branch. For this reason, it cannot be accessed via an array_view."
                           " Please read the top level branch instead.";
          throw std::runtime_error(exceptionText);
       }
 
-      return std::array_view<ProxyParam_t>(fReaderArray->begin(), fReaderArray->end());
+      return std::array_view<ProxyParam_t>(readerArray.begin(), readerArray.end());
    }
 
    void Reset()
    {
-      fReaderValue = nullptr;
-      fReaderArray = nullptr;
-      fValuePtr = nullptr;
-      fTmpColumn = nullptr;
-      fSlot = 0;
+      if (!fReaderValues.empty()) // we must be using TTreeReaderValues
+         fReaderValues.pop_back();
+      else if (!fReaderArrays.empty()) // we must be using TTreeReaderArrays
+         fReaderArrays.pop_back();
+      else { // we must be using a custom column
+         fCustomValuePtrs.pop_back();
+         fCustomColumns.pop_back();
+      }
    }
 };
 
@@ -666,13 +676,13 @@ public:
 // method implementations
 template <typename T>
 void ROOT::Internal::TDF::TColumnValue<T>::SetTmpColumn(unsigned int slot,
-                                                        ROOT::Detail::TDF::TCustomColumnBase *tmpColumn)
+                                                        ROOT::Detail::TDF::TCustomColumnBase *customColumn)
 {
-   fTmpColumn = tmpColumn;
-   if (tmpColumn->GetTypeId() != typeid(T))
+   fCustomColumns.emplace_back(customColumn);
+   if (customColumn->GetTypeId() != typeid(T))
       throw std::runtime_error(std::string("TColumnValue: type specified is ") + typeid(T).name() +
-                               " but temporary column has type " + tmpColumn->GetTypeId().name());
-   fValuePtr = static_cast<T *>(tmpColumn->GetValuePtr(slot));
+                               " but temporary column has type " + customColumn->GetTypeId().name());
+   fCustomValuePtrs.emplace_back(static_cast<T *>(customColumn->GetValuePtr(slot)));
    fSlot = slot;
 }
 
@@ -686,11 +696,11 @@ template <typename U,
                                   int>::type>
 T &ROOT::Internal::TDF::TColumnValue<T>::Get(Long64_t entry)
 {
-   if (fReaderValue) {
-      return *(fReaderValue->Get());
+   if (!fReaderValues.empty()) {
+      return *(fReaderValues.back()->Get());
    } else {
-      fTmpColumn->Update(fSlot, entry);
-      return *fValuePtr;
+      fCustomColumns.back()->Update(fSlot, entry);
+      return *fCustomValuePtrs.back();
    }
 }
 
-- 
GitLab