From 1d46ed54b62c1ff6b39c2867e43b36a1328998ae Mon Sep 17 00:00:00 2001
From: Sergey Linev <S.Linev@gsi.de>
Date: Fri, 17 Aug 2018 11:29:33 +0200
Subject: [PATCH] webgui: introduce data queue in TWebWindow

In multithreaded environment user data callback invoked not necessary
from the thread where data is received (typically THttpThread) but from
thread where TWebWindow is running. For that special queue is created
and Sync() function is introduced.

Still open question - how to correctly detect main thread, where system
events should be processed when blocking function is called
---
 gui/webdisplay/inc/ROOT/TWebWindow.hxx | 19 +++++-
 gui/webdisplay/src/TWebWindow.cxx      | 82 +++++++++++++++++++++-----
 2 files changed, 86 insertions(+), 15 deletions(-)

diff --git a/gui/webdisplay/inc/ROOT/TWebWindow.hxx b/gui/webdisplay/inc/ROOT/TWebWindow.hxx
index e543674cfdd..2bc4b65e850 100644
--- a/gui/webdisplay/inc/ROOT/TWebWindow.hxx
+++ b/gui/webdisplay/inc/ROOT/TWebWindow.hxx
@@ -23,6 +23,7 @@
 #include <map>
 #include <functional>
 #include <mutex>
+#include <thread>
 
 class THttpCallArg;
 class THttpServer;
@@ -69,11 +70,18 @@ private:
       int fClientCredits{0};         ///<! number of credits received from client
       bool fDoingSend{false};        ///<! true when performing send operation
       std::queue<QueueItem> fQueue;  ///<! output queue
-      WebWindowDataCallback_t fCallBack; ///<! additional data callback for extra channels
+      // WebWindowDataCallback_t fCallBack; ///<! additional data callback for extra channels
       WebConn() = default;
       WebConn(unsigned id, unsigned wsid) : fConnId(id), fWSId(wsid) {}
    };
 
+   struct DataEntry {
+      unsigned fConnId{0};         ///<! connection id
+      std::string fData;           ///<! data for given connection
+      DataEntry() = default;
+      DataEntry(unsigned connid, std::string &&data) : fConnId(connid), fData(data) {}
+   };
+
    std::shared_ptr<TWebWindowsManager> fMgr;        ///<! display manager
    bool fBatchMode{false};                          ///<! batch mode
    std::string fDefaultPage;                        ///<! HTML page (or file name) returned when window URL is opened
@@ -89,6 +97,9 @@ private:
    bool fNativeOnlyConn{false};                     ///<! only native connection are allowed, created by Show() method
    static const unsigned fMaxQueueLength{10};       ///<! maximal number of queue entries
    WebWindowDataCallback_t fDataCallback;           ///<! main callback when data over channel 1 is arrived
+   std::thread::id fDataThrdId;                     ///<! thread id where data callback should be invoked
+   std::queue<DataEntry> fDataQueue;                ///<! data queue for main callback
+   std::mutex fDataMutex;                           ///<! mutex to protect data queue
    unsigned fWidth{0};                              ///<! initial window width when displayed
    unsigned fHeight{0};                             ///<! initial window height when displayed
 
@@ -112,6 +123,10 @@ private:
 
    std::string _MakeSendHeader(std::shared_ptr<WebConn> &conn, bool txt, const std::string &data, int chid);
 
+   void ProvideData(unsigned connid, std::string &&arg);
+
+   void InovkeCallbacks(bool force = false);
+
    void SubmitData(unsigned connid, bool txt, std::string &&data, int chid = 1);
 
    bool CheckDataToSend(std::shared_ptr<WebConn> &conn);
@@ -193,6 +208,8 @@ public:
 
    THttpServer *GetServer();
 
+   void Sync();
+
    bool Show(const std::string &where = "");
 
    bool CanSend(unsigned connid, bool direct = true);
diff --git a/gui/webdisplay/src/TWebWindow.cxx b/gui/webdisplay/src/TWebWindow.cxx
index a11a6a8f2fb..30f4dcf8e1e 100644
--- a/gui/webdisplay/src/TWebWindow.cxx
+++ b/gui/webdisplay/src/TWebWindow.cxx
@@ -32,11 +32,11 @@ namespace Experimental {
 
 class TWebWindowWSHandler : public THttpWSHandler {
 public:
-   TWebWindow &fDispl; ///<! display reference
+   TWebWindow &fWindow; ///<! window reference
 
    /// constructor
-   TWebWindowWSHandler(TWebWindow &displ, const char *name)
-      : THttpWSHandler(name, "TWebWindow websockets handler"), fDispl(displ)
+   TWebWindowWSHandler(TWebWindow &wind, const char *name)
+      : THttpWSHandler(name, "TWebWindow websockets handler"), fWindow(wind)
    {
    }
 
@@ -46,17 +46,17 @@ public:
 
    /// returns content of default web-page
    /// THttpWSHandler interface
-   virtual TString GetDefaultPageContent() override { return IsDisabled() ? "" : fDispl.fDefaultPage.c_str(); }
+   virtual TString GetDefaultPageContent() override { return IsDisabled() ? "" : fWindow.fDefaultPage.c_str(); }
 
-   /// Process websocket request
+   /// Process websocket request - called from THttpServer thread
    /// THttpWSHandler interface
-   virtual Bool_t ProcessWS(THttpCallArg *arg) override { return arg && !IsDisabled() ? fDispl.ProcessWS(*arg) : kFALSE; }
+   virtual Bool_t ProcessWS(THttpCallArg *arg) override { return arg && !IsDisabled() ? fWindow.ProcessWS(*arg) : kFALSE; }
 
    /// Allows usage of multithreading in send operations
    virtual Bool_t AllowMT() const { return kTRUE; }
 
    /// React on completion of multithreaded send operaiotn
-   virtual void CompleteMTSend(UInt_t wsid) { if (!IsDisabled()) fDispl.CompleteMTSend(wsid); }
+   virtual void CompleteMTSend(UInt_t wsid) { if (!IsDisabled()) fWindow.CompleteMTSend(wsid); }
 };
 
 } // namespace Experimental
