diff --git a/tree/treeplayer/inc/ROOT/TTreeProcessorMT.hxx b/tree/treeplayer/inc/ROOT/TTreeProcessorMT.hxx index 3bbc098ff5328fd0c19e9967517baa852856cf8c..15d05068b89acdae6f87c870d5a70142975c93e7 100644 --- a/tree/treeplayer/inc/ROOT/TTreeProcessorMT.hxx +++ b/tree/treeplayer/inc/ROOT/TTreeProcessorMT.hxx @@ -174,7 +174,7 @@ namespace ROOT { Internal::FriendInfo GetFriendInfo(TTree &tree); std::string FindTreeName(); - + static unsigned int fgMaxTasksPerFilePerWorker; public: TTreeProcessorMT(std::string_view filename, std::string_view treename = ""); TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename = ""); @@ -182,6 +182,8 @@ namespace ROOT { TTreeProcessorMT(TTree &tree); void Process(std::function<void(TTreeReader &)> func); + static void SetMaxTasksPerFilePerWorker(unsigned int m); + static unsigned int GetMaxTasksPerFilePerWorker(); }; } // End of namespace ROOT diff --git a/tree/treeplayer/src/TTreeProcessorMT.cxx b/tree/treeplayer/src/TTreeProcessorMT.cxx index 97ecf117f8e892ffd49d2dfc8785d70b1e28393a..a0e62a51b05080dd9d5609bc2073adae20fd4daf 100644 --- a/tree/treeplayer/src/TTreeProcessorMT.cxx +++ b/tree/treeplayer/src/TTreeProcessorMT.cxx @@ -31,6 +31,9 @@ objects. using namespace ROOT; namespace ROOT { + +unsigned int TTreeProcessorMT::fgMaxTasksPerFilePerWorker = 24U; + namespace Internal { //////////////////////////////////////////////////////////////////////// /// Return a vector of cluster boundaries for the given tree and files. @@ -42,7 +45,7 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v // analysis once, all necessary streamers will be loaded into memory. TDirectory::TContext c; const auto nFileNames = fileNames.size(); - std::vector<std::vector<EntryCluster>> clustersPerFile; clustersPerFile.reserve(nFileNames); + std::vector<std::vector<EntryCluster>> clustersPerFileProto; std::vector<Long64_t> entriesPerFile; entriesPerFile.reserve(nFileNames); Long64_t offset = 0ll; for (const auto &fileName : fileNames) { @@ -52,7 +55,7 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v Error("TTreeProcessorMT::Process", "An error occurred while opening file %s: skipping it.", fileNameC); - clustersPerFile.emplace_back(std::vector<EntryCluster>()); + clustersPerFileProto.emplace_back(std::vector<EntryCluster>()); entriesPerFile.emplace_back(0ULL); continue; } @@ -63,7 +66,7 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v Error("TTreeProcessorMT::Process", "An error occurred while getting tree %s from file %s: skipping this file.", treeName.c_str(), fileNameC); - clustersPerFile.emplace_back(std::vector<EntryCluster>()); + clustersPerFileProto.emplace_back(std::vector<EntryCluster>()); entriesPerFile.emplace_back(0ULL); continue; } @@ -79,10 +82,64 @@ static ClustersAndEntries MakeClusters(const std::string &treeName, const std::v clusters.emplace_back(EntryCluster{start + offset, end + offset}); } offset += entries; - clustersPerFile.emplace_back(std::move(clusters)); + clustersPerFileProto.emplace_back(std::move(clusters)); entriesPerFile.emplace_back(entries); } + // Here we "fuse" together clusters if the number of clusters is to big with respect to + // the number of slots, otherwise we can incurr in an overhead which is so big to make + // the parallelisation detrimental for performance. + // For example, this is the case when following a merging of many small files a file + // contains a tree with many entries and with clusters of just a few entries. + // The criterium according to which we fuse clusters together is to have at most + // TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters per file per slot. + // For example: given 2 files and 16 workers, at most + // 16 * 2 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters will be created, at most + // 16 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() per file. + + const auto maxClustersPerFile = TTreeProcessorMT::GetMaxTasksPerFilePerWorker() * ROOT::GetImplicitMTPoolSize(); + std::vector<std::vector<EntryCluster>> clustersPerFile(clustersPerFileProto.size()); + auto clustersPerFileProtoIt = clustersPerFileProto.begin(); + auto clustersPerFileIt = clustersPerFile.begin(); + for (; clustersPerFileProtoIt != clustersPerFileProto.end(); clustersPerFileProtoIt++, clustersPerFileIt++) { + const auto clustersInThisFileSize = clustersPerFileProtoIt->size(); + const auto nFolds = clustersInThisFileSize / maxClustersPerFile; + // If the number of clusters is less than maxClustersPerFile + // we take the clusters as they are + if (nFolds == 0) { + std::for_each(clustersPerFileProtoIt->begin(), clustersPerFileProtoIt->end(), + [&clustersPerFileIt](const EntryCluster &clust) { clustersPerFileIt->emplace_back(clust); }); + continue; + } + // Otherwise, we have to merge clusters, distributing the reminder eavenly + // onto the first clusters + auto nReminderClusters = clustersInThisFileSize % maxClustersPerFile; + auto clustIt = clustersPerFileProtoIt->begin(); + Long64_t start = clustIt->start; + clustIt++; + Long64_t end = 0ULL; + auto clusterCursor = 1U; + for (; clustIt != clustersPerFileProtoIt->end(); clustIt++, clusterCursor++) { + const auto reminderCluster = nReminderClusters != 0 ? 1U : 0U; + if (clusterCursor == (nFolds + reminderCluster)) + { + clustersPerFileIt->emplace_back(EntryCluster({start, end})); + start = clustIt->start; + clusterCursor = 0U; + if (nReminderClusters!=0) { + nReminderClusters--; + } + } + else { + end = clustIt->end; + } + } + // Here we need to add the last cluster to the set because + // the iteration ends before we have the possibility to do this + clustersPerFileIt->emplace_back(EntryCluster({start, end})); + } + + return std::make_pair(std::move(clustersPerFile), std::move(entriesPerFile)); } @@ -353,3 +410,23 @@ void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func) pool.Foreach(processFile, fileIdxs); } + +//////////////////////////////////////////////////////////////////////// +/// \brief Sets the maximum number of tasks created per file, per worker. +/// \return The maximum number of tasks created per file, per worker +unsigned int TTreeProcessorMT::GetMaxTasksPerFilePerWorker() +{ + return fgMaxTasksPerFilePerWorker; +} + +//////////////////////////////////////////////////////////////////////// +/// \brief Sets the maximum number of tasks created per file, per worker. +/// \param[in] maxTasksPerFile Name of the file containing the tree to process. +/// +/// This allows to create a reasonable number of tasks even if any of the +/// processed files features a bad clustering, for example with a lot of +/// entries and just a few entries per cluster. +void TTreeProcessorMT::SetMaxTasksPerFilePerWorker(unsigned int maxTasksPerFile) +{ + fgMaxTasksPerFilePerWorker = maxTasksPerFile; +} \ No newline at end of file