XRootD
Loading...
Searching...
No Matches
XrdBwmHandle.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d B w m H a n d l e . c c */
4/* */
5/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstring>
33
36#include "XrdBwm/XrdBwmTrace.hh"
38#include "XrdSys/XrdSysError.hh"
40
42
43/******************************************************************************/
44/* S t a t i c O b j e c t s */
45/******************************************************************************/
46
// Event logger installed by setPolicy(); Retire() reports to it when it is set.
47XrdBwmLogger *XrdBwmHandle::Logger = 0;
// Scheduling policy installed by setPolicy(); Activate(), Dispatch() and
// Retire() call into it.
48XrdBwmPolicy *XrdBwmHandle::Policy = 0;
// Head of the singly linked list of available handles managed by Alloc().
49XrdBwmHandle *XrdBwmHandle::Free = 0;
// Count of handles currently linked into refHandle()'s lookup table.
50unsigned int XrdBwmHandle::numQueued = 0;
51
53
54/******************************************************************************/
55/* L o c a l C l a s s e s */
56/******************************************************************************/
57
// Callback/response object that is recycled through a class-wide free list.
// All free-list manipulation is done while holding xMutex.
59{
60public:
61

// Alloc: hand out an object. The head of the free list is reused when the
// list is not empty; otherwise a new object is created with new.
62static
64 {XrdBwmHandleCB *mP;
65 xMutex.Lock();
66 if (!(mP = Free)) mP = new XrdBwmHandleCB;
67 else Free = mP->Next;
68 xMutex.UnLock();
69 return mP;
70 }
71

// Done: none of the arguments is used; the object simply pushes itself back
// onto the free list so that a later Alloc() can reuse it.
72void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
73 {xMutex.Lock();
74 Next = Free;
75 Free = this;
76 xMutex.UnLock();
77 }
78

// Same: always returns 0 regardless of the two arguments.
79int Same(unsigned long long arg1, unsigned long long arg2) {return 0;}
80

// A freshly constructed object is not linked to anything.
81 XrdBwmHandleCB() : Next(0) {}
83
84private:
// Link to the next object while this one sits on the free list.
85 XrdBwmHandleCB *Next;
// Guards Free and the Next links.
86static XrdSysMutex xMutex;
// Head of the free list; null when the list is empty.
87static XrdBwmHandleCB *Free;
88};
89

// Storage for the class-wide lock and the (initially empty) free list.
90XrdSysMutex XrdBwmHandleCB::xMutex;
91XrdBwmHandleCB *XrdBwmHandleCB::Free = 0;
92
93/******************************************************************************/
94/* E x t e r n a l L i n k a g e s */
95/******************************************************************************/
96
// Thread start routine: setPolicy() passes this function to
// XrdSysThread::Run() with a null argument to start the "Handle Dispatcher"
// thread.
97void *XrdBwmHanXeq(void *pp)
98{
100}
101
102/******************************************************************************/
103/* c l a s s X r d B w m H a n d l e */
104/******************************************************************************/
105/******************************************************************************/
106/* A c t i v a t e */
107/******************************************************************************/
108
// Activate: ask the policy to schedule this handle's request.
//
// einfo  supplies the message buffer handed to Policy->Schedule() and
//        receives the error text/code or the callback registration.
//
// Returns SFS_ERROR   when the handle is not Idle or Schedule() returns 0,
//         SFS_DATA    when Schedule() returns > 0 and left text in the buffer,
//         SFS_OK      when Schedule() returns > 0 and the buffer is empty,
//         SFS_STARTED when Schedule() returns < 0 (request was queued).
//
// The handle lock (hMutex) is held for the whole call via myHelper.
// tident is temporarily defined as Parms.Tident for the tracing macros.
109#define tident Parms.Tident
110
112{
113 EPNAME("Activate");
114 XrdSysMutexHelper myHelper(hMutex);
115 char *rBuff;
116 int rSize, rc;
117
118// Check the status of this request.
119//
// Only an Idle handle may be activated; the error text distinguishes a request
// that is already Scheduled from any other non-idle state.
120 if (Status != Idle)
121 {if (Status == Scheduled)
122 einfo.setErrInfo(kXR_inProgress, "Request already scheduled.");
123 else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued.");
124 return SFS_ERROR;
125 }
126
127// Try to schedule this request.
128//
// qTime records when the request was submitted. The policy is given einfo's
// own message buffer; a return value of 0 is treated as failure.
129 qTime = time(0);
130 rBuff = einfo.getMsgBuff(rSize);
131 if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR;
132
133// If resource immediately available, let client run
134//
// A positive return value is kept as the resource handle and rTime records the
// start time. The error code is set to the length of whatever text is in the
// buffer, and SFS_DATA is returned only when that text is not empty.
135 if (rc > 0)
136 {rHandle = rc;
138 rTime = time(0);
139 ZTRACE(sched,"Run " <<Parms.Lfn <<' ' <<Parms.LclNode
140 <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
141 <<Parms.RmtNode);
142 einfo.setErrCode(strlen(rBuff));
143 return (*rBuff ? SFS_DATA : SFS_OK);
144 }
145
146// Request was queued. We need to hold on to this so we can issue an async
147// response later when the resource becomes available.
148//
// rc is negative here, so -rc is the positive handle under which this object is
// registered with refHandle(). The caller's callback and its argument are saved
// in ErrCB/ErrCBarg, and einfo is pointed at this handle's own myEICB instead.
149 rHandle = -rc;
150 ErrCB = einfo.getErrCB(ErrCBarg);
151 einfo.setErrCB((XrdOucEICB *)&myEICB);
153 refHandle(rHandle, this);
154 ZTRACE(sched, "inQ " <<Parms.Lfn <<' ' <<Parms.LclNode
155 <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
156 <<Parms.RmtNode);
157
158// Indicate that client needs to wait
159//
160 return SFS_STARTED;
161}
162#undef tident
163
164/******************************************************************************/
165/* static public A l l o c # 1 */
166/******************************************************************************/
167
// Alloc #1: obtain a handle from the parameterless Alloc() and initialize it
// for one transfer request.
//
// theUsr    trace identity; the pointer itself is stored, no copy is made.
// thePath   logical file name; duplicated with strdup().
// LclNode   local node name; duplicated with strdup().
// RmtNode   remote node name; duplicated with strdup().
// Incoming  non-zero selects XrdBwmPolicy::Incoming as the direction.
//
// Returns the initialized handle, or whatever null value Alloc() returned.
// The three strdup() copies are released by Retire().
168XrdBwmHandle *XrdBwmHandle::Alloc(const char *theUsr, const char *thePath,
169 const char *LclNode, const char *RmtNode,
170 int Incoming)
171{
172 XrdBwmHandle *hP = Alloc();
173
174// Initialize the handle
175//
// NOTE(review): the strdup() results are stored without a null check.
176 if (hP)
177 {hP->Parms.Tident = theUsr; // Always available
178 hP->Parms.Lfn = strdup(thePath);
179 hP->Parms.LclNode = strdup(LclNode);
180 hP->Parms.RmtNode = strdup(RmtNode);
181 hP->Parms.Direction = (Incoming ? XrdBwmPolicy::Incoming
// The handle starts Idle with all timing and size statistics cleared.
183 hP->Status = Idle;
184 hP->qTime = 0;
185 hP->rTime = 0;
186 hP->xSize = 0;
187 hP->xTime = 0;
188 }
189
190// All done
191//
192 return hP;
193}
194
195/******************************************************************************/
196/* private A l l o c # 2 */
197/******************************************************************************/
198
// Alloc #2: free-list manager. With a non-null old_hP the handle is pushed onto
// the free list and null is returned. Otherwise a handle is popped from the
// free list, which is first refilled with a batch of minAlloc new handles when
// it is empty. All list manipulation happens under aMutex.
200{
// Batch size: as many handles as fit in 4096 bytes.
// NOTE(review): this evaluates to 0 if sizeof(XrdBwmHandle) exceeds 4096, in
// which case the refill loop adds nothing and a null handle is returned.
201 static const int minAlloc = 4096/sizeof(XrdBwmHandle);
202 static XrdSysMutex aMutex;
203 XrdBwmHandle *hP;
204
205// Either return the passed handle to the free list, or take a handle off the
206// free list (refilling the list first when it is empty).
207//
208 aMutex.Lock();
209 if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;}
// Refill: each element of the newly allocated array is pushed onto the list.
210 else {if (!Free && (hP = new XrdBwmHandle[minAlloc]))
211 {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
// Pop the head of the list (hP stays null if the list is still empty).
212 if ((hP = Free)) Free = hP->Next;
213 }
214 aMutex.UnLock();
215
216 return hP;
217}
218
219/******************************************************************************/
220/* D i s p a t c h */
221/******************************************************************************/
222
// Dispatch: endless loop that takes the next ready request from the policy,
// finds the handle registered under that ID and delivers the result through
// the callback that Activate() saved in the handle.
// tident is temporarily defined as hP->Parms.Tident for the tracing macros.
223#define tident hP->Parms.Tident
224
226{
227 EPNAME("Dispatch");
229 XrdBwmHandle *hP;
230 char *RespBuff;
231 int RespSize, readyH, Result, Err;
232
233// Dispatch ready requests in an endless loop
234//
235 do {
236
237// Setup buffer
238//
// The response object's own message buffer is handed to the policy, so it is
// emptied and its error code cleared before every call.
239 RespBuff = erP->getMsgBuff(RespSize);
240 *RespBuff = '\0';
241 erP->setErrCode(0);
242
243// Get next ready request and test if it ended with an error
244//
// readyH receives the policy's return value; Err becomes non-zero when that
// value is negative, and readyH is then negated to obtain the positive ID.
245 if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0))
246 readyH = -readyH;
247
248// Find the matching handle
249//
// refHandle() called with only an ID looks the handle up and unlinks it from
// the table. If nothing is registered under the ID, the event is logged and,
// unless the policy reported an error, the policy is told the request is done.
250 if (!(hP = refHandle(readyH)))
251 {sprintf(RespBuff, "%d", readyH);
252 BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff);
253 if (!Err) Policy->Done(readyH);
254 continue;
255 }
256
257// Lock the handle and make sure it can be dispatched
258//
259 hP->hMutex.Lock();
260 if (hP->Status != Scheduled)
261 {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle",
262 hP->Parms.Tident, hP->Parms.Lfn);
263 if (!Err) Policy->Done(readyH);
264 } else {
// NOTE(review): myEICB.Wait() is defined outside this listing; presumably it
// blocks until the original request's response has gone out -- confirm.
265 hP->myEICB.Wait(); hP->rTime = time(0);
// The response object is registered as its own callback, carrying the callback
// argument that Activate() saved.
266 erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg);
// On error the handle returns to Idle; otherwise it becomes Dispatched and the
// result is SFS_DATA only when the policy left text in the buffer.
267 if (Err) {hP->Status = Idle; Result = SFS_ERROR;}
268 else {hP->Status = Dispatched;
269 erP->setErrCode(strlen(RespBuff));
270 Result = (*RespBuff ? SFS_DATA : SFS_OK);
271 }
272 ZTRACE(sched,(Err?"Err ":"Run ") <<hP->Parms.Lfn <<' ' <<hP->Parms.LclNode
273 <<(hP->Parms.Direction == XrdBwmPolicy::Incoming ? " <- ":" -> ")
274 <<hP->Parms.RmtNode);
// The response object is handed to the saved callback and a fresh one is
// obtained for the next iteration.
275 hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP);
276 erP = XrdBwmHandleCB::Alloc();
277 }
278 hP->hMutex.UnLock();
279 } while(1);
280
281// Keep the compiler happy
282//
283 return (void *)0;
284}
285
286#undef tident
287
288/******************************************************************************/
289/* private r e f H a n d l e */
290/******************************************************************************/
291
292XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP)
293{
294 static XrdSysMutex tMutex;
295 static struct {XrdBwmHandle *First;
296 XrdBwmHandle *Last;
297 } hTab[256] = {{0,0}};
298 XrdBwmHandle *pP = 0;
299 int i = refID % 256;
300
301// If we have a handle passed, add the handle to the table
302//
303 tMutex.Lock();
304 if (hP)
305 {hP->Next = 0;
306 if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;}
307 else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;}
308 numQueued++;
309 } else {
310 hP = hTab[i].First;
311 while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;}
312 if (hP)
313 {if (pP) pP->Next = hP->Next;
314 else hTab[i].First = hP->Next;
315 if (hTab[i].Last == hP) hTab[i].Last = pP;
316 numQueued--;
317 }
318 }
319 tMutex.UnLock();
320
321// All done.
322//
323 return hP;
324}
325
326/******************************************************************************/
327/* public R e t i r e */
328/******************************************************************************/
329
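330// Retire() acquires the handle lock itself (myHelper) and releases it on return.
// NOTE(review): callers should not hold hMutex on entry unless it is recursive -- confirm.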
331
333{
334 XrdSysMutexHelper myHelper(hMutex);
335
336// Get the global lock as the links field can only be manipulated with it.
337// If not idle, cancel the resource. If scheduled, remove it from the table.
338//
339 if (Status != Idle)
340 {Policy->Done(rHandle);
341 if (Status == Scheduled && !refHandle(rHandle, this))
342 BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn);
343 Status = Idle; rHandle = 0;
344 }
345
346// If we have a logger, then log this event
347//
348 if (Logger && qTime)
349 {XrdBwmLogger::Info myInfo;
350 myInfo.Tident = Parms.Tident;
351 myInfo.Lfn = Parms.Lfn;
352 myInfo.lclNode = Parms.LclNode;
353 myInfo.rmtNode = Parms.RmtNode;
354 myInfo.ATime = qTime;
355 myInfo.BTime = rTime;
356 myInfo.CTime = time(0);
357 myInfo.Size = xSize;
358 myInfo.ESec = xTime;
359 myInfo.Flow = (Parms.Direction == XrdBwmPolicy::Incoming ? 'I':'O');
360 Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq);
361 Logger->Event(myInfo);
362 }
363
364// Free storage appendages and recycle handle
365//
366 if (Parms.Lfn) {free(Parms.Lfn); Parms.Lfn = 0;}
367 if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;}
368 if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;}
369 Alloc(this);
370}
371
372/******************************************************************************/
373/* s e t P o l i c y */
374/******************************************************************************/
375
// setPolicy: install the scheduling policy (pP) and the logger (lP).
// The dispatcher thread is started only when no policy was installed before
// this call. Returns 0 on success and 1 if the thread could not be started.
377{
378 pthread_t tid;
// Remember whether this is the first policy before Policy is overwritten.
379 int rc, startThread = (Policy == 0);
380
381// Set the policy and then start a thread to do dispatching if we have none
382//
// NOTE(review): on thread-start failure Policy stays set and Logger is left
// untouched, so a later call would not attempt to start the thread again.
383 Policy = pP;
384 if (startThread)
385 if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0,
386 0, "Handle Dispatcher")))
387 {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread");
388 return 1;
389 }
390
391// All done
392//
393 Logger = lP;
394 return 0;
395}
@ kXR_InvalidRequest
Definition XProtocol.hh:996
@ kXR_inProgress
void * XrdBwmHanXeq(void *pp)
#define EPNAME(x)
#define ZTRACE(act, x)
XrdSysError BwmEroute(0)
Definition XrdBwm.cc:69
#define Err(p, a, b, c)
XrdOucString Path
#define SFS_DATA
#define SFS_ERROR
#define SFS_STARTED
#define SFS_OK
static XrdBwmHandleCB * Alloc()
void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
int Same(unsigned long long arg1, unsigned long long arg2)
static void * Dispatch()
int Activate(XrdOucErrInfo &einfo)
HandleState Status
static XrdBwmHandle * Alloc(const char *theUsr, const char *thePath, const char *lclNode, const char *rmtNode, int Incoming)
static int setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP)
const char * rmtNode
const char * lclNode
const char * Tident
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
XrdOucEICB()
Constructor and destructor.
XrdOucEICB * getErrCB()
char * getMsgBuff(int &mblen)
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
XrdOucErrInfo(const char *user=0, XrdOucEICB *cb=0, unsigned long long ca=0, int mid=0, int uc=0)
int setErrCode(int code)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)