Commit 4e2ad81f authored by Danilo Piparo

[TTreeProcessorMT][ROOT-9791] Create a max number of tasks per file processed

and not one task per cluster.
We need to "fuse" clusters together if their number is too large with respect to
the number of workers, otherwise we can incur an overhead so large that it makes
the parallelisation detrimental for performance.
This is the case, for example, when, after merging many small files, a file
contains a tree with many entries but clusters of just a few entries each.
The criterion according to which we fuse clusters is to have at most
TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters per file per worker.
For example: given 2 files and 16 workers, at most 16 * 2 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker()
clusters will be created, at most 16 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() per file.
parent 2278fb99
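
To make the bookkeeping concrete, here is a small standalone sketch of the fusing arithmetic (not code from this commit; the numbers and variable names are illustrative, using the default of 24 tasks per file per worker set in the diff below):

    #include <iostream>

    int main()
    {
       // Illustrative scenario: one badly-clustered file, 16 workers in the pool,
       // and the default of 24 tasks per file per worker introduced by this commit.
       const unsigned int nClusters = 100000;            // clusters found in the tree
       const unsigned int nWorkers = 16;                 // implicit MT pool size
       const unsigned int maxTasksPerFilePerWorker = 24; // TTreeProcessorMT default

       // Upper bound on the number of tasks for this file.
       const unsigned int maxTasksPerFile = maxTasksPerFilePerWorker * nWorkers; // 384

       // Each task fuses nFolds consecutive clusters; the remainder is spread
       // one extra cluster per task over the first tasks.
       const unsigned int nFolds = nClusters / maxTasksPerFile;    // 260
       const unsigned int remainder = nClusters % maxTasksPerFile; // 160

       const unsigned int nTasks = nFolds == 0 ? nClusters : maxTasksPerFile;
       std::cout << nTasks << " tasks, " << nFolds << " clusters each (+1 for the first "
                 << remainder << " tasks)\n";
       return 0;
    }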
@@ -174,7 +174,7 @@ namespace ROOT {
Internal::FriendInfo GetFriendInfo(TTree &tree);
std::string FindTreeName();
static unsigned int fgMaxTasksPerFilePerWorker;
public:
TTreeProcessorMT(std::string_view filename, std::string_view treename = "");
TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename = "");
@@ -182,6 +182,8 @@ namespace ROOT {
TTreeProcessorMT(TTree &tree);
void Process(std::function<void(TTreeReader &)> func);
static void SetMaxTasksPerFilePerWorker(unsigned int m);
static unsigned int GetMaxTasksPerFilePerWorker();
};
} // End of namespace ROOT
@@ -31,6 +31,9 @@ objects.
using namespace ROOT;
namespace ROOT {
unsigned int TTreeProcessorMT::fgMaxTasksPerFilePerWorker = 24U;
namespace Internal {
////////////////////////////////////////////////////////////////////////
/// Return a vector of cluster boundaries for the given tree and files.
@@ -42,7 +45,7 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v
// analysis once, all necessary streamers will be loaded into memory.
TDirectory::TContext c;
const auto nFileNames = fileNames.size();
std::vector<std::vector<EntryCluster>> clustersPerFile; clustersPerFile.reserve(nFileNames);
std::vector<std::vector<EntryCluster>> clustersPerFileProto;
std::vector<Long64_t> entriesPerFile; entriesPerFile.reserve(nFileNames);
Long64_t offset = 0ll;
for (const auto &fileName : fileNames) {
@@ -52,7 +55,7 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v
Error("TTreeProcessorMT::Process",
"An error occurred while opening file %s: skipping it.",
fileNameC);
clustersPerFile.emplace_back(std::vector<EntryCluster>());
clustersPerFileProto.emplace_back(std::vector<EntryCluster>());
entriesPerFile.emplace_back(0ULL);
continue;
}
@@ -63,7 +66,7 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v
Error("TTreeProcessorMT::Process",
"An error occurred while getting tree %s from file %s: skipping this file.",
treeName.c_str(), fileNameC);
clustersPerFile.emplace_back(std::vector<EntryCluster>());
clustersPerFileProto.emplace_back(std::vector<EntryCluster>());
entriesPerFile.emplace_back(0ULL);
continue;
}
@@ -79,10 +82,64 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v
clusters.emplace_back(EntryCluster{start + offset, end + offset});
}
offset += entries;
clustersPerFile.emplace_back(std::move(clusters));
clustersPerFileProto.emplace_back(std::move(clusters));
entriesPerFile.emplace_back(entries);
}
// Here we "fuse" together clusters if the number of clusters is to big with respect to
// the number of slots, otherwise we can incurr in an overhead which is so big to make
// the parallelisation detrimental for performance.
// For example, this is the case when following a merging of many small files a file
// contains a tree with many entries and with clusters of just a few entries.
// The criterium according to which we fuse clusters together is to have at most
// TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters per file per slot.
// For example: given 2 files and 16 workers, at most
// 16 * 2 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters will be created, at most
// 16 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() per file.
const auto maxClustersPerFile = TTreeProcessorMT::GetMaxTasksPerFilePerWorker() * ROOT::GetImplicitMTPoolSize();
std::vector<std::vector<EntryCluster>> clustersPerFile(clustersPerFileProto.size());
auto clustersPerFileProtoIt = clustersPerFileProto.begin();
auto clustersPerFileIt = clustersPerFile.begin();
for (; clustersPerFileProtoIt != clustersPerFileProto.end(); clustersPerFileProtoIt++, clustersPerFileIt++) {
const auto clustersInThisFileSize = clustersPerFileProtoIt->size();
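// nFolds: number of consecutive original clusters fused into each task (integer division)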
const auto nFolds = clustersInThisFileSize / maxClustersPerFile;
// If the number of clusters is less than maxClustersPerFile
// we take the clusters as they are
if (nFolds == 0) {
std::for_each(clustersPerFileProtoIt->begin(), clustersPerFileProtoIt->end(),
[&clustersPerFileIt](const EntryCluster &clust) { clustersPerFileIt->emplace_back(clust); });
continue;
}
// Otherwise, we have to merge clusters, distributing the remainder evenly
// over the first fused clusters
auto nRemainderClusters = clustersInThisFileSize % maxClustersPerFile;
auto clustIt = clustersPerFileProtoIt->begin();
Long64_t start = clustIt->start;
clustIt++;
Long64_t end = 0ll;
auto clusterCursor = 1U;
for (; clustIt != clustersPerFileProtoIt->end(); clustIt++, clusterCursor++) {
const auto remainderCluster = nRemainderClusters != 0 ? 1U : 0U;
if (clusterCursor == (nFolds + remainderCluster)) {
clustersPerFileIt->emplace_back(EntryCluster({start, end}));
start = clustIt->start;
clusterCursor = 0U;
if (nRemainderClusters != 0) {
nRemainderClusters--;
}
} else {
end = clustIt->end;
}
}
// Here we need to add the last cluster to the set because
// the loop ends before we get the chance to do so
clustersPerFileIt->emplace_back(EntryCluster({start, end}));
}
return std::make_pair(std::move(clustersPerFile), std::move(entriesPerFile));
}
@@ -353,3 +410,23 @@ void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
pool.Foreach(processFile, fileIdxs);
}
////////////////////////////////////////////////////////////////////////
/// \brief Retrieves the maximum number of tasks created per file, per worker.
/// \return The maximum number of tasks created per file, per worker.
unsigned int TTreeProcessorMT::GetMaxTasksPerFilePerWorker()
{
return fgMaxTasksPerFilePerWorker;
}
////////////////////////////////////////////////////////////////////////
/// \brief Sets the maximum number of tasks created per file, per worker.
/// \param[in] maxTasksPerFile Maximum number of tasks created per file, per worker.
///
/// This makes it possible to create a reasonable number of tasks even if any of the
/// processed files has poor clustering, for example a lot of
/// entries but just a few entries per cluster.
void TTreeProcessorMT::SetMaxTasksPerFilePerWorker(unsigned int maxTasksPerFile)
{
fgMaxTasksPerFilePerWorker = maxTasksPerFile;
}
\ No newline at end of file
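
For context, a hypothetical usage sketch of the new knob (not part of this commit; the file and tree names are placeholders):

    #include <ROOT/TTreeProcessorMT.hxx>
    #include <TTreeReader.h>
    #include <TROOT.h>

    int main()
    {
       ROOT::EnableImplicitMT(16); // start the implicit thread pool, e.g. with 16 workers

       // Cap the number of tasks per file per worker before processing,
       // e.g. for inputs with many tiny clusters left over from a merge.
       ROOT::TTreeProcessorMT::SetMaxTasksPerFilePerWorker(8);

       ROOT::TTreeProcessorMT proc("merged.root", "events"); // placeholder file/tree names
       proc.Process([](TTreeReader &reader) {
          // Each task receives a TTreeReader restricted to its (fused) entry range.
          while (reader.Next()) {
             // ... read branches here ...
          }
       });
       return 0;
    }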