From 7ed06c6a166ef7c57a8d5a165c2347e6f168d01a Mon Sep 17 00:00:00 2001
From: Danilo Piparo <danilo.piparo@cern.ch>
Date: Fri, 12 Feb 2016 16:47:11 +0100
Subject: [PATCH] Upgrade and refactor the TThreadedObject class and tutorial

o Properly put in the ROOT directory the TThreadedObject header
o Remove the ThreadIndex header as not necessary anymore
o Refactor and extensively document the TThreadedObject class
  - Better handling of size of slot array
  - Correct handling of pointers
  - Add the Get and GetAtSlotUnchecked methods
  - Move the assigning of slot index from an external helper to the class itself
o Update the tutorial and its documentation
---
 core/thread/CMakeLists.txt                    |   4 +-
 core/thread/inc/{ => ROOT}/TThreadedObject.h  | 117 +++++++++++++-----
 core/thread/inc/ThreadIndex.h                 |  49 --------
 tutorials/multicore/mt201_parallelHistoFill.C |   9 +-
 4 files changed, 95 insertions(+), 84 deletions(-)
 rename core/thread/inc/{ => ROOT}/TThreadedObject.h (52%)
 delete mode 100644 core/thread/inc/ThreadIndex.h

diff --git a/core/thread/CMakeLists.txt b/core/thread/CMakeLists.txt
index abd61e280e2..ff1cd9cccc8 100644
--- a/core/thread/CMakeLists.txt
+++ b/core/thread/CMakeLists.txt
@@ -4,8 +4,8 @@
 
 set(headers TCondition.h TConditionImp.h TMutex.h TMutexImp.h
             TRWLock.h TSemaphore.h TThread.h TThreadFactory.h
-            TThreadImp.h TAtomicCount.h TThreadedObject.h
-            TThreadPool.h ThreadIndex.h ThreadLocalStorage.h)
+            TThreadImp.h TAtomicCount.h ROOT/TThreadedObject.h
+            TThreadPool.h ThreadLocalStorage.h)
 if(NOT WIN32)
   set(headers ${headers} TPosixCondition.h TPosixMutex.h
                          TPosixThread.h TPosixThreadFactory.h PosixThreadInc.h)
diff --git a/core/thread/inc/TThreadedObject.h b/core/thread/inc/ROOT/TThreadedObject.h
similarity index 52%
rename from core/thread/inc/TThreadedObject.h
rename to core/thread/inc/ROOT/TThreadedObject.h
index cf998f46ecb..8a77a326e30 100644
--- a/core/thread/inc/TThreadedObject.h
+++ b/core/thread/inc/ROOT/TThreadedObject.h
@@ -1,15 +1,6 @@
-// Author: Danilo Piparo, CERN  11/2/2015
-
-#ifndef ROOT_TTHREADEDOBJECT
-#define ROOT_TTHREADEDOBJECT
-
-#include <vector>
-#include <functional>
-#include <memory>
-
-#ifndef ROOT_ThreadIndex
-#include "ThreadIndex.h"
-#endif
+// Author: Danilo Piparo, CERN 11/2/2015
+#ifndef ROOT_TThreadedObject
+#define ROOT_TThreadedObject
 
 #ifndef ROOT_TList
 #include "TList.h"
