From a489011ff6d1a1d1a93b6b1dbcc4624db446ed4a Mon Sep 17 00:00:00 2001
From: Sergey Linev <S.Linev@gsi.de>
Date: Wed, 22 Aug 2018 12:55:33 +0200
Subject: [PATCH] http: introduce async mode for the WSHandler

Only in such mode multithreading is allowed.
In async mode one can meaningfully use send completion callback.
Default mode is synchronous (as before), which allows to use handler
only from main thread
---
 net/http/inc/THttpWSHandler.h   |  19 +++--
 net/http/src/TCivetweb.cxx      |   8 +-
 net/http/src/THttpWSEngine.h    |   8 +-
 net/http/src/THttpWSHandler.cxx | 129 +++++++++++++++++++-------------
 4 files changed, 99 insertions(+), 65 deletions(-)

diff --git a/net/http/inc/THttpWSHandler.h b/net/http/inc/THttpWSHandler.h
index 238f1fdd16c..b9f0f9dc2d8 100644
--- a/net/http/inc/THttpWSHandler.h
+++ b/net/http/inc/THttpWSHandler.h
@@ -27,10 +27,11 @@ class THttpWSHandler : public TNamed {
 friend class THttpServer;
 
 private:
-
-   std::mutex fMutex;                                            ///<!  protect list of engines
-   std::vector<std::shared_ptr<THttpWSEngine>> fEngines;         ///<!  list of active WS engines (connections)
-   Bool_t fDisabled{kFALSE};                                     ///<!  when true, all operations will be ignored
+   Bool_t fSyncMode{kTRUE};  ///<! is handler runs in synchronous mode (default, no multi-threading)
+   Bool_t fDisabled{kFALSE}; ///<!  when true, all further operations will be ignored
+   Int_t fSendCnt{0};        ///<! counter for completed send operations
+   std::mutex fMutex;        ///<!  protect list of engines
+   std::vector<std::shared_ptr<THttpWSEngine>> fEngines; ///<!  list of active WS engines (connections)
 
    std::shared_ptr<THttpWSEngine> FindEngine(UInt_t id, Bool_t book_send = kFALSE);
 
@@ -42,9 +43,11 @@ private:
 
    void RemoveEngine(std::shared_ptr<THttpWSEngine> &engine);
 
+   Int_t CompleteSend(std::shared_ptr<THttpWSEngine> &engine);
+
 protected:
 
-   THttpWSHandler(const char *name, const char *title);
+   THttpWSHandler(const char *name, const char *title, Bool_t syncmode = kTRUE);
 
    /// Method called when multi-threaded send operation is completed
    virtual void CompleteWSSend(UInt_t) {}
@@ -52,6 +55,12 @@ protected:
 public:
    virtual ~THttpWSHandler();
 
+   /// Returns processing mode of WS handler
+   /// If sync mode is TRUE (default), all event processing and data sending performed in main thread
+   /// All send functions are blocking and must be performed from main thread
+   /// If sync mode is false, WS handler can be used from different threads and starts its own sending threads
+   Bool_t IsSyncMode() const { return fSyncMode; }
+
    /// Provides content of default web page for registered web-socket handler
    /// Can be content of HTML page or file name, where content should be taken
    /// For instance, file:/home/user/test.htm or file:$jsrootsys/files/canvas.htm
diff --git a/net/http/src/TCivetweb.cxx b/net/http/src/TCivetweb.cxx
index 3d688fbde6f..1aa6eb1ce1c 100644
--- a/net/http/src/TCivetweb.cxx
+++ b/net/http/src/TCivetweb.cxx
@@ -32,14 +32,8 @@ class TCivetwebWSEngine : public THttpWSEngine {
 protected:
    struct mg_connection *fWSconn;
 
-   /// Indicate if engine support send operation from different threads
-   virtual Bool_t SupportMT() const { return kTRUE; }
-
-   /// One always can send data to websocket - as long as previous send operation completed
-   virtual Bool_t CanSendDirectly() { return kTRUE; }
-
    /// True websocket requires extra thread to parallelize sending
-   virtual Bool_t RequireSendThrd() const { return kTRUE; }
+   virtual Bool_t SupportSendThrd() const { return kTRUE; }
 
 public:
    TCivetwebWSEngine(struct mg_connection *conn) : THttpWSEngine(), fWSconn(conn) {}
diff --git a/net/http/src/THttpWSEngine.h b/net/http/src/THttpWSEngine.h
index a108ee34440..9edbb454168 100644
--- a/net/http/src/THttpWSEngine.h
+++ b/net/http/src/THttpWSEngine.h
@@ -39,13 +39,17 @@ protected:
    THttpWSEngine() = default;
 
    /// Indicate if engine require extra thread to complete postponed thread operation
-   virtual Bool_t RequireSendThrd() const { return kFALSE; }
+   virtual Bool_t SupportSendThrd() const { return kFALSE; }
 
-   virtual Bool_t CanSendDirectly() { return kFALSE; }
+   /// One always can send data to websocket - as long as previous send operation completed
+   virtual Bool_t CanSendDirectly() { return kTRUE; }
 
 public:
    virtual ~THttpWSEngine() {}
 
+   /// Returns kTRUE when handle is deactivated and need to be destroyed
+   Bool_t IsDisabled() const { return fDisabled; }
+
    virtual UInt_t GetId() const = 0;
 
    virtual void ClearHandle() = 0;
diff --git a/net/http/src/THttpWSHandler.cxx b/net/http/src/THttpWSHandler.cxx
index 0cc9adbc590..fb41a301a87 100644
--- a/net/http/src/THttpWSHandler.cxx
+++ b/net/http/src/THttpWSHandler.cxx
@@ -13,8 +13,10 @@
 
 #include "THttpWSEngine.h"
 #include "THttpCallArg.h"
+#include "TSystem.h"
 
 #include <thread>
+#include <chrono>
 
 /////////////////////////////////////////////////////////////////////////
 ///
@@ -66,7 +68,7 @@ ClassImp(THttpWSHandler);
 ////////////////////////////////////////////////////////////////////////////////
 /// normal constructor
 
-THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(name, title)
+THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
 {
 }
 
@@ -229,43 +231,6 @@ void THttpWSHandler::CloseWS(UInt_t wsid)
       RemoveEngine(engine);
 }
 
-////////////////////////////////////////////////////////////////////////////////
-/// Send binary data via given websocket id
-/// Returns -1 - in case of error
-///          0 - when operation was executed immediately
-///          1 - when send operation will be performed in different thread
-
-Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
-{
-   auto engine = FindEngine(wsid, kTRUE);
-   if (!engine) return -1;
-
-   if (!AllowMTSend() || engine->CanSendDirectly()) {
-      engine->Send(buf, len);
-      engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
-      CompleteWSSend(engine->GetId());
-      return 0;
-   }
-
-   // now we indicate that there is data and any thread can access it
-   {
-      std::lock_guard<std::mutex> grd(engine->fDataMutex);
-
-      if (engine->fKind != THttpWSEngine::kNone) {
-         Error("SendWS", "Data kind is not empty - something screwed up");
-         return -1;
-      }
-
-      engine->fData.resize(len);
-      std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
-
-      engine->fDoingSend = false;
-      engine->fKind = THttpWSEngine::kData;
-   }
-
-   return RunSendingThrd(engine);
-}
-
 ////////////////////////////////////////////////////////////////////////////////
 /// Send data stored in the buffer
 /// Returns 0 - when operation was executed immediately
@@ -273,15 +238,36 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
 
 Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
 {
-   if (!engine->RequireSendThrd()) {
+   if (IsSyncMode() || !engine->SupportSendThrd()) {
       // this is case of longpoll engine, no extra thread is required for it
       if (engine->CanSendDirectly())
          return PerformSend(engine);
 
-      // handling will be performed in http request handler
-      return 1;
+
+      // handling will be performed in following http request handler
+
+      if (!IsSyncMode()) return 1;
+
+      // now we should wait until next polling requests is processed
+      // or when connection is closed or handler is shutdown
+
+      Int_t sendcnt = fSendCnt, loopcnt(0);
+
+      while (!IsDisabled() && !engine->IsDisabled()) {
+         gSystem->ProcessEvents();
+         // if send counter changed - current send operation is completed
+         if (sendcnt != fSendCnt)
+            return 0;
+         if (loopcnt++ > 1000) {
+            loopcnt = 0;
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+         }
+      }
+
+      return -1;
    }
 
+   // probably this thread can continuously run
    std::thread thrd([this, engine] {
       PerformSend(engine);
    });
@@ -335,12 +321,58 @@ Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
       engine->fKind = THttpWSEngine::kNone;
    }
 
+   return CompleteSend(engine);
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+/// Complete current send operation
+
+Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
+{
+   fSendCnt++;
    engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
    CompleteWSSend(engine->GetId());
+   return 0; // indicates that operation is completed
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+/// Send binary data via given websocket id
+/// Returns -1 - in case of error
+///          0 - when operation was executed immediately
+///          1 - when send operation will be performed in different thread
+
+Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
+{
+   auto engine = FindEngine(wsid, kTRUE);
+   if (!engine) return -1;
+
+   if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
+      engine->Send(buf, len);
+      return CompleteSend(engine);
+   }
 
-   return 0;
+   // now we indicate that there is data and any thread can access it
+   {
+      std::lock_guard<std::mutex> grd(engine->fDataMutex);
+
+      if (engine->fKind != THttpWSEngine::kNone) {
+         Error("SendWS", "Data kind is not empty - something screwed up");
+         return -1;
+      }
+
+      engine->fData.resize(len);
+      std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
+
+      engine->fDoingSend = false;
+      engine->fKind = THttpWSEngine::kData;
+   }
+
+   return RunSendingThrd(engine);
 }
 
+
 ////////////////////////////////////////////////////////////////////////////////
 /// Send binary data with text header via given websocket id
 /// Returns -1 - in case of error,
@@ -352,14 +384,11 @@ Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf
    auto engine = FindEngine(wsid, kTRUE);
    if (!engine) return -1;
 
-   if (!AllowMTSend() || engine->CanSendDirectly()) {
+   if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
       engine->SendHeader(hdr, buf, len);
-      engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
-      CompleteWSSend(engine->GetId());
-      return 0;
+      return CompleteSend(engine);
    }
 
-
    // now we indicate that there is data and any thread can access it
    {
       std::lock_guard<std::mutex> grd(engine->fDataMutex);
@@ -391,11 +420,9 @@ Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str)
    auto engine = FindEngine(wsid, kTRUE);
    if (!engine) return -1;
 
-   if (!AllowMTSend() || engine->CanSendDirectly()) {
+   if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
       engine->SendCharStar(str);
-      engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
-      CompleteWSSend(engine->GetId());
-      return 0;
+      return CompleteSend(engine);
    }
 
    // now we indicate that there is data and any thread can access it
-- 
GitLab