Skip to content
Snippets Groups Projects
Commit b33c1f9a authored by Enrico Guiraud's avatar Enrico Guiraud Committed by Danilo Piparo
Browse files

[TDF] Let TDF support interleaved TBB task execution

parent edef6148
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@
#include "TInterpreter.h"
#include "TROOT.h" // IsImplicitMTEnabled
#include "TTreeReader.h"
#include <climits>
#include <cassert>
#include <mutex>
......@@ -102,6 +103,14 @@ void TFilterBase::PrintReport() const
// fixed at construction time and no blocking is foreseen.
class TSlotStack {
private:
unsigned int &GetCount() {
thread_local unsigned int count = 0U;
return count;
}
unsigned int &GetIndex() {
thread_local unsigned int index = UINT_MAX;
return index;
}
unsigned int fCursor;
std::vector<unsigned int> fBuf;
ROOT::TSpinMutex fMutex;
......@@ -109,25 +118,37 @@ private:
public:
TSlotStack() = delete;
TSlotStack(unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); }
void Push(unsigned int slotNumber);
unsigned int Pop();
void ReturnSlot(unsigned int slotNumber);
unsigned int GetSlot();
};
void TSlotStack::Push(unsigned int slotNumber)
{
std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
fBuf[fCursor++] = slotNumber;
assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the "
"stack. fCursor is greater than the size of the internal buffer. This violates "
"such assumption.");
void TSlotStack::ReturnSlot(unsigned int slotNumber)
{
auto &index = GetIndex();
auto &count = GetCount();
assert(count > 0U && "TSlotStack has a reference count relative to an index which will become negative.");
count--;
if (0U == count) {
index = UINT_MAX;
std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
fBuf[fCursor++] = slotNumber;
assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the "
"stack. fCursor is greater than the size of the internal buffer. This violates "
"such assumption.");
}
}
unsigned int TSlotStack::Pop()
unsigned int TSlotStack::GetSlot()
{
assert(fCursor > 0 &&
"TSlotStack assumes that a value can be always popped. fCursor is <=0 and this violates such assumption.");
auto &index = GetIndex();
auto &count = GetCount();
count++;
if (UINT_MAX != index) return index;
std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
return fBuf[--fCursor];
assert(fCursor > 0 &&
"TSlotStack assumes that a value can be always obtained. In this case fCursor is <=0 and this violates such assumption.");
index = fBuf[--fCursor];
return index;
}
TLoopManager::TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
......@@ -164,13 +185,13 @@ void TLoopManager::RunEmptySourceMT()
// Each task will generate a subrange of entries
auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
auto slot = slotStack.Pop();
auto slot = slotStack.GetSlot();
InitNodeSlots(nullptr, slot);
for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
RunAndCheckFilters(slot, currEntry);
}
CleanUpTask(slot);
slotStack.Push(slot);
slotStack.ReturnSlot(slot);
};
ROOT::TThreadExecutor pool;
......@@ -198,14 +219,14 @@ void TLoopManager::RunTreeProcessorMT()
tp.reset(new ttpmt_t(*fTree));
tp->Process([this, &slotStack](TTreeReader &r) -> void {
auto slot = slotStack.Pop();
auto slot = slotStack.GetSlot();
InitNodeSlots(&r, slot);
// recursive call to check filters and conditionally execute actions
while (r.Next()) {
RunAndCheckFilters(slot, r.GetCurrentEntry());
}
CleanUpTask(slot);
slotStack.Push(slot);
slotStack.ReturnSlot(slot);
});
#endif // not implemented otherwise
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment