From deb288b8d0b203ca2a1408b38e66a3399d60bb51 Mon Sep 17 00:00:00 2001
From: Xavier Valls <xaviervallspla@gmail.com>
Date: Mon, 17 Sep 2018 19:35:42 +0200
Subject: [PATCH] Add built-in chunking to Foreach

---
 core/imt/inc/ROOT/TThreadExecutor.hxx | 87 +++++++++++++++++++++------
 1 file changed, 69 insertions(+), 18 deletions(-)

diff --git a/core/imt/inc/ROOT/TThreadExecutor.hxx b/core/imt/inc/ROOT/TThreadExecutor.hxx
index 9aeefdbda77..19e7dbb7a25 100644
--- a/core/imt/inc/ROOT/TThreadExecutor.hxx
+++ b/core/imt/inc/ROOT/TThreadExecutor.hxx
@@ -42,17 +42,17 @@ namespace ROOT {
       TThreadExecutor &operator=(TThreadExecutor &) = delete;
 
       template<class F>
-      void Foreach(F func, unsigned nTimes);
+      void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
       template<class F, class INTEGER>
-      void Foreach(F func, ROOT::TSeq<INTEGER> args);
+      void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
       /// \cond
       template<class F, class T>
-      void Foreach(F func, std::initializer_list<T> args);
+      void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
       /// \endcond
       template<class F, class T>
-      void Foreach(F func, std::vector<T> &args);
+      void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
       template<class F, class T>
-      void Foreach(F func, const std::vector<T> &args);
+      void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);
 
       using TExecutor<TThreadExecutor>::Map;
       template<class F, class Cond = noReferenceCond<F>>
@@ -115,16 +115,43 @@ namespace ROOT {
    /// Functions that take more than zero arguments can be executed (with
    /// fixed arguments) by wrapping them in a lambda or with std::bind.
    template<class F>
-   void TThreadExecutor::Foreach(F func, unsigned nTimes) {
-       ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
+   void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
+      if (nChunks == 0 || nTimes == 0) { // unchunked path; also keeps an empty range from yielding a zero stride
+         ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
+         return;
+      }
+      // Each task makes up to `step` consecutive calls to func; the last chunk may be shorter.
+      unsigned step = nTimes / nChunks + (nTimes % nChunks != 0); // ceiling of the division, cannot overflow
+      auto lambda = [&](unsigned int i)
+      {
+         for (unsigned j = 0; j < step && (i + j) < nTimes; j++) { // stop at the end of the range
+            func();
+         }
+      };
+      ParallelFor(0U, nTimes, step, lambda); // one invocation of lambda per chunk start
    }
 
    //////////////////////////////////////////////////////////////////////////
    /// Execute func in parallel, taking an element of a
    /// sequence as argument.
    template<class F, class INTEGER>
-   void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args) {
-       ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
+   void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
+      unsigned start = *args.begin();
+      unsigned end = *args.end();
+      unsigned seqStep = args.step();
+      if (nChunks == 0 || seqStep == 0 || end <= start) { // nothing to chunk: keep the one-task-per-element call
+         ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
+         return;
+      }
+      unsigned nElems = (end - start - 1) / seqStep + 1; // number of sequence elements, at least 1 here
+      unsigned perChunk = nElems / nChunks + (nElems % nChunks != 0); // ceiling of the division
+      unsigned step = perChunk * seqStep; // stride is a multiple of seqStep so every chunk starts on an element
+      auto lambda = [&](unsigned int i) {
+         for (unsigned k = 0; k < perChunk && (i + k * seqStep) < end; k++) { // last chunk may be shorter
+            func(i + k * seqStep);
+         }
+      };
+      ParallelFor(start, end, step, lambda); // one invocation of lambda per chunk start
    }
 
    /// \cond
@@ -132,9 +159,9 @@ namespace ROOT {
    /// Execute func in parallel, taking an element of a
    /// initializer_list as argument.
    template<class F, class T>
-   void TThreadExecutor::Foreach(F func, std::initializer_list<T> args) {
-       std::vector<T> vargs(std::move(args));
-       Foreach(func, vargs);
+   void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
+      std::vector<T> elements(args); // copy the list into a vector so the std::vector overload can be used
+      Foreach(func, elements, nChunks); // the chunk count is passed through unchanged
    }
    /// \endcond
 
@@ -142,17 +169,41 @@ namespace ROOT {
    /// Execute func in parallel, taking an element of an
    /// std::vector as argument.
    template<class F, class T>
-   void TThreadExecutor::Foreach(F func, std::vector<T> &args) {
-        unsigned int nToProcess = args.size();
-        ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
+   void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
+      unsigned int nToProcess = args.size();
+      if (nChunks == 0 || nToProcess == 0) { // unchunked path; also keeps an empty vector from yielding a zero stride
+         ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
+         return;
+      }
+      // Each task handles up to `step` consecutive elements; the last chunk may be shorter.
+      unsigned step = nToProcess / nChunks + (nToProcess % nChunks != 0); // ceiling of the division, cannot overflow
+      auto lambda = [&](unsigned int i)
+      {
+         for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) { // stop at the end of the vector
+            func(args[i + j]);
+         }
+      };
+      ParallelFor(0U, nToProcess, step, lambda); // one invocation of lambda per chunk start
    }
 
    //////////////////////////////////////////////////////////////////////////
    /// Execute func in parallel, taking an element of a std::vector as argument.
    template<class F, class T>
-   void TThreadExecutor::Foreach(F func, const std::vector<T> &args) {
-        unsigned int nToProcess = args.size();
-        ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
+   void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
+      unsigned int nToProcess = args.size();
+      if (nChunks == 0 || nToProcess == 0) { // unchunked path; also keeps an empty vector from yielding a zero stride
+         ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
+         return;
+      }
+      // Each task handles up to `step` consecutive elements; the last chunk may be shorter.
+      unsigned step = nToProcess / nChunks + (nToProcess % nChunks != 0); // ceiling of the division, cannot overflow
+      auto lambda = [&](unsigned int i)
+      {
+         for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) { // stop at the end of the vector
+            func(args[i + j]);
+         }
+      };
+      ParallelFor(0U, nToProcess, step, lambda); // one invocation of lambda per chunk start
    }
 
    //////////////////////////////////////////////////////////////////////////
-- 
GitLab