diff --git a/net/http/inc/THttpWSHandler.h b/net/http/inc/THttpWSHandler.h index d26a490e0d09e430ec0d57ce31f6e918704a2b1a..deb27253ce2f8f8f48523c469d4775cafa140366 100644 --- a/net/http/inc/THttpWSHandler.h +++ b/net/http/inc/THttpWSHandler.h @@ -28,12 +28,13 @@ friend class THttpServer; private: std::vector<std::shared_ptr<THttpWSEngine>> fEngines; ///<! list of active WS engines (connections) + Bool_t fDisabled; ///<! when true, all operations will be ignored - THttpWSEngine *FindEngine(UInt_t id) const; + std::shared_ptr<THttpWSEngine> FindEngine(UInt_t id) const; Bool_t HandleWS(std::shared_ptr<THttpCallArg> &arg); - void RemoveEngine(THttpWSEngine *engine); + void RemoveEngine(std::shared_ptr<THttpWSEngine> &engine); protected: @@ -55,8 +56,14 @@ public: /// Allow send operations in separate threads (when supported by websocket engine) virtual Bool_t AllowMT() const { return kFALSE; } + /// Returns true when processing of websockets is disabled + Bool_t IsDisabled() const { return fDisabled; } + + /// Disable all processing of websockets, normally called shortly before destructor + void SetDisabled() { fDisabled = kTRUE; } + /// Return kTRUE if websocket with given ID exists - Bool_t HasWS(UInt_t wsid) const { return FindEngine(wsid) != 0; } + Bool_t HasWS(UInt_t wsid) const { return !!FindEngine(wsid); } /// Returns current number of websocket connections Int_t GetNumWS() const { return fEngines.size(); } diff --git a/net/http/src/TCivetweb.cxx b/net/http/src/TCivetweb.cxx index 73d1866b80f7908bec09820fd156fee7f1a1342d..1790962f6a10d850b6667fb96aba22f909217079 100644 --- a/net/http/src/TCivetweb.cxx +++ b/net/http/src/TCivetweb.cxx @@ -32,6 +32,9 @@ class TCivetwebWSEngine : public THttpWSEngine { protected: struct mg_connection *fWSconn; + /// Indicate if engine support send operation from different threads + virtual Bool_t SupportMT() const { return kTRUE; } + public: TCivetwebWSEngine(struct mg_connection *conn) : THttpWSEngine(), fWSconn(conn) {} diff --git a/net/http/src/THttpServer.cxx b/net/http/src/THttpServer.cxx index 015dfee27ab3a1314244f45ca455fbaa07bb5c36..1b7c039cfa292c895fe570cb8acc28a18363157e 100644 --- a/net/http/src/THttpServer.cxx +++ b/net/http/src/THttpServer.cxx @@ -22,7 +22,6 @@ #include "TRegexp.h" #include "THttpEngine.h" -#include "THttpWSEngine.h" #include "THttpLongPollEngine.h" #include "THttpWSHandler.h" #include "TRootSniffer.h" diff --git a/net/http/src/THttpWSHandler.cxx b/net/http/src/THttpWSHandler.cxx index 18a60141e6f6cdfa138de53942355b0dec5ef972..e4468759da73ef775af385d2096f4dd5bee389bd 100644 --- a/net/http/src/THttpWSHandler.cxx +++ b/net/http/src/THttpWSHandler.cxx @@ -66,7 +66,7 @@ ClassImp(THttpWSHandler); //////////////////////////////////////////////////////////////////////////////// /// normal constructor -THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(name, title), fEngines() +THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(name, title), fEngines(), fDisabled(kFALSE) { } @@ -76,6 +76,7 @@ THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(nam THttpWSHandler::~THttpWSHandler() { + SetDisabled(); } //////////////////////////////////////////////////////////////////////////////// @@ -91,11 +92,13 @@ UInt_t THttpWSHandler::GetWS(Int_t num) const //////////////////////////////////////////////////////////////////////////////// /// Find websocket connection handle with given id -THttpWSEngine *THttpWSHandler::FindEngine(UInt_t wsid) const +std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid) const { + if (IsDisabled()) return nullptr; + for (auto &eng : fEngines) if (eng->GetId() == wsid) - return eng.get(); + return eng; return nullptr; } @@ -103,10 +106,11 @@ THttpWSEngine *THttpWSHandler::FindEngine(UInt_t wsid) const //////////////////////////////////////////////////////////////////////////////// /// Remove and destroy WS connection -void THttpWSHandler::RemoveEngine(THttpWSEngine *engine) +void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine) { for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++) - if (iter->get() == engine) { + if (*iter == engine) { + engine->ClearHandle(); fEngines.erase(iter); break; } @@ -122,6 +126,8 @@ void THttpWSHandler::RemoveEngine(THttpWSEngine *engine) Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) { + if (IsDisabled()) return kFALSE; + if (!arg->GetWSId()) return ProcessWS(arg.get()); @@ -129,7 +135,7 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) if (arg->IsMethod("WS_CONNECT")) return ProcessWS(arg.get()); - THttpWSEngine *engine = FindEngine(arg->GetWSId()); + auto engine = FindEngine(arg->GetWSId()); if (arg->IsMethod("WS_READY")) { @@ -138,9 +144,8 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) RemoveEngine(engine); } - auto handle = arg->TakeWSEngine(); - engine = handle.get(); - fEngines.push_back(std::move(handle)); + engine = arg->TakeWSEngine(); + fEngines.push_back(engine); if (!ProcessWS(arg.get())) { // if connection refused, remove engine again @@ -178,7 +183,7 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) void THttpWSHandler::CloseWS(UInt_t wsid) { - THttpWSEngine *engine = FindEngine(wsid); + auto engine = FindEngine(wsid); if (engine) RemoveEngine(engine); @@ -192,7 +197,7 @@ void THttpWSHandler::CloseWS(UInt_t wsid) Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len) { - THttpWSEngine *engine = FindEngine(wsid); + auto engine = FindEngine(wsid); if (!engine) return -1; if (engine->fMTSend) { @@ -207,12 +212,14 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len) engine->fMTSend = true; - std::string argbuf((const char *)buf, len); + std::string argbuf; + argbuf.resize(len); + std::copy((const char *)buf, (const char *)buf + len, argbuf.begin()); std::thread thrd([this, argbuf, engine] { engine->Send(argbuf.data(), argbuf.length()); engine->fMTSend = false; - CompleteMTSend(engine->GetId()); + if (!IsDisabled()) CompleteMTSend(engine->GetId()); }); thrd.detach(); // let continue thread execution without thread handle @@ -228,7 +235,7 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len) Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len) { - THttpWSEngine *engine = FindEngine(wsid); + auto engine = FindEngine(wsid); if (!engine) return -1; if (engine->fMTSend) { @@ -243,12 +250,14 @@ Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf engine->fMTSend = true; - std::string arghdr(hdr), argbuf((const char *) buf, len); + std::string arghdr(hdr), argbuf; + argbuf.resize(len); + std::copy((const char *)buf, (const char *)buf + len, argbuf.begin()); std::thread thrd([this, arghdr, argbuf, engine] { engine->SendHeader(arghdr.c_str(), argbuf.data(), argbuf.length()); engine->fMTSend = false; - CompleteMTSend(engine->GetId()); + if (!IsDisabled()) CompleteMTSend(engine->GetId()); }); thrd.detach(); // let continue thread execution without thread handle @@ -264,7 +273,7 @@ Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str) { - THttpWSEngine *engine = FindEngine(wsid); + auto engine = FindEngine(wsid); if (!engine) return -1; if (engine->fMTSend) { @@ -284,7 +293,7 @@ Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str) std::thread thrd([this, arg, engine] { engine->SendCharStar(arg.c_str()); engine->fMTSend = false; - CompleteMTSend(engine->GetId()); + if (!IsDisabled()) CompleteMTSend(engine->GetId()); }); thrd.detach(); // let continue thread execution without thread handle