From b33c1f9a54c05871e2cf984116f41a26bef0408e Mon Sep 17 00:00:00 2001 From: Enrico Guiraud <enrico.guiraud@cern.ch> Date: Thu, 3 Aug 2017 13:59:04 +0200 Subject: [PATCH] [TDF] Let TDF support interleaved TBB task execution --- tree/treeplayer/src/TDFNodes.cxx | 55 ++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/tree/treeplayer/src/TDFNodes.cxx b/tree/treeplayer/src/TDFNodes.cxx index dd405b32958..cc68c74c0cc 100644 --- a/tree/treeplayer/src/TDFNodes.cxx +++ b/tree/treeplayer/src/TDFNodes.cxx @@ -19,6 +19,7 @@ #include "TInterpreter.h" #include "TROOT.h" // IsImplicitMTEnabled #include "TTreeReader.h" +#include <climits> #include <cassert> #include <mutex> @@ -102,6 +103,14 @@ void TFilterBase::PrintReport() const // fixed at construction time and no blocking is foreseen. class TSlotStack { private: + unsigned int &GetCount() { + thread_local unsigned int count = 0U; + return count; + } + unsigned int &GetIndex() { + thread_local unsigned int index = UINT_MAX; + return index; + } unsigned int fCursor; std::vector<unsigned int> fBuf; ROOT::TSpinMutex fMutex; @@ -109,25 +118,37 @@ private: public: TSlotStack() = delete; TSlotStack(unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); } - void Push(unsigned int slotNumber); - unsigned int Pop(); + void ReturnSlot(unsigned int slotNumber); + unsigned int GetSlot(); }; -void TSlotStack::Push(unsigned int slotNumber) -{ - std::lock_guard<ROOT::TSpinMutex> guard(fMutex); - fBuf[fCursor++] = slotNumber; - assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the " - "stack. fCursor is greater than the size of the internal buffer. This violates " - "such assumption."); +void TSlotStack::ReturnSlot(unsigned int slotNumber) +{ + auto &index = GetIndex(); + auto &count = GetCount(); + assert(count > 0U && "TSlotStack has a reference count relative to an index which will become negative."); + count--; + if (0U == count) { + index = UINT_MAX; + std::lock_guard<ROOT::TSpinMutex> guard(fMutex); + fBuf[fCursor++] = slotNumber; + assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the " + "stack. fCursor is greater than the size of the internal buffer. This violates " + "such assumption."); + } } -unsigned int TSlotStack::Pop() +unsigned int TSlotStack::GetSlot() { - assert(fCursor > 0 && - "TSlotStack assumes that a value can be always popped. fCursor is <=0 and this violates such assumption."); + auto &index = GetIndex(); + auto &count = GetCount(); + count++; + if (UINT_MAX != index) return index; std::lock_guard<ROOT::TSpinMutex> guard(fMutex); - return fBuf[--fCursor]; + assert(fCursor > 0 && + "TSlotStack assumes that a value can be always obtained. In this case fCursor is <=0 and this violates such assumption."); + index = fBuf[--fCursor]; + return index; } TLoopManager::TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches) @@ -164,13 +185,13 @@ void TLoopManager::RunEmptySourceMT() // Each task will generate a subrange of entries auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) { - auto slot = slotStack.Pop(); + auto slot = slotStack.GetSlot(); InitNodeSlots(nullptr, slot); for (auto currEntry = range.first; currEntry < range.second; ++currEntry) { RunAndCheckFilters(slot, currEntry); } CleanUpTask(slot); - slotStack.Push(slot); + slotStack.ReturnSlot(slot); }; ROOT::TThreadExecutor pool; @@ -198,14 +219,14 @@ void TLoopManager::RunTreeProcessorMT() tp.reset(new ttpmt_t(*fTree)); tp->Process([this, &slotStack](TTreeReader &r) -> void { - auto slot = slotStack.Pop(); + auto slot = slotStack.GetSlot(); InitNodeSlots(&r, slot); // recursive call to check filters and conditionally execute actions while (r.Next()) { RunAndCheckFilters(slot, r.GetCurrentEntry()); } CleanUpTask(slot); - slotStack.Push(slot); + slotStack.ReturnSlot(slot); }); #endif // not implemented otherwise } -- GitLab