From a6e79391d92681da76729f6830f31a0a4de2ce71 Mon Sep 17 00:00:00 2001 From: Fons Rademakers <Fons.Rademakers@cern.ch> Date: Wed, 17 Dec 2008 16:01:04 +0000 Subject: [PATCH] From Gerri: - Fix a problem with filtering the SvcMsg in TProofMgrLite::GetSessionLogs - Fix a weird backward-compatibility problem in TProof::SendFile (found and needed by ALICE) - Update the release notes git-svn-id: http://root.cern.ch/svn/root/trunk@26993 27541ba8-7e3a-0410-8455-c3a389f83636 --- proof/doc/v522/index.html | 47 +++++++++--- proof/proof/inc/TProofServ.h | 6 +- proof/proof/src/TProof.cxx | 36 +++++----- proof/proof/src/TProofMgrLite.cxx | 6 +- proof/proof/src/TProofServ.cxx | 83 ++++++++++++++-------- proof/proofd/inc/XrdProofdProofServMgr.h | 3 +- proof/proofd/src/XrdProofdProofServMgr.cxx | 27 ++++--- proof/proofx/inc/TXProofServ.h | 1 - proof/proofx/inc/TXSocket.h | 16 +---- proof/proofx/src/TXProofServ.cxx | 12 ---- proof/proofx/src/TXSocket.cxx | 36 ++-------- 11 files changed, 145 insertions(+), 128 deletions(-) diff --git a/proof/doc/v522/index.html b/proof/doc/v522/index.html index bcacc089de1..75d762abd34 100644 --- a/proof/doc/v522/index.html +++ b/proof/doc/v522/index.html @@ -19,7 +19,8 @@ realization of PROOF intended for multi-core machines; the client starts directly the workers; no daemon is required. To start a session just use TProof::Open("") or TProof::Open("lite"). From there on -everything should be as in normal PROOF. To start a standard PROOF +everything should be as in normal PROOF, though some functionality may +not have been ported yet. To start a standard PROOF session (i.e. via daemons) on the localhost use TProof::Open("localhost").</li></ul><li>XrdProofd plug-in</li> @@ -31,7 +32,7 @@ xrootd config file (new directive xpd.worker, see Wiki reference pages)</li> <li>Support for automatic reconnections in the case xrootd is restarted</li> - <li>Dedicated admin area (under <span style="font-style: italic;">xrd.admin</span>/.xproof.<span style="font-style: italic;">port</span>) to + <li>Dedicated admin area (under <span style="font-style: italic;"><xrd.admin></span>/.xproofd.<span style="font-style: italic;"><port></span>) to keep information about active and terminated sessions, and active clients. This is used to reguraly check the client and session activity, to cleanup orphalin sessions and to shutdown inactive client @@ -66,12 +67,32 @@ connections. </li> It's done by a new method TPacketizerAdaptive::AddProcessed(TSlave *sl, TProofProgressStatus *st, TList **) and TPacketizerAdaptive::ReassignPacket. </li> - </ul><li>Add -possibility to display the memory footprint on workers and master as a + </ul><li>Memory control</li><ul><li>Add +the possibility to display the memory footprint on workers and master as a function of the entry processed (workers) or of the merging step (master). A new button has been added to the PROOF dialog box to retrieve and display the memory usage. On the workers about 100 -measurements are recorded by default; this number can be changed with'proof->SetParameter("PROOF_MemLogFreq", memlogfreq)'; </li> +measurements are recorded by default; this number can be changed with 'proof->SetParameter("PROOF_MemLogFreq", memlogfreq)';</li></ul><ul><li>Add +the possibility to set upper limits on the virtual memory used by +processes; the session gets firts a warning when it reaches 80% of +the limit, and then processing is stopped whenit exceeds 95% of the +limit, sending back the results. Also, the memory footprint is notified +when the session is terminated. The limit in MBs is set by the +environment variable "ROOTPROOFASSOFT". An hard limit can be set via the +env "ROOTPROOFASHARD" (also in MBs): the process is automatically +killed by the system if it reaches this limit. Envs variables for the +PROOF processes can be set using the directive 'xpd.putenv' in the +xrootd config file.</li></ul><li>Input data</li><ul><li>Introduce the +concept of 'input data': these are objects that are distributed in +optimal way to the workers, which are available via the input list, but +which are not saved in the TQueryResult object. These are meant for big +objects whic can create a big overload when distributed via the +standard input list (which should mostly be used for job control +parameters). To add an input-data object just use +TProof::AddInputData(TObject *); if the input-data objects are in a +file you can use TProof::SetInputDataFile(const char *file); the final +set of input-data objects is assembled from the objects added via +AddInputData and those found in the file defined bySetInputDataFile. </li></ul> </ul> @@ -88,7 +109,8 @@ complete set of tests in test/stressProof . To run with PROOF-Lite pass the argument 'lite' as master URL, e.g. './stressProof lite'.</li><li>Possibility to control on the client via rc variable the location of the sandbox, package directory, cache and dataset directory (the latters two only -for PROOF-Lite); the variable names are 'Proof.Sandbox', 'Proof.PackageDir', 'Proof.CacheDir' and 'Proof.DataSetDir'. The <span style="font-weight: bold;">default location of the sandbox has been changed from "~/proof" to "~/.proof"</span> to avoid interferences with possible users' working areas.</li><li>XrdProofd plug-in</li> +for PROOF-Lite); the variable names are 'Proof.Sandbox', +'Proof.PackageDir', 'Proof.CacheDir' and 'Proof.DataSetDir'. The <span style="font-weight: bold;">default location of the sandbox has been changed from "~/proof" to "~/.proof"</span> to avoid interferences with possible users' working areas.</li><li>XrdProofd plug-in</li> <ul> @@ -143,9 +165,7 @@ output to an area specific to the logged user.<br></li> TPacketizerAdaptive. It is also send in kPROOF_GETPACKET and kPROOF_STOPPROCESS messages. </li> <li>The class TPacketizerProgressive is removed. </li> - <li>Changing the protocol version to 19: TProofProgressStatus used in - kPROOF_STOPPROCESS and kPROOF_GETNEXTPACKET messages in Master - worker communication - </li> + </ul> <ul> @@ -156,7 +176,12 @@ output to an area specific to the logged user.<br></li> <ul> - <li>Invalidate the TProofMgr when the physical connection is + <li>Enable +the max number of sessions ('mxsess' parameter in the xpd.schedparam +directive); users are just refused to start a session if this limit is +reached.</li><li>Make sure to collect consistently input messages when running in asynchronous mode</li><li>Fix +a few problems with TProof::SendFile (used by UploadPackage, Load) +appearing when a rapid sequence of these commands was submitted </li><li>Invalidate the TProofMgr when the physical connection is closed; avoids crashing when trying to get the logs after a failure. </li> @@ -183,4 +208,4 @@ happen only after a re-build.</li><li>Make sure that in case multiple TProofOutp </ul> -</body></html> \ No newline at end of file +</body></html> diff --git a/proof/proof/inc/TProofServ.h b/proof/proof/inc/TProofServ.h index 0fba45b4b53..f2d4e0b97a8 100644 --- a/proof/proof/inc/TProofServ.h +++ b/proof/proof/inc/TProofServ.h @@ -121,9 +121,11 @@ private: TQueryResultManager *fQMgr; //Query-result manager - TList *fWaitingQueries; //list of TProofQueryResult wating to be processed + TList *fWaitingQueries; //list of TProofQueryResult waiting to be processed Bool_t fIdle; //TRUE if idle + TList *fQueuedMsg; //list of messages waiting to be processed + TString fPrefix; //Prefix identifying the node Bool_t fRealTimeLog; //TRUE if log messages should be send back in real-time @@ -181,8 +183,6 @@ protected: virtual void MakePlayer(); virtual void DeletePlayer(); - virtual void SetInputSocket(Bool_t on = kTRUE); - virtual Int_t Fork(); public: diff --git a/proof/proof/src/TProof.cxx b/proof/proof/src/TProof.cxx index 77b68deca08..a02bd5ec42f 100644 --- a/proof/proof/src/TProof.cxx +++ b/proof/proof/src/TProof.cxx @@ -716,9 +716,10 @@ Int_t TProof::Init(const char *, const char *conffile, } } - UserGroup_t *ug = gSystem->GetUserInfo(); - fPackageLock = new TProofLockPath(Form("%s%s", kPROOF_PackageLockFile, ug->fUser.Data())); - delete ug; + TString lockpath(fPackageDir); + lockpath.ReplaceAll("/", "%"); + lockpath.Insert(0, Form("%s/%s", gSystem->TempDirectory(), kPROOF_PackageLockFile)); + fPackageLock = new TProofLockPath(lockpath.Data()); fEnabledPackagesOnClient = new TList; fEnabledPackagesOnClient->SetOwner(); @@ -2357,7 +2358,7 @@ Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype) if (rc == 1 || (rc == 2 && !savedMonitor)) { // Deactivate it if we are done with it mon->DeActivate(s); - if (gDebug > 2) + PDB(kGlobal, 2) Info("Collect","deactivating %p (active: %d, %p)", s, mon->GetActive(), mon->GetListOfActives()->First()); @@ -2366,8 +2367,8 @@ Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype) // Deactivate it if we are done with it if (savedMonitor) { savedMonitor->DeActivate(s); - if (gDebug > 2) - Info("Collect","deactivating %p (active: %d, %p)", + PDB(kGlobal, 2) + Info("Collect","save monitor: deactivating %p (active: %d, %p)", s, savedMonitor->GetActive(), savedMonitor->GetListOfActives()->First()); } @@ -4754,19 +4755,16 @@ Int_t TProof::SendFile(const char *file, Int_t opt, const char *rfile, TSlave *w // Don't send the kPROOF_SENDFILE command to real slaves when sendto // is false. Masters might still need to send the file to newly added // slaves. + PDB(kPackage,2) { + const char *snd = (sl->fSlaveType == TSlave::kSlave && sendto) ? "" : "not"; + Info("SendFile", "%s sending file %s to: %s:%s (%d)", snd, + file, sl->GetName(), sl->GetOrdinal(), sendto); + } if (sl->fSlaveType == TSlave::kSlave && !sendto) continue; // The value of 'size' is used as flag remotely, so we need to // reset it to 0 if we are not going to send the file Long64_t siz = sendto ? size : 0; - - PDB(kPackage,2) - if (siz > 0) { - if (!nsl) - Info("SendFile", "sending file %s to:", file); - printf(" slave = %s:%s\n", sl->GetName(), sl->GetOrdinal()); - } - sprintf(buf, "%s %d %lld %d", fnam.Data(), bin, siz, fw); if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) { MarkBad(sl, "could not send kPROOF_SENDFILE request"); @@ -4802,13 +4800,12 @@ Int_t TProof::SendFile(const char *file, Int_t opt, const char *rfile, TSlave *w nsl++; } + // Wait for the operation to be done + Collect(sl, fCollectTimeout, kPROOF_SENDFILE); } close(fd); - // Wait for the operation to be done - Collect(&wsent, fCollectTimeout, kPROOF_SENDFILE); - // Cleanup temporary list, if any if (slaves != fActiveSlaves && slaves != fUniqueSlaves) SafeDelete(slaves); @@ -6124,7 +6121,6 @@ Int_t TProof::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueWorkers) while ((wrk = (TSlave *)nxw())) { if (!fUniqueSlaves->FindObject(wrk)) { others.Add(wrk); - Info("Load", "adding: %s:", wrk->GetOrdinal()); } } @@ -6139,7 +6135,7 @@ Int_t TProof::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueWorkers) Collect(&others); } - Printf("Adding loaded macro: %s", macro); + PDB(kGlobal, 1) Info("Load", "adding loaded macro: %s", macro); if (!fLoadedMacros) { fLoadedMacros = new TList(); fLoadedMacros->SetOwner(); @@ -6755,7 +6751,7 @@ void TProof::AddInputData(TObject *obj, Bool_t push) // Add data objects that might be needed during the processing of // the selector (see Process()). This object can be very large, so they // are distributed in an optimized way using a dedicated file. - // If push is TRUE the input data are snet over even if no apparent change + // If push is TRUE the input data are sent over even if no apparent change // occured to the list. if (obj) { diff --git a/proof/proof/src/TProofMgrLite.cxx b/proof/proof/src/TProofMgrLite.cxx index 652cdd257fb..dd6e7d108ee 100644 --- a/proof/proof/src/TProofMgrLite.cxx +++ b/proof/proof/src/TProofMgrLite.cxx @@ -331,8 +331,10 @@ TObjString *TProofMgrLite::ReadBuffer(const char *fin, const char *pattern) pat.ReplaceAll("-v ", ""); excl = kTRUE; } - pat.Strip(TString::kLeading, ' '); - pat.Strip(TString::kTrailing, ' '); + pat = pat.Strip(TString::kLeading, ' '); + pat = pat.Strip(TString::kTrailing, ' '); + pat = pat.Strip(TString::kLeading, '\"'); + pat = pat.Strip(TString::kTrailing, '\"'); // Use a regular expression TRegexp re(pat); diff --git a/proof/proof/src/TProofServ.cxx b/proof/proof/src/TProofServ.cxx index 45b58609585..c82b0bfebf7 100644 --- a/proof/proof/src/TProofServ.cxx +++ b/proof/proof/src/TProofServ.cxx @@ -538,6 +538,8 @@ TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog) fWaitingQueries = new TList; fIdle = kTRUE; + fQueuedMsg = new TList; + fRealTimeLog = kFALSE; fShutdownTimer = 0; @@ -1068,18 +1070,45 @@ void TProofServ::HandleSocketInput() if (fProof) fProof->SetActive(); - // Process the message - Int_t rc = HandleSocketInput(mess, all); - if (rc < 0) { - TString emsg; - if (rc == -1) { - emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what); - } else if (rc == -3) { - emsg.Form("HandleSocketInput: message undefined ! Protocol error?", what); - } else { - emsg.Form("HandleSocketInput: unknown command %d ! Protocol error?", what); + Bool_t doit = kTRUE; + + Int_t rc = 0; + while (doit) { + + // Process the message + rc = HandleSocketInput(mess, all); + if (rc < 0) { + TString emsg; + if (rc == -1) { + emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what); + } else if (rc == -3) { + emsg.Form("HandleSocketInput: message undefined ! Protocol error?", what); + } else { + emsg.Form("HandleSocketInput: unknown command %d ! Protocol error?", what); + } + SendAsynMessage(emsg.Data()); + } else if (rc == 2) { + // Add to the queue + fQueuedMsg->Add(mess); + PDB(kGlobal, 1) + Info("HandleSocketInput", "message of type %d enqueued; sz: %d", + mess->What(), fQueuedMsg->GetSize()); + mess = 0; + } + + // Still somethign to do? + doit = 0; + if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) { + // Add to the queue + PDB(kGlobal, 1) + Info("HandleSocketInput", "processing enqueued message of type %d; left: %d", + mess->What(), fQueuedMsg->GetSize()); + all = 1; + SafeDelete(mess); + mess = (TMessage *) fQueuedMsg->First(); + fQueuedMsg->Remove(mess); + doit = 1; } - SendAsynMessage(emsg.Data()); } fgRecursive--; @@ -1096,7 +1125,8 @@ void TProofServ::HandleSocketInput() fProof->SetRunStatus(TProof::kRunning); } - delete mess; + // Cleanup + SafeDelete(mess); } //______________________________________________________________________________ @@ -1105,6 +1135,7 @@ Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all) // Process input coming from the client or from the master server. // Returns -1 if the message could not be processed, <-1 if something went // wrong. Returns 1 if the action may have changed the parallel state. + // Returns 2 if the message has to be enqueued. // Returns 0 otherwise static TStopwatch timer; @@ -1356,14 +1387,20 @@ Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all) break; case kPROOF_CHECKFILE: - { + if (!all && fProtocol <= 19) { + // Come back later + rc = 2; + } else { // Handle file checking request HandleCheckFile(mess); } break; case kPROOF_SENDFILE: - { + if (!all && fProtocol <= 19) { + // Come back later + rc = 2; + } else { mess->ReadString(str, sizeof(str)); Long_t size; Int_t bin, fw = 1; @@ -1389,9 +1426,7 @@ Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all) opt |= TProof::kBinary; // Old clients do not wait for the termination of SendFile, so we need // to disable new inputs while doing this - if (fProtocol <= 19) SetInputSocket(kFALSE); fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")); - if (fProtocol <= 19) SetInputSocket(kTRUE); } if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE); } @@ -1429,7 +1464,10 @@ Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all) break; case kPROOF_CACHE: - { + if (!all && fProtocol <= 19) { + // Come back later + rc = 2; + } else { TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog); PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter"); Int_t status = HandleCache(mess); @@ -5095,17 +5133,6 @@ void TProofServ::HandleFork(TMessage *) Info("HandleFork", "fork cloning not implemented"); } -//______________________________________________________________________________ -void TProofServ::SetInputSocket(Bool_t on) -{ - // Switch on / off input from the parent - - if (on) - gSystem->AddFileHandler(fInputHandler); - else - gSystem->RemoveFileHandler(fInputHandler); -} - //______________________________________________________________________________ Int_t TProofServ::Fork() { diff --git a/proof/proofd/inc/XrdProofdProofServMgr.h b/proof/proofd/inc/XrdProofdProofServMgr.h index e6c2c5694e0..ce1431cd607 100644 --- a/proof/proofd/inc/XrdProofdProofServMgr.h +++ b/proof/proofd/inc/XrdProofdProofServMgr.h @@ -177,7 +177,8 @@ public: int Recover(XpdClientSessions *cl); void UpdateCounter(int t, int n) { if (PSMCNTOK(t)) { - XrdSysMutexHelper mhp(fMutex); fCounters[t] += n;} } + XrdSysMutexHelper mhp(fMutex); fCounters[t] += n; + if (fCounters[t] < 0) fCounters[t] = 0;} } int CheckCounter(int t) { int cnt = -1; if (PSMCNTOK(t)) { XrdSysMutexHelper mhp(fMutex); cnt = fCounters[t];} return cnt; } diff --git a/proof/proofd/src/XrdProofdProofServMgr.cxx b/proof/proofd/src/XrdProofdProofServMgr.cxx index 47115e56696..ae9bfd8192e 100644 --- a/proof/proofd/src/XrdProofdProofServMgr.cxx +++ b/proof/proofd/src/XrdProofdProofServMgr.cxx @@ -101,8 +101,10 @@ void *XrdProofdProofServCron(void *p) int clnlostscale = 0; // Time of last full sessions check - int lastcheck = time(0), ckfreq = mgr->CheckFrequency(), waitt = 0; + int lastrun = time(0); + int lastcheck = lastrun, ckfreq = mgr->CheckFrequency(), waitt = 0; int deltat = ((int)(0.1*ckfreq) >= 1) ? (int)(0.1*ckfreq) : 1; + int maxdelay = 5*ckfreq; // Force check after 5 times the check frequency mgr->SetNextSessionsCheck(lastcheck + ckfreq); TRACE(ALL, "next full sessions check in "<<ckfreq<<" secs"); while(1) { @@ -186,13 +188,21 @@ void *XrdProofdProofServCron(void *p) int now = time(0); // If there is any activity in mgr->Process() we postpone the checks in 5 secs - if (mgr->CheckCounter(XrdProofdProofServMgr::kProcessCnt) > 0) { - // The current time - lastcheck = now + 5 - ckfreq; - mgr->SetNextSessionsCheck(now + 5); - // Notify - TRACE(ALL, "postponing sessions check (will retry in 5 secs)"); - continue; + int cnt = mgr->CheckCounter(XrdProofdProofServMgr::kProcessCnt); + if (cnt > 0) { + if ((now - lastrun) < maxdelay) { + // The current time + lastcheck = now + 5 - ckfreq; + mgr->SetNextSessionsCheck(now + 5); + // Notify + TRACE(ALL, "postponing sessions check (will retry in 5 secs)"); + continue; + } else { + // Max time without checks reached: force a check + TRACE(ALL, "Max time without checks reached ("<<maxdelay<<"): force a session check"); + // Reset the counter + mgr->UpdateCounter(XrdProofdProofServMgr::kProcessCnt, -cnt); + } } bool full = (now > mgr->NextSessionsCheck() - deltat) ? 1 : 0; @@ -210,6 +220,7 @@ void *XrdProofdProofServCron(void *p) int cursess = mgr->CurrentSessions(1); TRACE(ALL, cursess << " sessions are currently active"); // Remember when ... + lastrun = now; lastcheck = now; mgr->SetNextSessionsCheck(lastcheck + mgr->CheckFrequency()); // Notify diff --git a/proof/proofx/inc/TXProofServ.h b/proof/proofx/inc/TXProofServ.h index 2ea614ee179..2034180c6a2 100644 --- a/proof/proofx/inc/TXProofServ.h +++ b/proof/proofx/inc/TXProofServ.h @@ -64,7 +64,6 @@ public: void HandleTermination(); void ReleaseWorker(const char *ord); - void SetInputSocket(Bool_t on = kTRUE); void Terminate(Int_t status); ClassDef(TXProofServ,0) //XRD PROOF Server Application Interface diff --git a/proof/proofx/inc/TXSocket.h b/proof/proofx/inc/TXSocket.h index c88f6ccdee7..2c0eef2a1ba 100644 --- a/proof/proofx/inc/TXSocket.h +++ b/proof/proofx/inc/TXSocket.h @@ -103,8 +103,6 @@ private: Int_t fByteLeft; // bytes left in the first buffer Int_t fByteCur; // current position in the first buffer TXSockBuf *fBufCur; // current read buffer - Bool_t fEnabled; // kTRUE if input from this socket is enabled - Int_t fAQueued; // Number of messages received while disabled // Interrupts TMutex *fIMtx; // To protect interrupt queue @@ -243,13 +241,6 @@ public: void DisableTimeout() { fDontTimeout = kTRUE; } void EnableTimeout() { fDontTimeout = kFALSE; } - // Disable / Enable / Test input from this socket - inline Bool_t IsEnabled() const { R__LOCKGUARD(fAMtx); return fEnabled; } - inline void Enqueue() { R__LOCKGUARD(fAMtx); fAQueued++; } - inline Int_t Enqueued() const { R__LOCKGUARD(fAMtx); return fAQueued; } - inline void Disable() { R__LOCKGUARD(fAMtx); fEnabled = kFALSE; } - void Enable(); - // Try reconnection after error virtual Int_t Reconnect(); @@ -297,9 +288,9 @@ public: TXSocket *GetLastReady(); Int_t GetRead() const { return fPipe[0]; } - Int_t Post(TSocket *s=0); // Notify socket ready via global pipe - Int_t Clean(TSocket *s=0); // Clean previous pipe notification - Int_t Flush(TSocket *s=0); // Remove any instance of 's' from the pipe + Int_t Post(TSocket *s); // Notify socket ready via global pipe + Int_t Clean(TSocket *s); // Clean previous pipe notification + Int_t Flush(TSocket *s); // Remove any instance of 's' from the pipe void DumpReadySock(); void SetLoc(const char *loc = "") { fLoc = loc; } @@ -309,7 +300,6 @@ private: Int_t fPipe[2]; // Pipe for input monitoring TString fLoc; // Location string TList fReadySock; // List of sockets ready to be read - TMutex fReadyMtx; // Protect access to the sockets-ready list }; #endif diff --git a/proof/proofx/src/TXProofServ.cxx b/proof/proofx/src/TXProofServ.cxx index 242b4f90a67..90b7de20082 100644 --- a/proof/proofx/src/TXProofServ.cxx +++ b/proof/proofx/src/TXProofServ.cxx @@ -1012,15 +1012,3 @@ void TXProofServ::ReleaseWorker(const char *ord) ((TXSocket *)fSocket)->SendCoordinator(TXSocket::kReleaseWorker, ord); } - -//______________________________________________________________________________ -void TXProofServ::SetInputSocket(Bool_t on) -{ - // Switch on / off input from the parent - - if (!fSocket) return; - if (on) - ((TXSocket *)fSocket)->Enable(); - else - ((TXSocket *)fSocket)->Disable(); -} diff --git a/proof/proofx/src/TXSocket.cxx b/proof/proofx/src/TXSocket.cxx index 91ddc6fd095..03f2579e140 100644 --- a/proof/proofx/src/TXSocket.cxx +++ b/proof/proofx/src/TXSocket.cxx @@ -114,7 +114,7 @@ Long64_t TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver, const char *logbuf, Int_t loglevel, TXHandler *handler) : TSocket(), fMode(m), fLogLevel(loglevel), - fBuffer(logbuf), fASem(0), fEnabled(kTRUE), fAQueued(0), + fBuffer(logbuf), fASem(0), fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1) { // Constructor @@ -927,8 +927,6 @@ Int_t TXSocket::Flush() fASem.TryWait(); fAQue.clear(); } - // Reset the queued counter - fAQueued = 0; } // Move spares to the spare queue @@ -1963,19 +1961,6 @@ Int_t TXSocket::Reconnect() return ((fConn && fConn->IsValid()) ? 0 : -1); } -//______________________________________________________________________________ -void TXSocket::Enable() -{ - // Renable the socket reposting all messages received in the meantime - R__LOCKGUARD(fAMtx); - - while (fAQueued > 0) { - fgPipe.Post(); - fAQueued++; - } - fEnabled= kTRUE; -} - //_____________________________________________________________________________ TXSockBuf::TXSockBuf(Char_t *bp, Int_t sz, Bool_t own) { @@ -2075,7 +2060,7 @@ Int_t TXSockPipe::Post(TSocket *s) // Write a byte to the global pipe to signal new availibility of // new messages - if (!IsValid()) return -1; + if (!IsValid() || !s) return -1; // This must be an atomic action Int_t sz = 0; @@ -2105,7 +2090,7 @@ Int_t TXSockPipe::Clean(TSocket *s) // Read a byte to the global pipe to synchronize message pickup // Pipe must have been created - if (!IsValid()) return -1; + if (!IsValid() || !s) return -1; // Only one char Int_t sz = 0; @@ -2138,8 +2123,6 @@ Int_t TXSockPipe::Flush(TSocket *s) // Pipe must have been created if (!IsValid() || !s) return -1; - // Get number of enqueued - Int_t sque = ((TXSocket *)s)->Enqueued(); TObject *o = 0; // This must be an atomic action { R__LOCKGUARD(&fMutex); @@ -2149,15 +2132,10 @@ Int_t TXSockPipe::Flush(TSocket *s) // Remove from the list fReadySock.Remove(s); o = fReadySock.FindObject(s); - if (sque > 0) { - // There were no pipe posting fro enqueued messages - sque--; - } else { - // Remove one notification from the pipe - Char_t c = 0; - if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) - Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data()); - } + // Remove one notification from the pipe + Char_t c = 0; + if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) + Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data()); } } // Flush also the socket -- GitLab