@@ -204,8 +204,50 @@ std::shared_ptr<ROOT::Experimental::TWebWindow::WebConn> ROOT::Experimental::TWe
    return nullptr;
 }
 
+//////////////////////////////////////////////////////////////////////////////////////////
+/// Provide data to user callback
+/// User callback must be executed in the window thread
+
+void ROOT::Experimental::TWebWindow::ProvideData(unsigned connid, std::string &&arg)
+{
+   {
+      std::lock_guard<std::mutex> grd(fDataMutex);
+      fDataQueue.emplace(connid, std::move(arg));
+   }
+
+   InovkeCallbacks();
+}
+
+//////////////////////////////////////////////////////////////////////////////////////////
+/// Invoke callbacks with existing data
+/// Must be called from appropriate thread
+
+void ROOT::Experimental::TWebWindow::InovkeCallbacks(bool force)
+{
+   if ((fDataThrdId != std::this_thread::get_id()) && !force)
+      return;
+
+   while (fDataCallback) {
+      std::string arg;
+      unsigned connid;
+
+      {
+         std::lock_guard<std::mutex> grd(fDataMutex);
+         if (fDataQueue.size() == 0)
+            return;
+         DataEntry &entry = fDataQueue.front();
+         connid = entry.fConnId;
+         arg = std::move(entry.fData);
+      }
+
+      fDataCallback(connid, arg);
+   }
+}
+
+
 //////////////////////////////////////////////////////////////////////////////////////////
 /// Processing of websockets call-backs, invoked from TWebWindowWSHandler
+/// Method invoked from http server thread, therefore appropriate mutex must be used on all relevant data
 
 bool ROOT::Experimental::TWebWindow::ProcessWS(THttpCallArg &arg)
 {
@@ -249,8 +291,7 @@ bool ROOT::Experimental::TWebWindow::ProcessWS(THttpCallArg &arg)
       auto conn = RemoveConnection(arg.GetWSId());
 
       if (conn) {
-         if (fDataCallback)
-            fDataCallback(conn->fConnId, "CONN_CLOSED");
+         ProvideData(conn->fConnId, "CONN_CLOSED");
 
          fMgr->HaltClient(conn->fProcId);
       }
@@ -343,23 +384,24 @@ bool ROOT::Experimental::TWebWindow::ProcessWS(THttpCallArg &arg)
             Send(conn->fConnId, std::string("SHOWPANEL:") + fPanelName);
             conn->fReady = 5;
          } else {
-            fDataCallback(conn->fConnId, "CONN_READY");
+            ProvideData(conn->fConnId, "CONN_READY");
             conn->fReady = 10;
          }
       }
    } else if (fPanelName.length() && (conn->fReady < 10)) {
       if (cdata == "PANEL_READY") {
          R__DEBUG_HERE("webgui") << "Get panel ready " << fPanelName;
-         fDataCallback(conn->fConnId, "CONN_READY");
+         ProvideData(conn->fConnId, "CONN_READY");
          conn->fReady = 10;
       } else {
-         fDataCallback(conn->fConnId, "CONN_CLOSED");
+         ProvideData(conn->fConnId, "CONN_CLOSED");
          RemoveConnection(conn->fWSId);
       }
    } else if (nchannel == 1) {
-      fDataCallback(conn->fConnId, cdata);
+      ProvideData(conn->fConnId, std::move(cdata));
    } else if (nchannel > 1) {
-      conn->fCallBack(conn->fConnId, cdata);
+      // add processing of extra channels later
+      // conn->fCallBack(conn->fConnId, cdata);
    }
 
    CheckDataToSend();
@@ -503,6 +545,17 @@ void ROOT::Experimental::TWebWindow::CheckDataToSend(bool only_once)
    } while (!only_once);
 }
 
+///////////////////////////////////////////////////////////////////////////////////
+/// Special method to process all internal activity when window runs in separate thread
+
+void ROOT::Experimental::TWebWindow::Sync()
+{
+   InvokeCallbacks();
+
+   CheckDataToSend();
+}
+
+
 ///////////////////////////////////////////////////////////////////////////////////
 /// Returns relative URL address for the specified window
 /// Address can be required if one needs to access data from one window into another window
@@ -703,6 +756,7 @@ void ROOT::Experimental::TWebWindow::SendBinary(unsigned connid, const void *dat
 void ROOT::Experimental::TWebWindow::SetDataCallBack(WebWindowDataCallback_t func)
 {
    fDataCallback = func;
+   fDataThrdId = std::this_thread::get_id();
 }
 
 /////////////////////////////////////////////////////////////////////////////////
-- 
GitLab