Skip to content
Snippets Groups Projects
Commit 780e16a5 authored by Enrico Guiraud's avatar Enrico Guiraud Committed by Danilo Piparo
Browse files

changed workers' polling technology to support OSX


due to the problems encountered when running the eventloop after
a fork(), workers have been modified to use a simple while loop
to listen to messages instead.

Signed-off-by: default avatardpiparo <danilo.piparo@cern.ch>
parent f0e8c91d
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,7 @@ if(builtin_pcre) ...@@ -7,6 +7,7 @@ if(builtin_pcre)
endif() endif()
add_subdirectory(rint) add_subdirectory(rint)
add_subdirectory(thread) add_subdirectory(thread)
add_subdirectory(multiproc)
if(NOT WIN32) if(NOT WIN32)
add_subdirectory(newdelete) add_subdirectory(newdelete)
endif() endif()
...@@ -18,7 +19,6 @@ add_subdirectory(meta) ...@@ -18,7 +19,6 @@ add_subdirectory(meta)
set(systemdict_opts) set(systemdict_opts)
if(UNIX) if(UNIX)
add_subdirectory(unix) add_subdirectory(unix)
add_subdirectory(multiproc)
set(unix_objects $<TARGET_OBJECTS:Unix>) set(unix_objects $<TARGET_OBJECTS:Unix>)
set(dict_opts -DSYSTEM_TYPE_unix ${dict_opts}) set(dict_opts -DSYSTEM_TYPE_unix ${dict_opts})
endif() endif()
......
...@@ -14,22 +14,12 @@ ...@@ -14,22 +14,12 @@
#include "TMonitor.h" #include "TMonitor.h"
#include "TMPWorker.h" #include "TMPWorker.h"
#include "TSysEvtHandler.h"
#include "MPSendRecv.h" #include "MPSendRecv.h"
#include <vector> #include <vector>
#include <unistd.h> //pid_t #include <unistd.h> //pid_t
#include <memory> //unique_ptr #include <memory> //unique_ptr
#include <iostream> #include <iostream>
class TMPInterruptHandler : public TSignalHandler {
/// \cond
ClassDef(TMPInterruptHandler, 0); //this is a TObject so it's good to have a ClassDef
/// \endcond
public:
TMPInterruptHandler();
Bool_t Notify();
};
class TMPClient { class TMPClient {
public: public:
explicit TMPClient(unsigned nWorkers = 0); explicit TMPClient(unsigned nWorkers = 0);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include <unistd.h> //pid_t #include <unistd.h> //pid_t
#include <memory> //unique_ptr #include <memory> //unique_ptr
class TMPWorker : public TFileHandler { class TMPWorker {
/// \cond /// \cond
ClassDef(TMPWorker, 0); ClassDef(TMPWorker, 0);
/// \endcond /// \endcond
...@@ -29,13 +29,13 @@ public: ...@@ -29,13 +29,13 @@ public:
TMPWorker &operator=(const TMPWorker &) = delete; TMPWorker &operator=(const TMPWorker &) = delete;
virtual void Init(int fd); virtual void Init(int fd);
void Run();
TSocket *GetSocket() { return fS.get(); } TSocket *GetSocket() { return fS.get(); }
pid_t GetPid() { return fPid; } pid_t GetPid() { return fPid; }
private: private:
virtual void HandleInput(MPCodeBufPair &msg); virtual void HandleInput(MPCodeBufPair &msg);
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
std::unique_ptr<TSocket> fS; ///< This worker's socket. The unique_ptr makes sure resources are released. std::unique_ptr<TSocket> fS; ///< This worker's socket. The unique_ptr makes sure resources are released.
pid_t fPid; ///< the PID of the process in which this worker is running pid_t fPid; ///< the PID of the process in which this worker is running
......
...@@ -14,30 +14,6 @@ ...@@ -14,30 +14,6 @@
#include <memory> //unique_ptr #include <memory> //unique_ptr
#include <iostream> #include <iostream>
//////////////////////////////////////////////////////////////////////////
///
/// \class TMPInterruptHandler
///
/// This is an implementation of a TSignalHandler that is added to the
/// eventloop in the children processes spawned by a TMPClient. When a SIGINT
/// (i.e. kSigInterrupt) is received, TMPInterruptHandler shuts down the
/// worker and performs clean-up operations, then exits.
///
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
TMPInterruptHandler::TMPInterruptHandler() : TSignalHandler(kSigInterrupt, kFALSE)
{
}
/// Executed when SIGINT is received. Clean-up and quit the application
Bool_t TMPInterruptHandler::Notify()
{
std::cerr << "server shutting down on SIGINT" << std::endl;
gSystem->Exit(0);
return true;
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
/// ///
/// \class TMPClient /// \class TMPClient
...@@ -145,10 +121,12 @@ bool TMPClient::Fork(TMPWorker &server) ...@@ -145,10 +121,12 @@ bool TMPClient::Fork(TMPWorker &server)
} }
} }
} }
//parent returns here
if (!pid) { if (pid) {
//CHILD/SERVER //parent returns here
return true;
} else {
//CHILD/WORKER
fIsParent = false; fIsParent = false;
//override signal handler (make the servers exit on SIGINT) //override signal handler (make the servers exit on SIGINT)
...@@ -158,8 +136,6 @@ bool TMPClient::Fork(TMPWorker &server) ...@@ -158,8 +136,6 @@ bool TMPClient::Fork(TMPWorker &server)
sh = (TSignalHandler *)signalHandlers->First(); sh = (TSignalHandler *)signalHandlers->First();
if (sh) if (sh)
gSystem->RemoveSignalHandler(sh); gSystem->RemoveSignalHandler(sh);
TMPInterruptHandler handler;
handler.Add();
//remove stdin from eventloop and close it //remove stdin from eventloop and close it
TSeqCollection *fileHandlers = gSystem->GetListOfFileHandlers(); TSeqCollection *fileHandlers = gSystem->GetListOfFileHandlers();
...@@ -188,10 +164,11 @@ bool TMPClient::Fork(TMPWorker &server) ...@@ -188,10 +164,11 @@ bool TMPClient::Fork(TMPWorker &server)
//prepare server and add it to eventloop //prepare server and add it to eventloop
server.Init(sockets[1]); server.Init(sockets[1]);
//enter main loop //enter worker loop
gSystem->Run(); server.Run();
} }
//control should never reach here
return true; return true;
} }
......
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
/// This separation is in place because the instantiation of a worker /// This separation is in place because the instantiation of a worker
/// must be done once _before_ forking, while the initialization of the /// must be done once _before_ forking, while the initialization of the
/// members must be done _after_ forking by each of the children processes. /// members must be done _after_ forking by each of the children processes.
TMPWorker::TMPWorker() : TFileHandler(-1, kRead), fS(), fPid(0) TMPWorker::TMPWorker() : fS(), fPid(0)
{ {
} }
...@@ -56,11 +56,23 @@ void TMPWorker::Init(int fd) ...@@ -56,11 +56,23 @@ void TMPWorker::Init(int fd)
{ {
fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd) fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
fPid = getpid(); fPid = getpid();
}
void TMPWorker::Run()
{
while(true) {
MPCodeBufPair msg = MPRecv(fS.get());
if (msg.first == MPCode::kRecvError) {
std::cerr << "Lost connection to client\n";
gSystem->Exit(0);
}
//TFileHandler's stuff if (msg.first < 1000)
//these operations _must_ be done in the overriding implementations too HandleInput(msg); //call overridden method
SetFd(fd); else
Add(); TMPWorker::HandleInput(msg); //call this class' method
}
} }
...@@ -94,22 +106,3 @@ void TMPWorker::HandleInput(MPCodeBufPair &msg) ...@@ -94,22 +106,3 @@ void TMPWorker::HandleInput(MPCodeBufPair &msg)
MPSend(fS.get(), MPCode::kError, reply.data()); MPSend(fS.get(), MPCode::kError, reply.data());
} }
} }
//////////////////////////////////////////////////////////////////////////
/// This method is called by TFileHandler when there's an event on the TSocket fS.
/// It checks what kind of message was received (if any) and calls the appropriate
/// handler function (TMPWorker::HandleInput or overridden version).
Bool_t TMPWorker::Notify()
{
MPCodeBufPair msg = MPRecv(fS.get());
if (msg.first == MPCode::kRecvError) {
std::cerr << "Lost connection to client\n";
gSystem->Exit(0);
}
if (msg.first < 1000)
HandleInput(msg); //call overridden method
else
TMPWorker::HandleInput(msg); //call this class' method
return kTRUE;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment