diff --git a/gui/webdisplay/inc/ROOT/TWebWindow.hxx b/gui/webdisplay/inc/ROOT/TWebWindow.hxx index e543674cfdd87c3d5062090fcbf6271fb19d3351..2bc4b65e85072709106b7c679903fdb31eef3d90 100644 --- a/gui/webdisplay/inc/ROOT/TWebWindow.hxx +++ b/gui/webdisplay/inc/ROOT/TWebWindow.hxx @@ -23,6 +23,7 @@ #include <map> #include <functional> #include <mutex> +#include <thread> class THttpCallArg; class THttpServer; @@ -69,11 +70,18 @@ private: int fClientCredits{0}; ///<! number of credits received from client bool fDoingSend{false}; ///<! true when performing send operation std::queue<QueueItem> fQueue; ///<! output queue - WebWindowDataCallback_t fCallBack; ///<! additional data callback for extra channels + // WebWindowDataCallback_t fCallBack; ///<! additional data callback for extra channels WebConn() = default; WebConn(unsigned id, unsigned wsid) : fConnId(id), fWSId(wsid) {} }; + struct DataEntry { + unsigned fConnId{0}; ///<! connection id + std::string fData; ///<! data for given connection + DataEntry() = default; + DataEntry(unsigned connid, std::string &&data) : fConnId(connid), fData(data) {} + }; + std::shared_ptr<TWebWindowsManager> fMgr; ///<! display manager bool fBatchMode{false}; ///<! batch mode std::string fDefaultPage; ///<! HTML page (or file name) returned when window URL is opened @@ -89,6 +97,9 @@ private: bool fNativeOnlyConn{false}; ///<! only native connection are allowed, created by Show() method static const unsigned fMaxQueueLength{10}; ///<! maximal number of queue entries WebWindowDataCallback_t fDataCallback; ///<! main callback when data over channel 1 is arrived + std::thread::id fDataThrdId; ///<! thread id where data callback should be invoked + std::queue<DataEntry> fDataQueue; ///<! data queue for main callback + std::mutex fDataMutex; ///<! mutex to protect data queue unsigned fWidth{0}; ///<! initial window width when displayed unsigned fHeight{0}; ///<! initial window height when displayed @@ -112,6 +123,10 @@ private: std::string _MakeSendHeader(std::shared_ptr<WebConn> &conn, bool txt, const std::string &data, int chid); + void ProvideData(unsigned connid, std::string &&arg); + + void InovkeCallbacks(bool force = false); + void SubmitData(unsigned connid, bool txt, std::string &&data, int chid = 1); bool CheckDataToSend(std::shared_ptr<WebConn> &conn); @@ -193,6 +208,8 @@ public: THttpServer *GetServer(); + void Sync(); + bool Show(const std::string &where = ""); bool CanSend(unsigned connid, bool direct = true); diff --git a/gui/webdisplay/src/TWebWindow.cxx b/gui/webdisplay/src/TWebWindow.cxx index a11a6a8f2fbdadd7317df130a4cb37d1d579f398..30f4dcf8e1ed3189da79d8198ba69fad19ff98c2 100644 --- a/gui/webdisplay/src/TWebWindow.cxx +++ b/gui/webdisplay/src/TWebWindow.cxx @@ -32,11 +32,11 @@ namespace Experimental { class TWebWindowWSHandler : public THttpWSHandler { public: - TWebWindow &fDispl; ///<! display reference + TWebWindow &fWindow; ///<! window reference /// constructor - TWebWindowWSHandler(TWebWindow &displ, const char *name) - : THttpWSHandler(name, "TWebWindow websockets handler"), fDispl(displ) + TWebWindowWSHandler(TWebWindow &wind, const char *name) + : THttpWSHandler(name, "TWebWindow websockets handler"), fWindow(wind) { } @@ -46,17 +46,17 @@ public: /// returns content of default web-page /// THttpWSHandler interface - virtual TString GetDefaultPageContent() override { return IsDisabled() ? "" : fDispl.fDefaultPage.c_str(); } + virtual TString GetDefaultPageContent() override { return IsDisabled() ? "" : fWindow.fDefaultPage.c_str(); } - /// Process websocket request + /// Process websocket request - called from THttpServer thread /// THttpWSHandler interface - virtual Bool_t ProcessWS(THttpCallArg *arg) override { return arg && !IsDisabled() ? fDispl.ProcessWS(*arg) : kFALSE; } + virtual Bool_t ProcessWS(THttpCallArg *arg) override { return arg && !IsDisabled() ? fWindow.ProcessWS(*arg) : kFALSE; } /// Allows usage of multithreading in send operations virtual Bool_t AllowMT() const { return kTRUE; } /// React on completion of multithreaded send operaiotn - virtual void CompleteMTSend(UInt_t wsid) { if (!IsDisabled()) fDispl.CompleteMTSend(wsid); } + virtual void CompleteMTSend(UInt_t wsid) { if (!IsDisabled()) fWindow.CompleteMTSend(wsid); } }; } // namespace Experimental @@ -204,8 +204,50 @@ std::shared_ptr<ROOT::Experimental::TWebWindow::WebConn> ROOT::Experimental::TWe return nullptr; } +////////////////////////////////////////////////////////////////////////////////////////// +/// Provide data to user callback +/// User callback must be executed in the window thread + +void ROOT::Experimental::TWebWindow::ProvideData(unsigned connid, std::string &&arg) +{ + { + std::lock_guard<std::mutex> grd(fDataMutex); + fDataQueue.emplace(connid, std::move(arg)); + } + + InovkeCallbacks(); +} + +////////////////////////////////////////////////////////////////////////////////////////// +/// Invoke callbacks with existing data +/// Must be called from appropriate thread + +void ROOT::Experimental::TWebWindow::InovkeCallbacks(bool force) +{ + if ((fDataThrdId != std::this_thread::get_id()) && !force) + return; + + while (fDataCallback) { + std::string arg; + unsigned connid; + + { + std::lock_guard<std::mutex> grd(fDataMutex); + if (fDataQueue.size() == 0) + return; + DataEntry &entry = fDataQueue.front(); + connid = entry.fConnId; + arg = std::move(entry.fData); + } + + fDataCallback(connid, arg); + } +} + + ////////////////////////////////////////////////////////////////////////////////////////// /// Processing of websockets call-backs, invoked from TWebWindowWSHandler +/// Method invoked from http server thread, therefore appropriate mutex must be used on all relevant data bool ROOT::Experimental::TWebWindow::ProcessWS(THttpCallArg &arg) { @@ -249,8 +291,7 @@ bool ROOT::Experimental::TWebWindow::ProcessWS(THttpCallArg &arg) auto conn = RemoveConnection(arg.GetWSId()); if (conn) { - if (fDataCallback) - fDataCallback(conn->fConnId, "CONN_CLOSED"); + ProvideData(conn->fConnId, "CONN_CLOSED"); fMgr->HaltClient(conn->fProcId); } @@ -343,23 +384,24 @@ bool ROOT::Experimental::TWebWindow::ProcessWS(THttpCallArg &arg) Send(conn->fConnId, std::string("SHOWPANEL:") + fPanelName); conn->fReady = 5; } else { - fDataCallback(conn->fConnId, "CONN_READY"); + ProvideData(conn->fConnId, "CONN_READY"); conn->fReady = 10; } } } else if (fPanelName.length() && (conn->fReady < 10)) { if (cdata == "PANEL_READY") { R__DEBUG_HERE("webgui") << "Get panel ready " << fPanelName; - fDataCallback(conn->fConnId, "CONN_READY"); + ProvideData(conn->fConnId, "CONN_READY"); conn->fReady = 10; } else { - fDataCallback(conn->fConnId, "CONN_CLOSED"); + ProvideData(conn->fConnId, "CONN_CLOSED"); RemoveConnection(conn->fWSId); } } else if (nchannel == 1) { - fDataCallback(conn->fConnId, cdata); + ProvideData(conn->fConnId, std::move(cdata)); } else if (nchannel > 1) { - conn->fCallBack(conn->fConnId, cdata); + // add processing of extra channels later + // conn->fCallBack(conn->fConnId, cdata); } CheckDataToSend(); @@ -503,6 +545,17 @@ void ROOT::Experimental::TWebWindow::CheckDataToSend(bool only_once) } while (!only_once); } +/////////////////////////////////////////////////////////////////////////////////// +/// Special method to process all internal activity when window runs in separate thread + +void ROOT::Experimental::TWebWindow::Sync() +{ + InvokeCallbacks(); + + CheckDataToSend(); +} + + /////////////////////////////////////////////////////////////////////////////////// /// Returns relative URL address for the specified window /// Address can be required if one needs to access data from one window into another window @@ -703,6 +756,7 @@ void ROOT::Experimental::TWebWindow::SendBinary(unsigned connid, const void *dat void ROOT::Experimental::TWebWindow::SetDataCallBack(WebWindowDataCallback_t func) { fDataCallback = func; + fDataThrdId = std::this_thread::get_id(); } /////////////////////////////////////////////////////////////////////////////////