Skip to content
Snippets Groups Projects
Commit e1d0fce9 authored by Enrico Guiraud's avatar Enrico Guiraud Committed by Danilo Piparo
Browse files

[TDF] Let TColumnValue support interleaved task execution

Each TDF node keeps a thread-local tuple of TColumnValues. TColumnValues
read the values of a column for a certain entry, but entry ranges and
TTreeReader-related objects are _task_ local.

Before this change, in case of interleaved task execution, the second
task starting in a thread would re-initialize the TColumnValues and
 overwrite the previous task's settings.

Now TColumnValues deal with multiple initializations by keeping their
internal state in a stack: when a new task initializes a TColumnValue
it pushes the required values in the stack, when a task completes
execution it pops its values from the stack.

Note that interleaved TBB tasks always follow a last-to-start,
first-to-finish pattern, which allows us to use a LIFO structure for
TColumnValue.
parent 26e8ace6
No related branches found
No related tags found
No related merge requests found
...@@ -149,14 +149,21 @@ class TColumnValue { ...@@ -149,14 +149,21 @@ class TColumnValue {
// ReaderValueOrArray_t is a TTreeReaderValue<T> unless T is array_view<U> // ReaderValueOrArray_t is a TTreeReaderValue<T> unless T is array_view<U>
using ProxyParam_t = typename std::conditional<std::is_same<ReaderValueOrArray_t<T>, TTreeReaderValue<T>>::value, T, using ProxyParam_t = typename std::conditional<std::is_same<ReaderValueOrArray_t<T>, TTreeReaderValue<T>>::value, T,
TakeFirstParameter_t<T>>::type; TakeFirstParameter_t<T>>::type;
std::unique_ptr<TTreeReaderValue<T>> fReaderValue{nullptr}; //< Owning ptr to a TTreeReaderValue. Used for
/// non-temporary columns and T != std::array_view<U> // Each element of the following data members will be in use by a _single task_.
std::unique_ptr<TTreeReaderArray<ProxyParam_t>> fReaderArray{nullptr}; //< Owning ptr to a TTreeReaderArray. Used for // The vectors are used as very small stacks (1-2 elements typically) that fill in case of interleaved task execution
/// non-temporary columsn and // i.e. when more than one task needs readers in this worker thread.
/// T == std::array_view<U>.
T *fValuePtr{nullptr}; //< Non-owning ptr to the value of a temporary column. /// Owning ptrs to a TTreeReaderValue. Used for non-temporary columns when T != std::array_view<U>
TCustomColumnBase *fTmpColumn{nullptr}; //< Non-owning ptr to the node responsible for the temporary column. std::vector<std::unique_ptr<TTreeReaderValue<T>>> fReaderValues;
unsigned int fSlot{0}; //< The slot this value belongs to. Only used for temporary columns, not for real branches. /// Owning ptrs to a TTreeReaderArray. Used for non-temporary columns when T == std::array_view<U>.
std::vector<std::unique_ptr<TTreeReaderArray<ProxyParam_t>>> fReaderArrays;
/// Non-owning ptrs to the value of a custom column.
std::vector<T *> fCustomValuePtrs;
/// Non-owning ptrs to the node responsible for the custom column. Needed when querying custom values.
std::vector<TCustomColumnBase *> fCustomColumns;
/// The slot this value belongs to. Needed when querying custom column values.
unsigned int fSlot;
public: public:
TColumnValue() = default; TColumnValue() = default;
...@@ -167,9 +174,9 @@ public: ...@@ -167,9 +174,9 @@ public:
{ {
bool useReaderValue = std::is_same<ProxyParam_t, T>::value; bool useReaderValue = std::is_same<ProxyParam_t, T>::value;
if (useReaderValue) if (useReaderValue)
fReaderValue.reset(new TTreeReaderValue<T>(*r, bn.c_str())); fReaderValues.emplace_back(new TTreeReaderValue<T>(*r, bn.c_str()));
else else
fReaderArray.reset(new TTreeReaderArray<ProxyParam_t>(*r, bn.c_str())); fReaderArrays.emplace_back(new TTreeReaderArray<ProxyParam_t>(*r, bn.c_str()));
} }
template <typename U = T, template <typename U = T,
...@@ -179,25 +186,28 @@ public: ...@@ -179,25 +186,28 @@ public:
template <typename U = T, typename std::enable_if<!std::is_same<ProxyParam_t, U>::value, int>::type = 0> template <typename U = T, typename std::enable_if<!std::is_same<ProxyParam_t, U>::value, int>::type = 0>
std::array_view<ProxyParam_t> Get(Long64_t) std::array_view<ProxyParam_t> Get(Long64_t)
{ {
auto &readerArray = *fReaderArray; auto &readerArray = *fReaderArrays.back();
if (readerArray.GetSize() > 1 && 1 != (&readerArray[1] - &readerArray[0])) { if (readerArray.GetSize() > 1 && 1 != (&readerArray[1] - &readerArray[0])) {
std::string exceptionText = "Branch "; std::string exceptionText = "Branch ";
exceptionText += fReaderArray->GetBranchName(); exceptionText += readerArray.GetBranchName();
exceptionText += " hangs from a non-split branch. For this reason, it cannot be accessed via an array_view." exceptionText += " hangs from a non-split branch. For this reason, it cannot be accessed via an array_view."
" Please read the top level branch instead."; " Please read the top level branch instead.";
throw std::runtime_error(exceptionText); throw std::runtime_error(exceptionText);
} }
return std::array_view<ProxyParam_t>(fReaderArray->begin(), fReaderArray->end()); return std::array_view<ProxyParam_t>(readerArray.begin(), readerArray.end());
} }
void Reset() void Reset()
{ {
fReaderValue = nullptr; if (!fReaderValues.empty()) // we must be using TTreeReaderValues
fReaderArray = nullptr; fReaderValues.pop_back();
fValuePtr = nullptr; else if (!fReaderArrays.empty()) // we must be using TTreeReaderArrays
fTmpColumn = nullptr; fReaderArrays.pop_back();
fSlot = 0; else { // we must be using a custom column
fCustomValuePtrs.pop_back();
fCustomColumns.pop_back();
}
} }
}; };
...@@ -666,13 +676,13 @@ public: ...@@ -666,13 +676,13 @@ public:
// method implementations // method implementations
template <typename T> template <typename T>
void ROOT::Internal::TDF::TColumnValue<T>::SetTmpColumn(unsigned int slot, void ROOT::Internal::TDF::TColumnValue<T>::SetTmpColumn(unsigned int slot,
ROOT::Detail::TDF::TCustomColumnBase *tmpColumn) ROOT::Detail::TDF::TCustomColumnBase *customColumn)
{ {
fTmpColumn = tmpColumn; fCustomColumns.emplace_back(customColumn);
if (tmpColumn->GetTypeId() != typeid(T)) if (customColumn->GetTypeId() != typeid(T))
throw std::runtime_error(std::string("TColumnValue: type specified is ") + typeid(T).name() + throw std::runtime_error(std::string("TColumnValue: type specified is ") + typeid(T).name() +
" but temporary column has type " + tmpColumn->GetTypeId().name()); " but temporary column has type " + customColumn->GetTypeId().name());
fValuePtr = static_cast<T *>(tmpColumn->GetValuePtr(slot)); fCustomValuePtrs.emplace_back(static_cast<T *>(customColumn->GetValuePtr(slot)));
fSlot = slot; fSlot = slot;
} }
...@@ -686,11 +696,11 @@ template <typename U, ...@@ -686,11 +696,11 @@ template <typename U,
int>::type> int>::type>
T &ROOT::Internal::TDF::TColumnValue<T>::Get(Long64_t entry) T &ROOT::Internal::TDF::TColumnValue<T>::Get(Long64_t entry)
{ {
if (fReaderValue) { if (!fReaderValues.empty()) {
return *(fReaderValue->Get()); return *(fReaderValues.back()->Get());
} else { } else {
fTmpColumn->Update(fSlot, entry); fCustomColumns.back()->Update(fSlot, entry);
return *fValuePtr; return *fCustomValuePtrs.back();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment