diff --git a/proofd/inc/XrdProofdProtocol.h b/proofd/inc/XrdProofdProtocol.h index a0aea728117064e86127f81eafe4dc455b8f9d93..93df14973fb54d9a72c16613d37f088a12c901ff 100644 --- a/proofd/inc/XrdProofdProtocol.h +++ b/proofd/inc/XrdProofdProtocol.h @@ -1,4 +1,4 @@ -// @(#)root/proofd:$Name: $:$Id: XrdProofdProtocol.h,v 1.8 2006/06/21 16:18:26 rdm Exp $ +// @(#)root/proofd:$Name: $:$Id: XrdProofdProtocol.h,v 1.9 2006/07/01 16:01:08 rdm Exp $ // Author: G. Ganis June 2005 /************************************************************************* @@ -61,6 +61,7 @@ class XrdProofClient; class XrdProofdPriority; class XrdProofWorker; class XrdScheduler; +class XrdSrvBuffer; class XrdProofdProtocol : XrdProtocol { @@ -89,6 +90,7 @@ public: int Destroy(); int Detach(); void EraseServer(int psid); + int GetBuff(int quantum); int GetData(const char *dtype, char *buff, int blen); int GetFreeServID(); XrdProofServProxy *GetServer(int psid); @@ -98,6 +100,7 @@ public: int Ping(); int Process2(); void Reset(); + int SendData(XrdProofdResponse *resp, kXR_int32 sid = -1, XrdSrvBuffer **buf = 0); int SendMsg(); int SetUserEnvironment(const char *usr, const char *dir = 0); int Urgent(); @@ -140,6 +143,7 @@ public: int fBlast; // int fhcPrev; + int fhcMax; int fhcNext; int fhcNow; int fhalfBSize; diff --git a/proofd/src/XrdProofdProtocol.cxx b/proofd/src/XrdProofdProtocol.cxx index 6f6ba3be39caea98e8fcadcdb2151a5945327cc8..42b938340637080eda875c2234fa05c907f81b4f 100644 --- a/proofd/src/XrdProofdProtocol.cxx +++ b/proofd/src/XrdProofdProtocol.cxx @@ -1,4 +1,4 @@ -// @(#)root/proofd:$Name: $:$Id: XrdProofdProtocol.cxx,v 1.14 2006/06/05 22:51:14 rdm Exp $ +// @(#)root/proofd:$Name: $:$Id: XrdProofdProtocol.cxx,v 1.15 2006/06/21 16:18:26 rdm Exp $ // Author: Gerardo Ganis 12/12/2005 /************************************************************************* @@ -546,6 +546,38 @@ static int MkDir(const char *path, struct passwd *pw) return 0; } +//_____________________________________________________________________________ +static int SymLink(const char *path, const char *link, struct passwd *pw) +{ + // Create a symlink 'link' to 'path' using the credentials of the entity + // described by 'pw' + // Return 0 in case of success, -1 in case of error + + if (!path || strlen(path) <= 0 || !link || strlen(link) <= 0 || !pw) + return -1; + + // Acquire privileges, if needed + int requid = (geteuid() != pw->pw_uid) ? 0 : pw->pw_uid ; + XrdSysPrivGuard pGuard(requid); + if (!pGuard.Valid()) + return -1; + + // Remove existing link, if any + if (unlink(link) != 0 && errno != ENOENT) { + PRINT("SymLink: problems unlinking existing symlink "<< link<< + " (errno: "<<errno<<")"); + return -1; + } + if (symlink(path, link) != 0) { + PRINT("SymLink: problems creating symlink " << link<< + " (errno: "<<errno<<")"); + return -1; + } + + // We are done + return 0; +} + //_____________________________________________________________________________ XrdSecService *XrdProofdProtocol::LoadSecurity(char *seclib, char *cfn) { @@ -790,7 +822,7 @@ XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp) strcpy(xp->fEntity.prot, "host"); xp->fEntity.host = strdup((char *)lp->Host()); - // Dummy data use dby 'proofd' + // Dummy data used by 'proofd' kXR_int32 dum[2]; if (xp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) { xp->Recycle(0,0,0); @@ -825,9 +857,12 @@ void XrdProofdProtocol::Reset() fStatus = 0; fArgp = 0; fLink = 0; - fhcPrev = 13; + + // Magic numbers cut & pasted from Xrootd + fhcMax = 28657; fhcNext = 21; fhcNow = 13; + fhcPrev = 13; // Default mode is query fPClient = 0; @@ -1360,23 +1395,18 @@ int XrdProofdProtocol::Process(XrdLink *) return fLink->setEtext("Process: protocol data length error"); } - // Read any argument data at this point, except when the request is a write. - // The argument may have to be segmented and we're not prepared to do that here. - if (fRequest.header.requestid != kXR_write && fRequest.header.dlen) { - if (!fArgp || fRequest.header.dlen+1 > fArgp->bsize) { - if (fArgp) - fgBPool->Release(fArgp); - if (!(fArgp = fgBPool->Obtain(fRequest.header.dlen+1))) { - fResponse.Send(kXR_ArgTooLong, "fRequest.argument is too long"); - return 0; - } - fhcNow = fhcPrev; fhalfBSize = fArgp->bsize >> 1; + // Read any argument data at this point, except when the request is to forward + // a buffer: the argument may have to be segmented and we're not prepared to do + // that here. + if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) { + if (GetBuff(fRequest.header.dlen+1) != 1) { + fResponse.Send(kXR_ArgTooLong, "fRequest.argument is too long"); + return 0; } if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen))) return rc; fArgp->buff[fRequest.header.dlen] = '\0'; } - TRACEP(REQ,"Process: fArgp->buff = "<< (fArgp ? fArgp->buff : "")); // Continue with request processing at the resume point return Process2(); @@ -2332,6 +2362,37 @@ int XrdProofdProtocol::Auth() return -EACCES; } +//______________________________________________________________________________ +int XrdProofdProtocol::GetBuff(int quantum) +{ + // Allocate a buffer to handle quantum bytes + + // The current buffer may be sufficient for the current needs + if (!fArgp || quantum > fArgp->bsize) + fhcNow = fhcPrev; + else if (quantum >= fhalfBSize || fhcNow-- > 0) + return 1; + else if (fhcNext >= fhcMax) + fhcNow = fhcMax; + else { + int tmp = fhcPrev; + fhcNow = fhcNext; + fhcPrev = fhcNext; + fhcNext = tmp + fhcNext; + } + + // We need a new buffer + if (fArgp) + fgBPool->Release(fArgp); + if ((fArgp = fgBPool->Obtain(quantum))) + fhalfBSize = fArgp->bsize >> 1; + else + return fResponse.Send(kXR_NoMemory, "insufficient memory for requested buffer"); + + // Success + return 1; +} + //______________________________________________________________________________ int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen) { @@ -2824,6 +2885,17 @@ int XrdProofdProtocol::SetProofServEnv(XrdProofdProtocol *p, putenv(ev); TRACE(REQ,"SetProofServEnv: "<<ev); + // Create or Update symlink to last session + XrdOucString syml = udir; + if (p->fSrvType == kXPD_WorkerServer) + syml += "/last-worker-session"; + else + syml += "/last-master-session"; + if (SymLink(logdir.c_str(), syml.c_str(), pw) != 0) { + TRACE(REQ,"SetProofServEnv: problems creating symlink to " + " last session (errno: "<<errno<<")"); + } + // We are done return 0; } @@ -3090,6 +3162,51 @@ void XrdProofdProtocol::SetIgnoreZombieChild() #endif } +//______________________________________________________________________________ +int XrdProofdProtocol::SendData(XrdProofdResponse *resp, + kXR_int32 sid, XrdSrvBuffer **buf) +{ + // Send data over the open link. Segmentation is done here, if required. + + int rc = 1; + + // Buffer length + int len = fRequest.header.dlen; + + // Quantum size + int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len); + + // Make sure we have a large enough buffer + if (!fArgp || quantum < fhalfBSize || quantum > fArgp->bsize) { + if ((rc = GetBuff(quantum)) <= 0) + return rc; + } else if (fhcNow < fhcNext) + fhcNow++; + + // Now send over all of the data as unsolicited messages + while (len > 0) { + if ((rc = GetData("data", fArgp->buff, quantum))) + return rc; + if (buf && !(*buf)) + *buf = new XrdSrvBuffer(fArgp->buff, quantum, 1); + // Send + if (sid > -1) { + if (resp->Send(kXR_attn, kXPD_msgsid, sid, fArgp->buff, quantum)) + return 1; + } else { + if (resp->Send(kXR_attn, kXPD_msg, fArgp->buff, quantum)) + return 1; + } + // Next segment + len -= quantum; + if (len < quantum) + quantum = len; + } + + // Done + return 0; +} + //_____________________________________________________________________________ int XrdProofdProtocol::SendMsg() { @@ -3122,7 +3239,6 @@ int XrdProofdProtocol::SendMsg() bool external = !(opt & kXPD_internal); // Forward message as unsolicited - char *msg = fArgp->buff; int len = fRequest.header.dlen; if (external) { @@ -3134,7 +3250,7 @@ int XrdProofdProtocol::SendMsg() return rc; } TRACEP(REQ, "SendMsg: EXTERNAL: fCID: " << fCID); - if (xps->fProofSrv.Send(kXR_attn, kXPD_msgsid, fCID, msg, len)) { + if (SendData(&(xps->fProofSrv), fCID)) { fResponse.Send(kXR_ServerError,"external: sending message to proofserv"); return rc; } @@ -3145,6 +3261,7 @@ int XrdProofdProtocol::SendMsg() } else { TRACEP(REQ, "SendMsg: INTERNAL: psid: "<<psid); + XrdSrvBuffer **savedBuf = 0; // Additional info about the message if (opt & kXPD_setidle) { TRACEP(REQ, "SendMsg: INTERNAL: setting proofserv in 'idle' state"); @@ -3158,16 +3275,14 @@ int XrdProofdProtocol::SendMsg() } else if (opt & kXPD_querynum) { TRACEP(REQ, "SendMsg: INTERNAL: got message with query number"); // Save query num message for later clients - if (xps->fQueryNum) - delete xps->fQueryNum; - xps->fQueryNum = new XrdSrvBuffer(msg, len, 1); + SafeDelete(xps->fQueryNum); + savedBuf = &(xps->fQueryNum); } else if (opt & kXPD_startprocess) { TRACEP(REQ, "SendMsg: INTERNAL: setting proofserv in 'running' state"); xps->fStatus = kXPD_running; // Save start processing message for later clients - if (xps->fStartMsg) - delete xps->fStartMsg; - xps->fStartMsg = new XrdSrvBuffer(msg, len, 1); + SafeDelete(xps->fStartMsg); + savedBuf = &(xps->fStartMsg); } else if (opt & kXPD_logmsg) { // We broadcast log messages only not idle to catch the // result from processing @@ -3208,7 +3323,7 @@ int XrdProofdProtocol::SendMsg() TRACEP(REQ, "SendMsg: INTERNAL: this sid: "<<sid<< "; client sid:"<<csid->fSid); csid->fP->fResponse.Set(csid->fSid); - rs = csid->fP->fResponse.Send(kXR_attn, kXPD_msg, msg, len); + rs = SendData(&(csid->fP->fResponse), -1, savedBuf); csid->fP->fResponse.Set(sid); } if (rs) { @@ -3230,7 +3345,7 @@ int XrdProofdProtocol::SendMsg() TRACEP(REQ, "SendMsg: INTERNAL: this sid: "<<sid<< "; client sid:"<<csid->fSid); csid->fP->fResponse.Set(csid->fSid); - rs = csid->fP->fResponse.Send(kXR_attn, kXPD_msg, msg, len); + rs = SendData(&(csid->fP->fResponse), -1, savedBuf); csid->fP->fResponse.Set(sid); } if (rs) {