Skip to content
Snippets Groups Projects
Commit 506ee8a1 authored by Danilo Piparo's avatar Danilo Piparo
Browse files

TThreadedObject: allow to use thread unsafe objects from different threads

o CMakeLists modify to take into account the new files
o Factorise the finding of the thread index and the TThreadedObject class
  - TBB will be easily pluggable if required
o Tutorial which shows how to fill histograms in parallel from multiple threads
  - "Symmetric" to mp201 where the TMultiProc class is used
parent 2c98db3d
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,8 @@ ...@@ -4,7 +4,8 @@
set(headers TCondition.h TConditionImp.h TMutex.h TMutexImp.h set(headers TCondition.h TConditionImp.h TMutex.h TMutexImp.h
TRWLock.h TSemaphore.h TThread.h TThreadFactory.h TRWLock.h TSemaphore.h TThread.h TThreadFactory.h
TThreadImp.h TAtomicCount.h TThreadPool.h ThreadLocalStorage.h) TThreadImp.h TAtomicCount.h TThreadedObject.h
TThreadPool.h ThreadIndex.h ThreadLocalStorage.h)
if(NOT WIN32) if(NOT WIN32)
set(headers ${headers} TPosixCondition.h TPosixMutex.h set(headers ${headers} TPosixCondition.h TPosixMutex.h
TPosixThread.h TPosixThreadFactory.h PosixThreadInc.h) TPosixThread.h TPosixThreadFactory.h PosixThreadInc.h)
......
#ifndef ROOT_TTHREADEDOBJECT
#define ROOT_TTHREADEDOBJECT
#include <vector>
#include <functional>
#include <memory>
#ifndef ROOT_ThreadIndex
#include "ThreadIndex.h"
#endif
#ifndef ROOT_TList
#include "TList.h"
#endif
#ifndef ROOT_TError
#include "TError.h"
#endif
namespace ROOT {
namespace Internal {
namespace TThreadedObjectUtils {
/// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
struct Cloner {
static T *Clone(const T &obj) {
return new T(obj);
}
};
template<class T>
struct Cloner<T, false> {
static T *Clone(const T &obj) {
return (T*)obj.Clone();
}
};
} // End of namespace TThreadedObjectUtils
} // End of namespace Internals
namespace TThreadedObjectUtils {
template<class T>
using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
/// Merge TObjects
template<class T>
void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
{
if (!target) return;
TList objTList;
// Cannot do better than this
for (auto obj : objs) {
if (obj && obj != target) objTList.Add(obj.get());
}
target->Merge(&objTList);
}
} // end of namespace TThreadedObjectUtils
/**
\class ROOT::TThreadedObject
\brief A wrapper to make object instances thread private, lazily.
\tparam T Class of the object to be made thread private (e.g. TH1F)
\ingroup Multicore
A wrapper which makes objects thread private. The methods of the underlying
object can be invoked via the the arrow operator. The object is created in
a specific thread lazily, i.e. upon invocation of one of its methods.
*/
template<class T>
class TThreadedObject {
public:
/// Construct the TThreaded object and the "model" of the thread private
/// objects.
template<class ...ARGS>
TThreadedObject(ARGS... args):
fModel(std::forward<ARGS>(args)...), fObjPointers(fMaxSlots, nullptr) {};
/// Access a particular slot which corresponds to a single thread.
std::shared_ptr<T> GetAtSlot(unsigned i)
{
if ( i >= fMaxSlots) {
Warning("TThreadedObject::Merge", "Maximum number of slots reached.");
return nullptr;
}
auto objPointer = fObjPointers[i];
if (!objPointer) {
objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel));
fObjPointers[i] = objPointer;
}
return objPointer;
}
/// Access the wrapped object and allow to call its methods
T *operator->()
{
return GetAtSlot(fThrIndexer.GetThreadIndex()).get();
}
/// Merge all the thread private objects. Can be called once: it does not
/// create any new object but destroys the present bookkeping.
std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
{
// We do not return if we already merged.
if (fIsMerged) {
Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
return fObjPointers[0];
}
mergeFunction(fObjPointers[0], fObjPointers);
fIsMerged = true;
return fObjPointers[0];
}
/// Merge all the thread private objects. Can be called many times. It
/// does create a new instance of class T to represent the "Sum" object.
/// This method is not thread safe: correct or acceptable behaviours
/// depend on the nature of T and of the merging function.
std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
{
if (fIsMerged) {
Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(*fObjPointers[0].get()));
}
auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel);
std::shared_ptr<T> targetPtrShared (targetPtr, [](T*){});
mergeFunction(targetPtrShared, fObjPointers);
return std::unique_ptr<T>(targetPtr);
}
private:
static const unsigned fMaxSlots = 128;
const T fModel; ///< Use to store a "model" of the object
std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
ROOT::Internal::ThreadIndexer fThrIndexer; ///< Get the slot index for the threads
bool fIsMerged = false; ///< Remember if the objects have been merged already
};
////////////////////////////////////////////////////////////////////////////////
/// Obtain a TThreadedObject instance
/// \tparam T Class of the object to be made thread private (e.g. TH1F)
/// \tparam ARGS Arguments of the constructor
template<class T, class ...ARGS>
TThreadedObject<T> MakeThreaded(ARGS &&... args)
{
return TThreadedObject<T>(std::forward<ARGS>(args)...);
}
} // End ROOT namespace
#endif
#ifndef ROOT_ThreadIndex
#define ROOT_ThreadIndex
#ifndef ROOT_ThreadLocalStorage
#include "ThreadLocalStorage.h"
#endif
#include <map>
#include <mutex>
#include <thread>
#include <climits>
namespace ROOT {
namespace Internal {
/// Get the index of the current thread
class ThreadIndexer {
private:
unsigned fThreadIndex = 0U;
std::mutex *fThreadIndexerMutexPtr = new std::mutex();
std::map<std::thread::id, unsigned> fIDMap;
public:
unsigned GetThreadIndex() {
TTHREAD_TLS_DECL_ARG(unsigned, gThisThreadIndex, UINT_MAX);
std::lock_guard<std::mutex> guard(*fThreadIndexerMutexPtr);
if (UINT_MAX != gThisThreadIndex) {
return gThisThreadIndex;
}
const auto id = std::this_thread::get_id();
auto keyValIt = fIDMap.find(id);
if (keyValIt != fIDMap.end()) {
gThisThreadIndex = keyValIt->second;
return gThisThreadIndex;
}
gThisThreadIndex = fThreadIndex++;
fIDMap.emplace(std::make_pair(id, gThisThreadIndex));
return gThisThreadIndex;
}
};
}
}
#endif
#include "ThreadIndex.h"
#ifndef ROOT_ThreadLocalStorage
#include "ThreadLocalStorage.h"
#endif
namespace ROOT {
namespace Internal {
// A small generator of thread indices.
// If not already available in the local storage, a map is queried for it
// and the cache is built.
unsigned ThreadIndexer::GetThreadIndex()
{
TTHREAD_TLS_DECL_ARG(unsigned, gThisThreadIndex, UINT_MAX);
std::lock_guard<std::mutex> guard(*fThreadIndexerMutexPtr);
if (UINT_MAX != gThisThreadIndex) {
return gThisThreadIndex;
}
const auto id = std::this_thread::get_id();
auto keyValIt = fIDMap.find(id);
if (keyValIt != fIDMap.end()) {
gThisThreadIndex = keyValIt->second;
return gThisThreadIndex;
}
gThisThreadIndex = fThreadIndex++;
fIDMap.emplace(std::make_pair(id, gThisThreadIndex));
return gThisThreadIndex;
}
}
}
/// \file
/// \ingroup tutorial_multicore
/// Parallel fill of a histogram
/// This tutorial shows how a histogram can be filled in parallel
/// with a multithreaded approach. The difference with the multiprocess case,
/// see mp201, is that here we cannot count on the copy-on-write mechanism, but
/// we rather need to protect the histogram resource with a TThreadedObject
/// class. The result of the filling is monitored with the *SnapshotMerge*
/// method. This method is not thread safe: in presence of ROOT histograms, the
/// system will not crash but the result is not uniquely defined.
///
/// \macro_image
/// \macro_code
/// \author Danilo Piparo
// Measure time in a scope
class TimerRAII {
TStopwatch fTimer;
std::string fMeta;
public:
TimerRAII(const char *meta): fMeta(meta) {
fTimer.Start();
}
~TimerRAII() {
fTimer.Stop();
std::cout << fMeta << " - real time elapsed " << fTimer.RealTime() << "s" << std::endl;
}
};
#include <chrono>
Int_t mt201_parallelHistoFill(UInt_t poolSize = 4)
{
TH1::AddDirectory(false);
// The concrete histogram instances are concretely created in each thread
// lazily, i.e. only if a method is invoked.
auto ts_h = ROOT::MakeThreaded<TH1F>("myHist", "Filled in parallel", 128, -8, 8);
// The function used to fill the histograms in each thread.
auto fillRandomHisto = [&](int seed = 0) {
TRandom3 rndm(seed);
for (auto i : ROOT::TSeqI(1000000)) {
ts_h->Fill(rndm.Gaus(0, 1));
}
};
// The seeds for the random number generators.
auto seeds = ROOT::TSeqI(1, poolSize+1);
std::vector<std::thread> pool;
TimerRAII timer("Filling Histogram in parallel and drawing it.");
// A monitoring thread. This is here only to illustrate the functionality of
// the SnapshotMerge method.
// It allows "to spy" the multithreaded calculation without the need
// of interrupting it.
auto monitor = [&]() {
for (auto i : ROOT::TSeqI(5)) {
std::this_thread::sleep_for(std::chrono::duration<double, std::nano>(500));
auto h = ts_h.SnapshotMerge();
std::cout << "Entries for the snapshot " << h->GetEntries() << std::endl;
}
};
pool.emplace_back(monitor);
// The threads filling the histograms
for (auto seed : ROOT::TSeqI(seeds)) {
pool.emplace_back(fillRandomHisto, seed);
}
// Wait for the threads to finish
for (auto && t : pool) t.join();
// Merge the final result
auto sumRandomHisto = ts_h.Merge();
auto c = new TCanvas();
sumRandomHisto->DrawClone();
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment