Skip to content
Snippets Groups Projects
THttpWSHandler.cxx 12.2 KiB
Newer Older
// $Id$
// Author: Sergey Linev   20/10/2017

/*************************************************************************
 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include "THttpWSHandler.h"

#include "THttpWSEngine.h"
#include "THttpCallArg.h"

/////////////////////////////////////////////////////////////////////////
///
/// THttpWSHandler
///
/// Class for user-side handling of websocket with THttpServer
/// 1. Create derived from  THttpWSHandler class and implement
///     ProcessWS() method, where all web sockets request handled.
/// 2. Register instance of derived class to running THttpServer
///
///        TUserWSHandler *handler = new TUserWSHandler("name1","title");
///        THttpServer *server = new THttpServer("http:8090");
///        server->Register("/subfolder", handler)
///
/// 3. Now server can accept web socket connection from outside.
///    For instance, from JavaScirpt one can connect to it with code:
///
///        var ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket")
///
/// 4. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
///
///     if (arg->IsMethod("WS_CONNECT")) {
///         return true;  // to accept incoming request
///      }
///
///      if (arg->IsMethod("WS_READY")) {
///          fWSId = arg->GetWSId(); // fWSId should be member of the user class
///          return true; // connection established
///      }
///
///     if (arg->IsMethod("WS_CLOSE")) {
///         fWSId = 0;
///         return true; // confirm close of socket
///     }
///
///     if (arg->IsMethod("WS_DATA")) {
///         // received data stored as POST data
///         std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
///         std::cout << "got string " << str << std::endl;
///         // immediately send data back using websocket id
///         SendCharStarWS(fWSId, "our reply");
///         return true;
///     }
///
///////////////////////////////////////////////////////////////////////////

ClassImp(THttpWSHandler);

////////////////////////////////////////////////////////////////////////////////
/// normal constructor

THttpWSHandler::THttpWSHandler(const char *name, const char *title) : TNamed(name, title)
////////////////////////////////////////////////////////////////////////////////
/// destructor
/// Delete all websockets handles

   SetDisabled();
/// Returns current number of websocket connections
Int_t THttpWSHandler::GetNumWS()
{
   std::lock_guard<std::mutex> grd(fMutex);
   return fEngines.size();
}

////////////////////////////////////////////////////////////////////////////////
/// Return websocket id with given sequential number
/// Number of websockets returned with GetNumWS() method
UInt_t THttpWSHandler::GetWS(Int_t num)
   std::lock_guard<std::mutex> grd(fMutex);
   auto iter = fEngines.begin() + num;
   return (*iter)->GetId();
}
////////////////////////////////////////////////////////////////////////////////
/// Find websocket connection handle with given id
/// If book_send parameter specified, have to book send operation under the mutex
std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
   if (IsDisabled())
      return nullptr;

   std::lock_guard<std::mutex> grd(fMutex);
   for (auto &eng : fEngines)
      if (eng->GetId() == wsid) {

         // not allow to work with disabled engine
         if (eng->fDisabled)
            return nullptr;

         if (book_send) {
            if (eng->fMTSend) {
               Error("FindEngine", "Try to book next send operation before previous completed");
               return nullptr;
            }
            eng->fMTSend = kTRUE;
         }
         return eng;
   return nullptr;
////////////////////////////////////////////////////////////////////////////////
/// Remove and destroy WS connection

void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine)
   {
      std::lock_guard<std::mutex> grd(fMutex);

      for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
         if (*iter == engine) {
            if (engine->fMTSend)
               Error("RemoveEngine", "Trying to remove WS engine during send operation");

            engine->fDisabled = true;
            fEngines.erase(iter);
            break;
         }
   }

   engine->ClearHandle();
////////////////////////////////////////////////////////////////////////////////
/// Process request to websocket
/// Different kind of requests coded into THttpCallArg::Method
///  "WS_CONNECT" - connection request
///  "WS_READY" - connection ready
///  "WS_CLOSE" - connection closed
/// All other are normal data, which are delivered to users
Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
   if (IsDisabled())
      return kFALSE;
   if (!arg->GetWSId())
      return ProcessWS(arg.get());

   // normally here one accept or reject connection requests
   if (arg->IsMethod("WS_CONNECT"))
      return ProcessWS(arg.get());
   auto engine = FindEngine(arg->GetWSId());
   if (arg->IsMethod("WS_READY")) {
         Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
         RemoveEngine(engine);
      engine = arg->TakeWSEngine();
      {
         std::lock_guard<std::mutex> grd(fMutex);
         fEngines.emplace_back(engine);
      }
      if (!ProcessWS(arg.get())) {
         // if connection refused, remove engine again
         RemoveEngine(engine);
   if (arg->IsMethod("WS_CLOSE")) {
      // connection is closed, one can remove handle

      if (engine) {
         engine->ClearHandle();
         RemoveEngine(engine);
      return ProcessWS(arg.get());
   Bool_t check_send  = engine ? engine->PreviewData(arg) : kFALSE;
   Bool_t res = kTRUE;

   if (!check_send) {

      res = ProcessWS(arg.get());

      check_send = engine ? engine->PostProcess(arg) : kFALSE;
   }

   if (check_send)
      PerformSend(engine);
////////////////////////////////////////////////////////////////////////////////
/// Close connection with given websocket id

void THttpWSHandler::CloseWS(UInt_t wsid)
{
   auto engine = FindEngine(wsid);
   if (engine)
      RemoveEngine(engine);
////////////////////////////////////////////////////////////////////////////////
/// Send binary data via given websocket id
/// Returns -1 - in case of error
///          0 - when operation was executed immediately
///          1 - when send operation will be performed in different thread
Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
   auto engine = FindEngine(wsid, kTRUE);
   if (!engine) return -1;
   if (!AllowMTSend() && engine->CanSendDirectly()) {
      engine->Send(buf, len);
      engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
      CompleteWSSend(engine->GetId());
   // now we indicate that there is data and any thread can access it
   {
      std::lock_guard<std::mutex> grd(engine->fDataMutex);

      if (engine->fKind != THttpWSEngine::kNone) {
         Error("SendWS", "Data kind is not empty - something screwed up");
         return -1;
      }

      engine->fData.resize(len);
      std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());

      engine->fDoingSend = false;
      engine->fKind = THttpWSEngine::kData;
   }

   return RunSendingThrd(engine);
}

////////////////////////////////////////////////////////////////////////////////
/// Send data stored in the buffer
/// Returns 0 - when operation was executed immediately
///         1 - when send operation will be performed in different thread
Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
{
   // actually lonpoll engine does not require thread to reply data in buffer
   if (!engine->RequireSendThrd()) {

      if (engine->CanSendDirectly())
         return PerformSend(engine);

      // handling will be performed in http request handler
      return 1;
   }
   std::thread thrd([this, engine] {
      PerformSend(engine);
   });

   thrd.detach(); // let continue thread execution without thread handle

   return 1;

////////////////////////////////////////////////////////////////////////////////
/// Perform send operation, stored in buffer

Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
{
   {
      std::lock_guard<std::mutex> grd(engine->fDataMutex);

      // no need to do somthing - operation was processed already by somebody else
      if (engine->fKind == THttpWSEngine::kNone)
         return 0;

      if (engine->fDoingSend)
         return 1;
      engine->fDoingSend = true;
   }

   if (IsDisabled() || engine->fDisabled)
      return 0;

   switch (engine->fKind) {
   case THttpWSEngine::kData:
      engine->Send(engine->fData.data(), engine->fData.length());
      break;
   case THttpWSEngine::kHeader:
      engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
      break;
   case THttpWSEngine::kText:
      engine->SendCharStar(engine->fData.c_str());
      break;
   default:
      break;
   }

   engine->fData.clear();
   engine->fHdr.clear();

   {
      std::lock_guard<std::mutex> grd(engine->fDataMutex);
      engine->fDoingSend = false;
      engine->fKind = THttpWSEngine::kNone;
   }

   engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
   CompleteWSSend(engine->GetId());

   return 0;
}

////////////////////////////////////////////////////////////////////////////////
/// Send binary data with text header via given websocket id
/// Returns -1 - in case of error,
///          0 - when operation was executed immediately,
///          1 - when send operation will be performed in different thread,
Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
   auto engine = FindEngine(wsid, kTRUE);
   if (!engine) return -1;
   if (!AllowMTSend() && engine->CanSendDirectly()) {
      engine->SendHeader(hdr, buf, len);
      engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
      CompleteWSSend(engine->GetId());
   // now we indicate that there is data and any thread can access it
   {
      std::lock_guard<std::mutex> grd(engine->fDataMutex);
      if (engine->fKind != THttpWSEngine::kNone) {
         Error("SendWS", "Data kind is not empty - something screwed up");
         return -1;
      }
      engine->fHdr = hdr;
      engine->fData.resize(len);
      std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
      engine->fDoingSend = false;
      engine->fKind = THttpWSEngine::kHeader;
   }

   return RunSendingThrd(engine);
////////////////////////////////////////////////////////////////////////////////
/// Send string via given websocket id
/// Returns -1 - in case of error,
///          0 - when operation was executed immediately,
///          1 - when send operation will be performed in different thread,
Int_t THttpWSHandler::SendCharStarWS(UInt_t wsid, const char *str)
   auto engine = FindEngine(wsid, kTRUE);
   if (!engine) return -1;
   if (!AllowMTSend() && engine->CanSendDirectly()) {
      engine->SendCharStar(str);
      engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
      CompleteWSSend(engine->GetId());
   // now we indicate that there is data and any thread can access it
   {
      std::lock_guard<std::mutex> grd(engine->fDataMutex);
      if (engine->fKind != THttpWSEngine::kNone) {
         Error("SendWS", "Data kind is not empty - something screwed up");
         return -1;
      }
      engine->fData = str;
      engine->fDoingSend = false;
      engine->fKind = THttpWSEngine::kText;
   }
   return RunSendingThrd(engine);