@@ -19,6 +10,13 @@
 #include "TError.h"
 #endif
 
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
 namespace ROOT {
 
    namespace Internal {
@@ -62,28 +60,37 @@ namespace ROOT {
    } // end of namespace TThreadedObjectUtils
 
    /**
-    \class ROOT::TThreadedObject
-    \brief A wrapper to make object instances thread private, lazily.
-    \tparam T Class of the object to be made thread private (e.g. TH1F)
-    \ingroup Multicore
-
-    A wrapper which makes objects thread private. The methods of the underlying
-    object can be invoked via the the arrow operator. The object is created in
-    a specific thread lazily, i.e. upon invocation of one of its methods.
+    * \class ROOT::TThreadedObject
+    * \brief A wrapper to make object instances thread private, lazily.
+    * \tparam T Class of the object to be made thread private (e.g. TH1F)
+    * \ingroup Multicore
+    *
+    * A wrapper which makes objects thread private. The methods of the underlying
+    * object can be invoked via the the arrow operator. The object is created in
+    * a specific thread lazily, i.e. upon invocation of one of its methods.
+    * The correct object pointer from within a particular thread can be accessed
+    * with the overloaded arrow operator or with the Get method.
+    * In case an elaborate thread management is in place, e.g. in presence of
+    * stream of operations or "processing slots", it is also possible to
+    * manually select the correct object pointer explicitly.
     */
    template<class T>
    class TThreadedObject {
    public:
+      static unsigned gMaxSlots; ///< The maximum number of processing slots (distinct threads) which the instances can manage
       /// Construct the TThreaded object and the "model" of the thread private
       /// objects.
+      /// \tparam ARGS Arguments of the constructor of T
       template<class ...ARGS>
       TThreadedObject(ARGS... args):
-      fModel(std::forward<ARGS>(args)...), fObjPointers(fMaxSlots, nullptr)  {};
+      fModel(std::forward<ARGS>(args)...), fObjPointers(gMaxSlots, nullptr) {};
 
-      /// Access a particular slot which corresponds to a single thread.
+      /// Access a particular processing slot. This
+      /// method is *thread-unsafe*: it cannot be invoked from two different
+      /// threads with the same argument.
       std::shared_ptr<T> GetAtSlot(unsigned i)
       {
-         if ( i >= fMaxSlots) {
+         if ( i >= fObjPointers.size()) {
             Warning("TThreadedObject::Merge", "Maximum number of slots reached.");
             return nullptr;
          }
@@ -95,10 +102,39 @@ namespace ROOT {
          return objPointer;
       }
 
-      /// Access the wrapped object and allow to call its methods
+      /// Access a particular slot which corresponds to a single thread.
+      /// This is in general faster than the GetAtSlot method but it is
+      /// responsibility of the caller to make sure that an object is
+      /// initialised for the particular slot.
+      std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
+      {
+         return fObjPointers[i];
+      }
+
+      /// Access the pointer corresponding to the current slot. This method is
+      /// not adequate for being called inside tight loops as it implies a
+      /// lookup in a mapping between the threadIDs and the slot indices.
+      /// A good practice consists in copying the pointer onto the stack and
+      /// proceed with the loop as shown in this work item (psudo-code) which
+      /// will be sent to different threads:
+      /// ~~~{.cpp}
+      /// auto workItem = [](){
+      ///    auto objPtr = tthreadedObject.GetAtThisSlot();
+      ///    for (auto i : ROOT::TSeqI(1000)) {
+      ///       // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
+      ///       objPtr->FastMethod(i);
+      ///    }
+      /// }
+      /// ~~~
+      std::shared_ptr<T> Get()
+      {
+         return GetAtSlot(GetThisSlotNumber());
+      }
+
+      /// Access the wrapped object and allow to call its methods.
       T *operator->()
       {
-         return GetAtSlot(fThrIndexer.GetThreadIndex()).get();
+         return Get().get();
       }
 
       /// Merge all the thread private objects. Can be called once: it does not
@@ -127,20 +163,37 @@ namespace ROOT {
             return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(*fObjPointers[0].get()));
          }
          auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel);
-         std::shared_ptr<T> targetPtrShared (targetPtr, [](T*){});
+         std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
          mergeFunction(targetPtrShared, fObjPointers);
          return std::unique_ptr<T>(targetPtr);
       }
 
    private:
-      static const unsigned fMaxSlots = 128;
-      const T fModel;                               ///< Use to store a "model" of the object
-      std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
-      ROOT::Internal::ThreadIndexer fThrIndexer;    ///< Get the slot index for the threads
-      bool fIsMerged = false;                       ///< Remember if the objects have been merged already
-   };
+      const T fModel;                                    ///< Use to store a "model" of the object
+      std::vector<std::shared_ptr<T>> fObjPointers;      ///< A pointer per thread is kept.
+      std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
+      unsigned fCurrMaxSlotIndex = 0;                    ///< The maximum slot index
+      bool fIsMerged = false;                            ///< Remember if the objects have been merged already
+      std::mutex* fThrIDSlotMutex = new std::mutex;      ///< Mutex to protect the ID-slot map access
+
+      /// Get the slot number for this threadID.
+      unsigned GetThisSlotNumber()
+      {
+         const auto thisThreadID = std::this_thread::get_id();
+         unsigned thisIndex;
+         {
+            std::lock_guard<std::mutex> lg(*fThrIDSlotMutex);
+            auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
+            if (thisSlotNumIt != fThrIDSlotMap.end()) return thisSlotNumIt->second;
+            thisIndex = fCurrMaxSlotIndex++;
+            fThrIDSlotMap[thisThreadID] = thisIndex;
+         }
+         return thisIndex;
+      }
 
+   };
 
+   template<class T> unsigned TThreadedObject<T>::gMaxSlots = 64;
 
    ////////////////////////////////////////////////////////////////////////////////
    /// Obtain a TThreadedObject instance
diff --git a/core/thread/inc/ThreadIndex.h b/core/thread/inc/ThreadIndex.h
deleted file mode 100644
index ccf5e1940e2..00000000000
--- a/core/thread/inc/ThreadIndex.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#ifndef ROOT_ThreadIndex
-#define ROOT_ThreadIndex
-
-#ifndef ROOT_ThreadLocalStorage
-#include "ThreadLocalStorage.h"
-#endif
-
-#include <map>
-#include <mutex>
-#include <thread>
-#include <climits>
-
-namespace ROOT {
-
-   namespace Internal {
-
-      /// Get the index of the current thread
-      class ThreadIndexer {
-      private:
-         unsigned fThreadIndex = 0U;
-         std::mutex *fThreadIndexerMutexPtr = new std::mutex();
-         std::map<std::thread::id, unsigned> fIDMap;
-      public:
-         unsigned GetThreadIndex() {
-            TTHREAD_TLS_DECL_ARG(unsigned, gThisThreadIndex, UINT_MAX);
-            std::lock_guard<std::mutex> guard(*fThreadIndexerMutexPtr);
-
-            if (UINT_MAX != gThisThreadIndex) {
-               return gThisThreadIndex;
-            }
-
-            const auto id =  std::this_thread::get_id();
-
-            auto keyValIt = fIDMap.find(id);
-            if (keyValIt != fIDMap.end()) {
-               gThisThreadIndex = keyValIt->second;
-               return gThisThreadIndex;
-            }
-            gThisThreadIndex = fThreadIndex++;
-            fIDMap.emplace(std::make_pair(id, gThisThreadIndex));
-            return gThisThreadIndex;
-         }
-      };
-
-
-   }
-}
-
-#endif
diff --git a/tutorials/multicore/mt201_parallelHistoFill.C b/tutorials/multicore/mt201_parallelHistoFill.C
index 040a4a95f48..232de0e41a0 100644
--- a/tutorials/multicore/mt201_parallelHistoFill.C
+++ b/tutorials/multicore/mt201_parallelHistoFill.C
@@ -40,8 +40,15 @@ Int_t mt201_parallelHistoFill(UInt_t poolSize = 4)
    // The function used to fill the histograms in each thread.
    auto fillRandomHisto = [&](int seed = 0) {
       TRandom3 rndm(seed);
+      // IMPORTANT!
+      // It is important to realise that a copy on the stack of the object we
+      // would like to perform operations on is the most efficient way of
+      // accessing it, in particular in presence of a tight loop like the one
+      // below where any overhead put on top of the Fill function call would
+      // have an impact.
+      auto histogram = ts_h->Get()
       for (auto i : ROOT::TSeqI(1000000)) {
-         ts_h->Fill(rndm.Gaus(0, 1));
+         histogram->Fill(rndm.Gaus(0, 1));
       }
    };
 
-- 
GitLab