Skip to content
Snippets Groups Projects
Commit b8da5b41 authored by Enrico Guiraud's avatar Enrico Guiraud
Browse files

[TDF] Refactor internal callback logic

Now plays with threads more nicely, as we pass to the callback the
actual partial result of processing slot 0 instead of passing
TResultProxy's result pointer.
parent 5afa219d
Branches
Tags
No related merge requests found
......@@ -243,7 +243,7 @@ public:
// Must be implemented to activate callbacks for Histo1D<T> actions.
// Currently no-op, meaning we leave the result object as it is:
// the result object is already being filled because FillTOHelper keeps it in its working slot 0
void PartialUpdate() { }
HIST *PartialUpdate(unsigned int slot) { return fTo->GetAtSlotRaw(slot); }
};
// note: changes to this class should probably be replicated in its partial
......
......@@ -344,7 +344,7 @@ public:
unsigned int GetNSlots() const { return fNSlots; }
/// This method is invoked to update a partial result during the event loop, right before passing the result to a
/// user-defined callback registered via TResultProxy::RegisterCallback
virtual void PartialUpdate() = 0;
virtual void *PartialUpdate(unsigned int slot) = 0;
};
template <typename Helper, typename PrevDataFrame, typename BranchTypes_t = typename Helper::BranchTypes_t>
......@@ -394,19 +394,20 @@ public:
/// This method is invoked to update a partial result during the event loop, right before passing the result to a
/// user-defined callback registered via TResultProxy::RegisterCallback
void PartialUpdate() final { PartialUpdateImpl(0); }
/// TODO the PartialUpdateImpl trick can go away once all action helpers will implement PartialUpdate
void *PartialUpdate(unsigned int slot) final { return PartialUpdateImpl(slot); }
private:
// this overload is SFINAE'd out if Helper does not implement `PartialUpdate`
// the template parameter is required to defer instantiation of the method to SFINAE time
template <typename H = Helper>
auto PartialUpdateImpl(int /*overloadSelector*/) -> decltype(std::declval<H>().PartialUpdate())
auto PartialUpdateImpl(unsigned int slot) -> decltype(std::declval<H>().PartialUpdate(slot))
{
fHelper.PartialUpdate();
return fHelper.PartialUpdate(slot);
}
// this one is always available but has lower precedence thanks to `...`
void PartialUpdateImpl(...) {
Warning("PartialUpdate", "This action does not support callbacks!");
void *PartialUpdateImpl(...) {
throw std::runtime_error("This action does not support callbacks yet!");
}
};
......
......@@ -107,9 +107,9 @@ class TResultProxy {
return fObjPtr.get();
}
TResultProxy(const SPT_t &objPtr, const ShrdPtrBool_t &readiness, const SPTLM_t &firstData,
TResultProxy(const SPT_t &objPtr, const ShrdPtrBool_t &readiness, const SPTLM_t &loopManager,
TDFInternal::TActionBase *actionPtr = nullptr)
: fReadiness(readiness), fImplWeakPtr(firstData), fObjPtr(objPtr), fActionPtr(actionPtr)
: fReadiness(readiness), fImplWeakPtr(loopManager), fObjPtr(objPtr), fActionPtr(actionPtr)
{
}
......@@ -197,9 +197,10 @@ public:
auto lm = fImplWeakPtr.lock();
if (!lm)
throw std::runtime_error("The main TDataFrame is not reachable: did it go out of scope?");
auto c = [this, callback]() {
fActionPtr->PartialUpdate();
callback(*fObjPtr);
auto actionPtr = fActionPtr; // only variables with automatic storage duration can be captured
auto c = [actionPtr, callback]() {
auto partialResult = static_cast<Value_t*>(actionPtr->PartialUpdate(0));
callback(*partialResult);
};
lm->RegisterCallback(std::move(c), everyNevents);
}
......
......@@ -265,7 +265,7 @@ void TLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
{
for (auto &actionPtr : fBookedActions) actionPtr->Run(slot, entry);
for (auto &namedFilterPtr : fBookedNamedFilters) namedFilterPtr->CheckFilters(slot, entry);
if (slot == 0)
if (!fCallbacks.empty() && slot == 0)
for (auto &callback : fCallbacks)
callback();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment