XRootD
Loading...
Searching...
No Matches
XrdCmsResp.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s R e s p . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstdlib>
33#include <cstring>
34
37#include "XrdCms/XrdCmsResp.hh"
38#include "XrdCms/XrdCmsTrace.hh"
39
43#include "XrdSys/XrdSysError.hh"
44
45using namespace XrdCms;
46
47/******************************************************************************/
48/* G l o b a l s */
49/******************************************************************************/
50
51XrdSysSemaphore XrdCmsResp::isReady(0);
52XrdSysMutex XrdCmsResp::rdyMutex;
53XrdCmsResp *XrdCmsResp::First = 0;
54XrdCmsResp *XrdCmsResp::Last = 0;
55XrdSysMutex XrdCmsResp::myMutex;
56XrdCmsResp *XrdCmsResp::nextFree = 0;
57int XrdCmsResp::numFree = 0;
58int XrdCmsResp::RepDelay = 5;
59
60/******************************************************************************/
61/* A l l o c */
62/******************************************************************************/
63
65{
// Obtain a response object (reused from the free list when one is available,
// otherwise freshly allocated), bind it to the caller's error-info object and
// message id, and return it.
//
// NOTE(review): the function header is absent from this listing; `erp` and
// `msgid` used below are presumably its parameters (the cross-reference index
// shows `static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)`) -- confirm.
//
66 XrdCmsResp *rp;
67
68// Allocate a response object. We must be assured that the semaphore count
69// is zero. This will be true for freshly allocated objects. For reused
70// objects we will need to run down the count to zero as multiple calls
71// to sem_init may produce undefined behaviour.
72//
73 myMutex.Lock();
74 if (nextFree)
75 {rp = nextFree;
76 nextFree = rp->next;
77 numFree--;
78 rp->SyncCB.Init();
79 }
// NOTE(review): a plain new-expression normally reports failure by throwing
// rather than returning null, so this branch is presumably unreachable unless
// the build uses a non-throwing allocator -- confirm.
//
80 else if (!(rp = new XrdCmsResp()))
81 {myMutex.UnLock();
82 return (XrdCmsResp *)0;
83 }
84 myMutex.UnLock();
85
86// Initialize it. We also replace the callback object pointer with a pointer
87// to the synchronization semaphore as we have taken over the object and must
88// provide a callback synchronization path for the caller. The OucEI object
89// must be setup with pointers to stable areas and we will relocate any data
90// to allow for a message to be sent back without overwriting the data (usually
91// the path related to this request). We do this manually as the assignment
92// operator does a bit more and a bit less than what we really need to do here.
93//
// The user name is copied into the object's own UserID buffer so the pointer
// handed to setErrUser() refers to storage owned by this response object.
//
94 strlcpy(rp->UserID, erp->getErrUser(), sizeof(rp->UserID));
95 rp->setErrUser(rp->UserID);
// NOTE(review): the listing's numbering skips a line here, so a statement may
// be missing from this view -- check against the real source file.
//
97 rp->setErrInfo(0, "");
98 rp->setErrMid(erp->getErrMid());
// Save the caller's original callback (and its argument) in the response
// object, then point the caller's error-info object at our SyncCB instead.
//
99 rp->ErrCB = erp->getErrCB(rp->ErrCBarg);
100 erp->setErrCB((XrdOucEICB *)&rp->SyncCB);
101 rp->myID = msgid;
102 rp->next = 0;
103
104// Return the response object
105//
106 return rp;
107}
108
109/******************************************************************************/
110/* R e c y c l e */
111/******************************************************************************/
112
113void XrdCmsResp::Recycle()
114{
115
116// Recycle appendages
117//
118 if (myBuff) {myBuff->Recycle(); myBuff = 0;}
119
120// We keep a stash of allocated response objects. If there are too many we
121// simply delete this object.
122//
123 if (XrdCmsResp::numFree >= XrdCmsResp::maxFree) delete this;
124 else {myMutex.Lock();
125 next = nextFree;
126 nextFree = this;
127 numFree++;
128 myMutex.UnLock();
129 }
130}
131
132/******************************************************************************/
133/* R e p l y */
134/******************************************************************************/
135
136// This version of reply simply queues the object for reply
137
138void XrdCmsResp::Reply(const char *manp, CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
139{
140
141// Copy the data we need to have
142//
143 myRRHdr = rrhdr;
144 myBuff = netbuff;
145 next = 0;
146 strlcpy(theMan, manp, sizeof(theMan));
147
148// Now queue this object
149//
150 rdyMutex.Lock();
151 if (Last) {Last->next = this; Last = this;}
152 else Last=First = this;
153 rdyMutex.UnLock();
154
155// Now indicate we have something to process
156//
157 isReady.Post();
158}
159
160/******************************************************************************/
161
162// This version of Reply() dequeues queued replies for processing
163
165{
// Never returns: waits on isReady, removes the head of the ready queue under
// rdyMutex, and runs ReplyXeq() on it with the lock released.
//
// NOTE(review): the function header is absent from this listing (the
// cross-reference index shows `static void Reply()`) -- confirm.
//
166 XrdCmsResp *rp;
167
168// Endless loop looking for something to reply to
169//
170 while(1)
171 {isReady.Wait();
172 rdyMutex.Lock();
// Unlink the head entry; when it was the only one, the tail pointer is
// cleared too so the queue reads as empty.
//
173 if ((rp = First))
174 {if (!(First = rp->next)) Last = 0;
175 rdyMutex.UnLock();
176 rp->ReplyXeq();
177 } else rdyMutex.UnLock();
178 }
179}
180
181/******************************************************************************/
182/* R e p l y X e q */
183/******************************************************************************/
184
185void XrdCmsResp::ReplyXeq()
186{
187 EPNAME("Reply")
188 XrdOucEICB *theCB;
189 int Result;
190
191// If there is no callback object, ignore this call. Eventually, we may wish
192// to simulate a callback but this is rather complicated.
193//
194 if (!ErrCB)
195 {DEBUG("No callback object for user " <<UserID <<" msgid="
196 <<myRRHdr.streamid <<' ' <<theMan);
197 Recycle();
198 return;
199 }
200
201// Get the values for the callback.
202//
203 Result = XrdCmsParser::Decode(theMan,myRRHdr,myBuff,(XrdOucErrInfo *)this);
204
205// Translate the return code to what the caller's caller wanst to see. We
206// should only receive the indicated codes at this point.
207//
208 if (Result != SFS_REDIRECT && Result != SFS_STALL
209 && Result != SFS_DATA && Result != SFS_ERROR)
210 {char buff[16];
211 sprintf(buff, "%d", Result);
212 Say.Emsg("Reply", "Invalid call back result code", buff);
213 setErrInfo(EINVAL,"Invalid call back response from redirector.");
214 Result = SFS_ERROR;
215 }
216
217// Before invoking the callback we must be assured that the waitresp response
218// has been sent to the client. We do this by waiting on a semaphore which is
219// posted *after* the waitresp response is sent.
220//
221 SyncCB.Wait();
222
223// We now must request a callback to recycle this object once the callback
224// response is actually sent by setting the callback object pointer to us.
225//
226 theCB = ErrCB;
227 ErrCB = (XrdOucEICB *)this;
228
229// Invoke the callback
230//
231 theCB->Done(Result, (XrdOucErrInfo *)this, getErrData());
232}
233
234/******************************************************************************/
235/* X r d O d c R e s p Q */
236/******************************************************************************/
237/******************************************************************************/
238/* C o n s t r u c t o r */
239/******************************************************************************/
240
242{
// Start with every slot of the mqTab table zeroed (i.e. all chains empty).
// NOTE(review): the constructor header is absent from this listing.
//
243 memset(mqTab, 0, sizeof(mqTab));
244}
245
246/******************************************************************************/
247/* A d d */
248/******************************************************************************/
249
251{
// Insert rp at the head of the chain for the table slot selected by its myID.
// NOTE(review): the function header is absent from this listing; `rp` is
// presumably its parameter (the cross-reference index shows
// `void Add(XrdCmsResp *rp)`) -- confirm.
//
252 int i;
253
254// Compute index and either add or chain the entry
255//
// NOTE(review): if myID could ever be negative, `%` would yield a negative
// index here -- confirm that ids are always non-negative.
//
256 i = rp->myID % mqSize;
257 myMutex.Lock();
// The conditional below evaluates to the current slot content either way (an
// empty slot yields 0), so the new entry simply chains to whatever was there.
//
258 rp->next = (mqTab[i] ? mqTab[i] : 0);
259 mqTab[i] = rp;
260 myMutex.UnLock();
261}
262
263/******************************************************************************/
264/* P u r g e */
265/******************************************************************************/
266
268{
// Empty the whole table: under myMutex, walk every slot and delete each
// chained entry (entries are destroyed with delete, not recycled).
// NOTE(review): the function header is absent from this listing.
//
269 XrdCmsResp *rp;
270 int i;
271
272 myMutex.Lock();
273 for (i = 0; i < mqSize; i++)
274 {while ((rp = mqTab[i])) {mqTab[i] = rp->next; delete rp;}}
275 myMutex.UnLock();
276}
277
278/******************************************************************************/
279/* R e m */
280/******************************************************************************/
281
283{
// Find the entry whose myID equals msgid, unlink it from its chain and return
// it; returns a null pointer when no entry matches.
// NOTE(review): the function header is absent from this listing; `msgid` is
// presumably its parameter (the cross-reference index shows
// `XrdCmsResp * Rem(int msgid)`) -- confirm.
//
284 int i;
285 XrdCmsResp *rp, *pp = 0;
286
287// Compute the index and find the entry
288//
// NOTE(review): a negative msgid would make `%` yield a negative index --
// confirm that ids are always non-negative.
//
289 i = msgid % mqSize;
290 myMutex.Lock();
291 rp = mqTab[i];
// pp trails rp by one element so that a match can be unlinked from the chain
//
292 while(rp && rp->myID != msgid) {pp = rp; rp = rp->next;}
293
294// Remove the entry if we found it
295//
296 if (rp) {if (pp) pp->next = rp->next;
297 else mqTab[i] = rp->next;
298 }
299
300// Return what we found
301//
302 myMutex.UnLock();
303 return rp;
304}
#define DEBUG(x)
#define EPNAME(x)
#define SFS_DATA
#define SFS_ERROR
#define SFS_REDIRECT
#define SFS_STALL
if(Avsz)
size_t strlcpy(char *dst, const char *src, size_t sz)
static int Decode(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *dBuff, XrdOucErrInfo *eInfo)
XrdCmsResp * Rem(int msgid)
void Add(XrdCmsResp *rp)
static void Reply()
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
Definition XrdCmsResp.cc:64
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
XrdOucEICB()
Constructor and destructor.
const char * getErrUser()
XrdOucEICB * getErrCB()
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
void setErrUser(const char *user)
const char * getErrData()
void setErrMid(int mid)
Set the monitoring identifier.
int setErrInfo(int code, const char *emsg)
XrdOucErrInfo(const char *user=0, XrdOucEICB *cb=0, unsigned long long ca=0, int mid=0, int uc=0)
XrdOucEICB * ErrCB
void setErrData(const char *Data, int Offs=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysError Say
kXR_unt32 streamid
Definition YProtocol.hh:83
static const int Path_Offset