Skip to content
Snippets Groups Projects
Commit 57b2c1a8 authored by Sergey Linev's avatar Sergey Linev Committed by Philippe Canal
Browse files

http: change API to handle WS send operation

Now no queues are allowed, all operation are confirmed by WS handler
parent c5b42d78
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <mutex>
class THttpWSEngine; class THttpWSEngine;
class THttpServer; class THttpServer;
...@@ -27,13 +28,18 @@ friend class THttpServer; ...@@ -27,13 +28,18 @@ friend class THttpServer;
private: private:
std::mutex fMutex; ///<! protect list of engines
std::vector<std::shared_ptr<THttpWSEngine>> fEngines; ///<! list of active WS engines (connections) std::vector<std::shared_ptr<THttpWSEngine>> fEngines; ///<! list of active WS engines (connections)
Bool_t fDisabled; ///<! when true, all operations will be ignored Bool_t fDisabled{kFALSE}; ///<! when true, all operations will be ignored
std::shared_ptr<THttpWSEngine> FindEngine(UInt_t id) const; std::shared_ptr<THttpWSEngine> FindEngine(UInt_t id, Bool_t book_send = kFALSE);
Bool_t HandleWS(std::shared_ptr<THttpCallArg> &arg); Bool_t HandleWS(std::shared_ptr<THttpCallArg> &arg);
Int_t RunSendingThrd(std::shared_ptr<THttpWSEngine> engine);
Int_t PerformSend(std::shared_ptr<THttpWSEngine> engine);
void RemoveEngine(std::shared_ptr<THttpWSEngine> &engine); void RemoveEngine(std::shared_ptr<THttpWSEngine> &engine);
protected: protected:
...@@ -41,7 +47,7 @@ protected: ...@@ -41,7 +47,7 @@ protected:
THttpWSHandler(const char *name, const char *title); THttpWSHandler(const char *name, const char *title);
/// Method called when multi-threaded send operation is completed /// Method called when multi-threaded send operation is completed
virtual void CompleteMTSend(UInt_t) {} virtual void CompleteWSSend(UInt_t) {}
public: public:
virtual ~THttpWSHandler(); virtual ~THttpWSHandler();
...@@ -66,12 +72,12 @@ public: ...@@ -66,12 +72,12 @@ public:
void SetDisabled() { fDisabled = kTRUE; } void SetDisabled() { fDisabled = kTRUE; }
/// Return kTRUE if websocket with given ID exists /// Return kTRUE if websocket with given ID exists
Bool_t HasWS(UInt_t wsid) const { return !!FindEngine(wsid); } Bool_t HasWS(UInt_t wsid) { return !!FindEngine(wsid); }
/// Returns current number of websocket connections /// Returns current number of websocket connections
Int_t GetNumWS() const { return fEngines.size(); } Int_t GetNumWS();
UInt_t GetWS(Int_t num = 0) const; UInt_t GetWS(Int_t num = 0);
void CloseWS(UInt_t wsid); void CloseWS(UInt_t wsid);
......
...@@ -35,6 +35,12 @@ protected: ...@@ -35,6 +35,12 @@ protected:
/// Indicate if engine support send operation from different threads /// Indicate if engine support send operation from different threads
virtual Bool_t SupportMT() const { return kTRUE; } virtual Bool_t SupportMT() const { return kTRUE; }
/// One always can send data to websocket - as long as previous send operation completed
virtual Bool_t CanSendDirectly() { return kTRUE; }
/// True websocket requires extra thread to parallelize sending
virtual Bool_t RequireSendThrd() const { return kTRUE; }
public: public:
TCivetwebWSEngine(struct mg_connection *conn) : THttpWSEngine(), fWSconn(conn) {} TCivetwebWSEngine(struct mg_connection *conn) : THttpWSEngine(), fWSconn(conn) {}
......
...@@ -173,20 +173,8 @@ Bool_t THttpLongPollEngine::PreviewData(std::shared_ptr<THttpCallArg> &arg) ...@@ -173,20 +173,8 @@ Bool_t THttpLongPollEngine::PreviewData(std::shared_ptr<THttpCallArg> &arg)
fPoll.reset(); fPoll.reset();
} }
if (fQueue.size() > 0) { arg->SetPostponed(); // mark http request as pending, http server should wait for notification
QueueItem &item = fQueue.front(); fPoll = arg; // keep reference on polling request
if (item.fBinary) {
arg->SetBinaryContent(std::move(item.fData));
if (!fRaw && !item.fHdr.empty())
arg->SetExtraHeader("LongpollHeader", item.fHdr.c_str());
} else {
arg->SetTextContent(std::move(item.fData));
}
fQueue.pop();
} else {
arg->SetPostponed(); // mark http request as pending, http server should wait for notification
fPoll = arg; // keep reference on polling request
}
// if arguments has "&dummy" string, user should not process it // if arguments has "&dummy" string, user should not process it
return kTRUE; return kTRUE;
...@@ -196,24 +184,16 @@ Bool_t THttpLongPollEngine::PreviewData(std::shared_ptr<THttpCallArg> &arg) ...@@ -196,24 +184,16 @@ Bool_t THttpLongPollEngine::PreviewData(std::shared_ptr<THttpCallArg> &arg)
/// Normally requests from client does not replied directly for longpoll socket /// Normally requests from client does not replied directly for longpoll socket
/// Therefore one can use such request to send data, which was submitted before to the queue /// Therefore one can use such request to send data, which was submitted before to the queue
void THttpLongPollEngine::PostProcess(std::shared_ptr<THttpCallArg> &arg) Bool_t THttpLongPollEngine::PostProcess(std::shared_ptr<THttpCallArg> &arg)
{ {
// request with gLongPollNope content indicates, that "dummy" request was not changed by the user // request with gLongPollNope content indicates, that "dummy" request was not changed by the user
if (!arg->IsText() || (arg->GetContentLength() != (Int_t)gLongPollNope.length()) || if (!arg->IsText() || (arg->GetContentLength() != (Int_t)gLongPollNope.length()) ||
(gLongPollNope.compare((const char *)arg->GetContent()) != 0)) (gLongPollNope.compare((const char *)arg->GetContent()) != 0))
return; return kFALSE;
if (fQueue.size() > 0) { IsSomethingInterestingToSend()?;
QueueItem &item = fQueue.front();
if (item.fBinary) { if (fRaw) {
arg->SetBinaryContent(std::move(item.fData));
if (!fRaw && !item.fHdr.empty())
arg->SetExtraHeader("LongpollHeader", item.fHdr.c_str());
} else {
arg->SetTextContent(std::move(item.fData));
}
fQueue.pop();
} else if (fRaw) {
arg->SetContent(std::string("txt:") + gLongPollNope); arg->SetContent(std::string("txt:") + gLongPollNope);
} }
} }
...@@ -37,6 +37,8 @@ protected: ...@@ -37,6 +37,8 @@ protected:
std::string MakeBuffer(const void *buf, int len, const char *hdr = nullptr); std::string MakeBuffer(const void *buf, int len, const char *hdr = nullptr);
virtual Bool_t CanSendDirectly() { return fPoll; }
public: public:
THttpLongPollEngine(bool raw = false); THttpLongPollEngine(bool raw = false);
......
...@@ -1089,12 +1089,14 @@ Bool_t THttpServer::ExecuteWS(std::shared_ptr<THttpCallArg> &arg, Bool_t externa ...@@ -1089,12 +1089,14 @@ Bool_t THttpServer::ExecuteWS(std::shared_ptr<THttpCallArg> &arg, Bool_t externa
return kTRUE; return kTRUE;
} }
if (!handler) { if (!handler)
return kFALSE; return kFALSE;
} else if (arg->fFileName == "root.websocket") {
Bool_t process = kFALSE;
if (arg->fFileName == "root.websocket") {
// handling of web socket // handling of web socket
if (!handler->HandleWS(arg)) process = handler->HandleWS(arg);
arg->Set404();
} else if (arg->fFileName == "root.longpoll") { } else if (arg->fFileName == "root.longpoll") {
// ROOT emulation of websocket with polling requests // ROOT emulation of websocket with polling requests
if ((arg->fQuery == "connect") || (arg->fQuery == "connect_raw")) { if ((arg->fQuery == "connect") || (arg->fQuery == "connect_raw")) {
...@@ -1115,8 +1117,8 @@ Bool_t THttpServer::ExecuteWS(std::shared_ptr<THttpCallArg> &arg, Bool_t externa ...@@ -1115,8 +1117,8 @@ Bool_t THttpServer::ExecuteWS(std::shared_ptr<THttpCallArg> &arg, Bool_t externa
} else { } else {
arg->TakeWSEngine(); // delete handle arg->TakeWSEngine(); // delete handle
} }
if (!arg->IsText())
arg->Set404(); process = arg->IsText();
} else { } else {
TUrl url; TUrl url;
url.SetOptions(arg->fQuery); url.SetOptions(arg->fQuery);
...@@ -1130,16 +1132,13 @@ Bool_t THttpServer::ExecuteWS(std::shared_ptr<THttpCallArg> &arg, Bool_t externa ...@@ -1130,16 +1132,13 @@ Bool_t THttpServer::ExecuteWS(std::shared_ptr<THttpCallArg> &arg, Bool_t externa
arg->SetMethod("WS_DATA"); arg->SetMethod("WS_DATA");
} }
if (!handler->HandleWS(arg)) process = handler->HandleWS(arg);
arg->Set404();
} }
} else {
// non-supported WS kind
arg->Set404();
return kFALSE;
} }
return kTRUE; if (!process) arg->Set404();
return process;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
......
...@@ -34,7 +34,7 @@ void THttpWSEngine::SendCharStar(const char *str) ...@@ -34,7 +34,7 @@ void THttpWSEngine::SendCharStar(const char *str)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// Method should be invoked before processing data coming from websocket /// Method should be invoked before processing data coming from websocket
/// If method returns kTRUE, this is data is processed internally and /// If method returns kTRUE, data is processed internally and
/// not dedicated for further usage /// not dedicated for further usage
Bool_t THttpWSEngine::PreviewData(std::shared_ptr<THttpCallArg> &) Bool_t THttpWSEngine::PreviewData(std::shared_ptr<THttpCallArg> &)
...@@ -44,8 +44,9 @@ Bool_t THttpWSEngine::PreviewData(std::shared_ptr<THttpCallArg> &) ...@@ -44,8 +44,9 @@ Bool_t THttpWSEngine::PreviewData(std::shared_ptr<THttpCallArg> &)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// Method invoked after user process data received via websocket /// Method invoked after user process data received via websocket
/// Normally request is no longer usable after that /// If returns kTRUE, websocket can be checked if next operation can be performed
void THttpWSEngine::PostProcess(std::shared_ptr<THttpCallArg> &) Bool_t THttpWSEngine::PostProcess(std::shared_ptr<THttpCallArg> &)
{ {
return kFALSE;
} }
...@@ -16,6 +16,9 @@ ...@@ -16,6 +16,9 @@
#include "THttpCallArg.h" #include "THttpCallArg.h"
#include <mutex>
#include <string>
class THttpWSHandler; class THttpWSHandler;
class THttpWSEngine { class THttpWSEngine {
...@@ -23,13 +26,22 @@ class THttpWSEngine { ...@@ -23,13 +26,22 @@ class THttpWSEngine {
private: private:
friend class THttpWSHandler; friend class THttpWSHandler;
bool fMTSend{false}; ///< true when multithreaded send operation is active bool fMTSend{false}; ///<! true when send operation runs, set under locked fMutex from WSHandler
bool fDisabled{false}; ///<! set shortly before cleanup
std::mutex fDataMutex; ///<! protects data submited for send operation
enum { kNone, kData, kHeader, kText } fKind{kNone}; ///<! kind of operation
bool fDoingSend{false}; ///<! doing send operation in other thread
std::string fData; ///<! data (binary or text)
std::string fHdr; ///<! header
protected: protected:
THttpWSEngine() = default; THttpWSEngine() = default;
/// Indicate if engine support send operation from different threads /// Indicate if engine require extra thread to complete postponed thread operation
virtual Bool_t SupportMT() const { return kFALSE; } virtual Bool_t RequireSendThrd() const { return kFALSE; }
virtual Bool_t CanSendDirectly() { return kFALSE; }
public: public:
virtual ~THttpWSEngine() {} virtual ~THttpWSEngine() {}
...@@ -46,7 +58,7 @@ public: ...@@ -46,7 +58,7 @@ public:
virtual Bool_t PreviewData(std::shared_ptr<THttpCallArg> &arg); virtual Bool_t PreviewData(std::shared_ptr<THttpCallArg> &arg);
virtual void PostProcess(std::shared_ptr<THttpCallArg> &arg); virtual Bool_t PostProcess(std::shared_ptr<THttpCallArg> &arg);
}; };
#endif #endif
...@@ -66,7 +66,7 @@ ClassImp(THttpWSHandler); ...@@ -66,7 +66,7 @@ ClassImp(THttpWSHandler);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// normal constructor /// normal constructor
THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(name, title), fEngines(), fDisabled(kFALSE) THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(name, title)
{ {
} }
...@@ -79,26 +79,51 @@ THttpWSHandler::~THttpWSHandler() ...@@ -79,26 +79,51 @@ THttpWSHandler::~THttpWSHandler()
SetDisabled(); SetDisabled();
} }
/// Returns current number of websocket connections
Int_t THttpWSHandler::GetNumWS()
{
std::lock_guard<std::mutex> grd(fMutex);
return fEngines.size();
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// Return websocket id with given sequential number /// Return websocket id with given sequential number
/// Number of websockets return with GetNumWS() method /// Number of websockets returned with GetNumWS() method
UInt_t THttpWSHandler::GetWS(Int_t num) const UInt_t THttpWSHandler::GetWS(Int_t num)
{ {
std::lock_guard<std::mutex> grd(fMutex);
auto iter = fEngines.begin() + num; auto iter = fEngines.begin() + num;
return (*iter)->GetId(); return (*iter)->GetId();
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// Find websocket connection handle with given id /// Find websocket connection handle with given id
/// If book_send parameter specified, have to book send operation under the mutex
std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid) const std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
{ {
if (IsDisabled()) return nullptr; if (IsDisabled())
return nullptr;
std::lock_guard<std::mutex> grd(fMutex);
for (auto &eng : fEngines) for (auto &eng : fEngines)
if (eng->GetId() == wsid) if (eng->GetId() == wsid) {
// not allow to work with disabled engine
if (eng->fDisabled)
return nullptr;
if (book_send) {
if (eng->fMTSend) {
Error("FindEngine", "Try to book next send operation before previous completed");
return nullptr;
}
eng->fMTSend = kTRUE;
}
return eng; return eng;
}
return nullptr; return nullptr;
} }
...@@ -108,12 +133,21 @@ std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid) const ...@@ -108,12 +133,21 @@ std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid) const
void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine) void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine)
{ {
for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++) {
if (*iter == engine) { std::lock_guard<std::mutex> grd(fMutex);
engine->ClearHandle();
fEngines.erase(iter); for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
break; if (*iter == engine) {
} if (engine->fMTSend)
Error("RemoveEngine", "Trying to remove WS engine during send operation");
engine->fDisabled = true;
fEngines.erase(iter);
break;
}
}
engine->ClearHandle();
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -126,7 +160,8 @@ void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine) ...@@ -126,7 +160,8 @@ void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine)
Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
{ {
if (IsDisabled()) return kFALSE; if (IsDisabled())
return kFALSE;
if (!arg->GetWSId()) if (!arg->GetWSId())
return ProcessWS(arg.get()); return ProcessWS(arg.get());
...@@ -145,7 +180,10 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) ...@@ -145,7 +180,10 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
} }
engine = arg->TakeWSEngine(); engine = arg->TakeWSEngine();
fEngines.push_back(engine); {
std::lock_guard<std::mutex> grd(fMutex);
fEngines.emplace_back(engine);
}
if (!ProcessWS(arg.get())) { if (!ProcessWS(arg.get())) {
// if connection refused, remove engine again // if connection refused, remove engine again
...@@ -167,13 +205,20 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg) ...@@ -167,13 +205,20 @@ Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
return ProcessWS(arg.get()); return ProcessWS(arg.get());
} }
if (engine && engine->PreviewData(arg)) Bool_t check_send = engine ? engine->PreviewData(arg) : kFALSE;
return kTRUE;
Bool_t res = ProcessWS(arg.get()); Bool_t res = kTRUE;
if (!check_send) {
res = ProcessWS(arg.get());
check_send = engine ? engine->PostProcess(arg) : kFALSE;
}
if (check_send)
PerformSend(engine);
if (engine)
engine->PostProcess(arg);
return res; return res;
} }
...@@ -191,35 +236,60 @@ void THttpWSHandler::CloseWS(UInt_t wsid) ...@@ -191,35 +236,60 @@ void THttpWSHandler::CloseWS(UInt_t wsid)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// Send binary data via given websocket id /// Send binary data via given websocket id
/// Returns -1 - in case of error, /// Returns -1 - in case of error
/// 0 - when operation was executed immediately, /// 0 - when operation was executed immediately
/// 1 - when send operation will be performed in different thread, /// 1 - when send operation will be performed in different thread
Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len) Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
{ {
auto engine = FindEngine(wsid); auto engine = FindEngine(wsid, kTRUE);
if (!engine) return -1; if (!engine) return -1;
if (engine->fMTSend) { if (!AllowMTSend() && engine->CanSendDirectly()) {
Error("SendWS", "Call next send operation before previous is completed");
return -1;
}
if (!AllowMTSend() || !engine->SupportMT()) {
engine->Send(buf, len); engine->Send(buf, len);
engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
CompleteWSSend(engine->GetId());
return 0; return 0;
} }
engine->fMTSend = true; // now we indicate that there is data and any thread can access it
{
std::lock_guard<std::mutex> grd(engine->fDataMutex);
if (engine->fKind != THttpWSEngine::kNone) {
Error("SendWS", "Data kind is not empty - something screwed up");
return -1;
}
engine->fData.resize(len);
std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
engine->fDoingSend = false;
engine->fKind = THttpWSEngine::kData;
}
return RunSendingThrd(engine);
}
////////////////////////////////////////////////////////////////////////////////
/// Send data stored in the buffer
/// Returns 0 - when operation was executed immediately
/// 1 - when send operation will be performed in different thread
std::string argbuf; Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
argbuf.resize(len); {
std::copy((const char *)buf, (const char *)buf + len, argbuf.begin()); // actually lonpoll engine does not require thread to reply data in buffer
if (!engine->RequireSendThrd()) {
if (engine->CanSendDirectly())
return PerformSend(engine);
// handling will be performed in http request handler
return 1;
}
std::thread thrd([this, argbuf, engine] { std::thread thrd([this, engine] {
engine->Send(argbuf.data(), argbuf.length()); PerformSend(engine);
engine->fMTSend = false;
if (!IsDisabled()) CompleteMTSend(engine->GetId());
}); });
thrd.detach(); // let continue thread execution without thread handle thrd.detach(); // let continue thread execution without thread handle
...@@ -227,6 +297,56 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len) ...@@ -227,6 +297,56 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
return 1; return 1;
} }
////////////////////////////////////////////////////////////////////////////////
/// Perform send operation, stored in buffer
Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
{
{
std::lock_guard<std::mutex> grd(engine->fDataMutex);
// no need to do somthing - operation was processed already by somebody else
if (engine->fKind == THttpWSEngine::kNone)
return 0;
if (engine->fDoingSend)
return 1;
engine->fDoingSend = true;
}
if (IsDisabled() || engine->fDisabled)
return 0;
switch (engine->fKind) {
case THttpWSEngine::kData:
engine->Send(engine->fData.data(), engine->fData.length());
break;
case THttpWSEngine::kHeader:
engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
break;
case THttpWSEngine::kText:
engine->SendCharStar(engine->fData.c_str());
break;
default:
break;
}
engine->fData.clear();
engine->fHdr.clear();
{
std::lock_guard<std::mutex> grd(engine->fDataMutex);
engine->fDoingSend = false;
engine->fKind = THttpWSEngine::kNone;
}
engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
CompleteWSSend(engine->GetId());
return 0;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// Send binary data with text header via given websocket id /// Send binary data with text header via given websocket id
/// Returns -1 - in case of error, /// Returns -1 - in case of error,
...@@ -235,34 +355,35 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len) ...@@ -235,34 +355,35 @@ Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len) Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
{ {
auto engine = FindEngine(wsid); auto engine = FindEngine(wsid, kTRUE);
if (!engine) return -1; if (!engine) return -1;
if (engine->fMTSend) { if (!AllowMTSend() && engine->CanSendDirectly()) {
Error("SendHeaderWS", "Call next send operation before previous is completed");
return -1;
}
if (!AllowMTSend() || !engine->SupportMT()) {
engine->SendHeader(hdr, buf, len); engine->SendHeader(hdr, buf, len);
engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
CompleteWSSend(engine->GetId());
return 0; return 0;
} }
engine->fMTSend = true;
std::string arghdr(hdr), argbuf; // now we indicate that there is data and any thread can access it
argbuf.resize(len); {
std::copy((const char *)buf, (const char *)buf + len, argbuf.begin()); std::lock_guard<std::mutex> grd(engine->fDataMutex);
std::thread thrd([this, arghdr, argbuf, engine] { if (engine->fKind != THttpWSEngine::kNone) {
engine->SendHeader(arghdr.c_str(), argbuf.data(), argbuf.length()); Error("SendWS", "Data kind is not empty - something screwed up");
engine->fMTSend = false; return -1;
if (!IsDisabled()) CompleteMTSend(engine->GetId()); }
});
thrd.detach(); // let continue thread execution without thread handle engine->fHdr = hdr;
engine->fData.resize(len);
std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
return 1; engine->fDoingSend = false;
engine->fKind = THttpWSEngine::kHeader;
}
return RunSendingThrd(engine);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -273,30 +394,30 @@ Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf ...@@ -273,30 +394,30 @@ Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf
Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str) Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str)
{ {
auto engine = FindEngine(wsid); auto engine = FindEngine(wsid, kTRUE);
if (!engine) return -1; if (!engine) return -1;
if (engine->fMTSend) { if (!AllowMTSend() && engine->CanSendDirectly()) {
Error("SendCharStarWS", "Call next send operation before previous is completed");
return -1;
}
if (!AllowMTSend() || !engine->SupportMT()) {
engine->SendCharStar(str); engine->SendCharStar(str);
engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
CompleteWSSend(engine->GetId());
return 0; return 0;
} }
engine->fMTSend = true; // now we indicate that there is data and any thread can access it
{
std::lock_guard<std::mutex> grd(engine->fDataMutex);
std::string arg(str); if (engine->fKind != THttpWSEngine::kNone) {
Error("SendWS", "Data kind is not empty - something screwed up");
return -1;
}
std::thread thrd([this, arg, engine] { engine->fData = str;
engine->SendCharStar(arg.c_str());
engine->fMTSend = false;
if (!IsDisabled()) CompleteMTSend(engine->GetId());
});
thrd.detach(); // let continue thread execution without thread handle engine->fDoingSend = false;
engine->fKind = THttpWSEngine::kText;
}
return 1; return RunSendingThrd(engine);
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment