XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
31#include "XrdCl/XrdClURL.hh"
32#include "XrdCl/XrdClUtils.hh"
39#include "XrdCl/XrdClSocket.hh"
40#include "XrdCl/XrdClTls.hh"
42
43#include "XrdOuc/XrdOucCRC.hh"
45
46#include "XrdSys/XrdSysPlatform.hh" // same as above
49#include <memory>
50#include <sstream>
51#include <numeric>
52
53namespace
54{
55 //----------------------------------------------------------------------------
56 // We need an extra task what will run the handler in the future, because
57 // tasks get deleted and we need the handler
58 //----------------------------------------------------------------------------
59 class WaitTask: public XrdCl::Task
60 {
61 public:
62 WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63 {
64 std::ostringstream o;
65 o << "WaitTask for: 0x" << handler->GetRequest();
66 SetName( o.str() );
67 }
68
69 virtual time_t Run( time_t now )
70 {
71 pHandler->WaitDone( now );
72 return 0;
73 }
74 private:
75 XrdCl::XRootDMsgHandler *pHandler;
76 };
77}
78
79namespace XrdCl
80{
81 //----------------------------------------------------------------------------
82 // Delegate the response handling to the thread-pool
83 //----------------------------------------------------------------------------
85 {
86 public:
87 HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88 {
89
90 }
91
92 virtual ~HandleRspJob()
93 {
94
95 }
96
97 virtual void Run( void *arg )
98 {
99 pHandler->HandleResponse();
100 delete this;
101 }
102 private:
103 XrdCl::XRootDMsgHandler *pHandler;
104 };
105
106 //----------------------------------------------------------------------------
107 // Examine an incoming message, and decide on the action to be taken
108 //----------------------------------------------------------------------------
109 uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110 {
111 const int sst = pSendingState.fetch_or( kSawResp );
112
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114 {
115 // we must have been sent although we haven't got the OnStatusReady
116 // notification yet. Set the inflight notice.
117
118 Log *log = DefaultEnv::GetLog();
119 log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
121 pUrl.GetHostId().c_str(),
122 pRequest->GetObfuscatedDescription().c_str() );
123
124 pMsgInFly = true;
125 }
126
127 //--------------------------------------------------------------------------
128 // if the MsgHandler is already being used to process another request
129 // (kXR_oksofar) we need to wait
130 //--------------------------------------------------------------------------
131 if( pOksofarAsAnswer )
132 {
133 XrdSysCondVarHelper lck( pCV );
134 while( pResponse ) pCV.Wait();
135 }
136 else
137 {
138 if( pResponse )
139 {
140 Log *log = DefaultEnv::GetLog();
141 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
142 "it already owns a response: %p (message: %s ).",
143 pUrl.GetHostId().c_str(), this,
144 pRequest->GetObfuscatedDescription().c_str() );
145 }
146 }
147
148 if( msg->GetSize() < 8 )
149 return Ignore;
150
151 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
152 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
153 uint16_t status = 0;
154 uint32_t dlen = 0;
155
156 //--------------------------------------------------------------------------
157 // We only care about async responses, but those are extracted now
158 // in the SocketHandler.
159 //--------------------------------------------------------------------------
160 if( rsp->hdr.status == kXR_attn )
161 {
162 return Ignore;
163 }
164 //--------------------------------------------------------------------------
165 // We got a sync message - check if it belongs to us
166 //--------------------------------------------------------------------------
167 else
168 {
169 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
170 rsp->hdr.streamid[1] != req->header.streamid[1] )
171 return Ignore;
172
173 status = rsp->hdr.status;
174 dlen = rsp->hdr.dlen;
175 }
176
177 //--------------------------------------------------------------------------
178 // We take the ownership of the message and decide what we will do
179 // with the handler itself, the options are:
180 // 1) we want to either read in raw mode (the Raw flag) or have the message
181 // body reconstructed for us by the TransportHandler by the time
182 // Process() is called (default, no extra flag)
183 // 2) we either got a full response in which case we don't want to be
184 // notified about anything anymore (RemoveHandler) or we got a partial
185 // answer and we need to wait for more (default, no extra flag)
186 //--------------------------------------------------------------------------
187 pResponse = msg;
188 pBodyReader->SetDataLength( dlen );
189
190 Log *log = DefaultEnv::GetLog();
191 switch( status )
192 {
193 //------------------------------------------------------------------------
194 // Handle the cached cases
195 //------------------------------------------------------------------------
196 case kXR_error:
197 case kXR_redirect:
198 case kXR_wait:
199 return RemoveHandler;
200
201 case kXR_waitresp:
202 {
203 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
204 "message %s", pUrl.GetHostId().c_str(),
205 pRequest->GetObfuscatedDescription().c_str() );
206
207 pResponse.reset();
208 return Ignore; // This must be handled synchronously!
209 }
210
211 //------------------------------------------------------------------------
212 // Handle the potential raw cases
213 //------------------------------------------------------------------------
214 case kXR_ok:
215 {
216 //----------------------------------------------------------------------
217 // For kXR_read we read in raw mode
218 //----------------------------------------------------------------------
219 uint16_t reqId = ntohs( req->header.requestid );
220 if( reqId == kXR_read )
221 {
222 return Raw | RemoveHandler;
223 }
224
225 //----------------------------------------------------------------------
226 // kXR_readv is the same as kXR_read
227 //----------------------------------------------------------------------
228 if( reqId == kXR_readv )
229 {
230 return Raw | RemoveHandler;
231 }
232
233 //----------------------------------------------------------------------
234 // For everything else we just take what we got
235 //----------------------------------------------------------------------
236 return RemoveHandler;
237 }
238
239 //------------------------------------------------------------------------
240 // kXR_oksofars are special, they are not full responses, so we reset
241 // the response pointer to 0 and add the message to the partial list
242 //------------------------------------------------------------------------
243 case kXR_oksofar:
244 {
245 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
246 "%s", pUrl.GetHostId().c_str(),
247 pRequest->GetObfuscatedDescription().c_str() );
248
249 if( !pOksofarAsAnswer )
250 {
251 pPartialResps.emplace_back( std::move( pResponse ) );
252 }
253
254 //----------------------------------------------------------------------
255 // For kXR_read we either read in raw mode if the message has not
256 // been fully reconstructed already, if it has, we adjust
257 // the buffer offset to prepare for the next one
258 //----------------------------------------------------------------------
259 uint16_t reqId = ntohs( req->header.requestid );
260 if( reqId == kXR_read )
261 {
262 pTimeoutFence.store( true, std::memory_order_relaxed );
263 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
264 }
265
266 //----------------------------------------------------------------------
267 // kXR_readv is similar to read, except that the payload is different
268 //----------------------------------------------------------------------
269 if( reqId == kXR_readv )
270 {
271 pTimeoutFence.store( true, std::memory_order_relaxed );
272 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
273 }
274
275 return ( pOksofarAsAnswer ? None : NoProcess );
276 }
277
278 case kXR_status:
279 {
280 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
281 "%s", pUrl.GetHostId().c_str(),
282 pRequest->GetObfuscatedDescription().c_str() );
283
284 uint16_t reqId = ntohs( req->header.requestid );
285 if( reqId == kXR_pgwrite )
286 {
287 //--------------------------------------------------------------------
288 // In case of pgwrite by definition this wont be a partial response
289 // so we can already remove the handler from the in-queue
290 //--------------------------------------------------------------------
291 return RemoveHandler;
292 }
293
294 //----------------------------------------------------------------------
295 // Otherwise (pgread), first of all we need to read the body of the
296 // kXR_status response, we can handle the raw data (if any) only after
297 // we have the whole kXR_status body
298 //----------------------------------------------------------------------
299 pTimeoutFence.store( true, std::memory_order_relaxed );
300 return None;
301 }
302
303 //------------------------------------------------------------------------
304 // Default
305 //------------------------------------------------------------------------
306 default:
307 return RemoveHandler;
308 }
309 return RemoveHandler;
310 }
311
312 //----------------------------------------------------------------------------
313 // Reexamine the incoming message, and decide on the action to be taken
314 //----------------------------------------------------------------------------
316 {
317 if( !pResponse )
318 return 0;
319
320 Log *log = DefaultEnv::GetLog();
321 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
322
323 //--------------------------------------------------------------------------
324 // Additional action is only required for kXR_status
325 //--------------------------------------------------------------------------
326 if( rsp->hdr.status != kXR_status ) return 0;
327
328 //--------------------------------------------------------------------------
329 // Ignore malformed status response
330 //--------------------------------------------------------------------------
331 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
332 {
333 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
334 return Corrupted;
335 }
336
337 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
338 uint16_t reqId = ntohs( req->header.requestid );
339 //--------------------------------------------------------------------------
340 // Unmarshal the status body
341 //--------------------------------------------------------------------------
342 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
343
344 if( !st.IsOK() && st.code == errDataError )
345 {
346 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
347 st.GetErrorMessage().c_str() );
348 return Corrupted;
349 }
350
351 if( !st.IsOK() )
352 {
353 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
354 pUrl.GetHostId().c_str() );
355 pStatus = st;
356 HandleRspOrQueue();
357 return Ignore;
358 }
359
360 //--------------------------------------------------------------------------
361 // Common handling for partial results
362 //--------------------------------------------------------------------------
363 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
365 {
366 pPartialResps.push_back( std::move( pResponse ) );
367 }
368
369 //--------------------------------------------------------------------------
370 // Decide the actions that we need to take
371 //--------------------------------------------------------------------------
372 uint16_t action = 0;
373 if( reqId == kXR_pgread )
374 {
375 //----------------------------------------------------------------------
376 // The message contains only Status header and body but no raw data
377 //----------------------------------------------------------------------
378 if( !pPageReader )
379 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380 pPageReader->SetRsp( rspst );
381
382 action |= Raw;
383
385 action |= NoProcess;
386 else
387 action |= RemoveHandler;
388 }
389 else if( reqId == kXR_pgwrite )
390 {
391 // if data corruption has been detected on the server side we will
392 // send some additional data pointing to the pages that need to be
393 // retransmitted
394 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
395 pResponse->GetCursor() )
396 action |= More;
397 }
398
399 return action;
400 }
401
402 //----------------------------------------------------------------------------
403 // Get handler sid
404 //----------------------------------------------------------------------------
406 {
407 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
408 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
409 }
410
411 //----------------------------------------------------------------------------
413 //----------------------------------------------------------------------------
415 {
416 Log *log = DefaultEnv::GetLog();
417
418 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
419
420 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
421
422 //--------------------------------------------------------------------------
423 // If it is a local file, it can be only a metalink redirector
424 //--------------------------------------------------------------------------
425 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
426 pHosts->back().protocol = kXR_PROTOCOLVERSION;
427
428 //--------------------------------------------------------------------------
429 // We got an answer, check who we were talking to
430 //--------------------------------------------------------------------------
431 else
432 {
433 AnyObject qryResult;
434 int *qryResponse = 0;
435 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
436 qryResult.Get( qryResponse );
437 pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
438 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
439 qryResult.Get( qryResponse );
440 pHosts->back().protocol = *qryResponse; delete qryResponse;
441 }
442
443 //--------------------------------------------------------------------------
444 // Process the message
445 //--------------------------------------------------------------------------
446 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
447 if( !st.IsOK() )
448 {
449 pStatus = Status( stFatal, errInvalidMessage );
450 HandleResponse();
451 return;
452 }
453
454 //--------------------------------------------------------------------------
455 // we have an response for the message so it's not in fly anymore
456 //--------------------------------------------------------------------------
457 pMsgInFly = false;
458
459 //--------------------------------------------------------------------------
460 // Reset the aggregated wait (used to omit wait response in case of Metalink
461 // redirector)
462 //--------------------------------------------------------------------------
463 if( rsp->hdr.status != kXR_wait )
464 pAggregatedWaitTime = 0;
465
466 switch( rsp->hdr.status )
467 {
468 //------------------------------------------------------------------------
469 // kXR_ok - we're done here
470 //------------------------------------------------------------------------
471 case kXR_ok:
472 {
473 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
474 pUrl.GetHostId().c_str(),
475 pRequest->GetObfuscatedDescription().c_str() );
476 pStatus = Status();
477 HandleResponse();
478 return;
479 }
480
481 case kXR_status:
482 {
483 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
484 pUrl.GetHostId().c_str(),
485 pRequest->GetObfuscatedDescription().c_str() );
486 pStatus = Status();
487 HandleResponse();
488 return;
489 }
490
491 //------------------------------------------------------------------------
492 // kXR_ok - we're serving partial result to the user
493 //------------------------------------------------------------------------
494 case kXR_oksofar:
495 {
496 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
497 pUrl.GetHostId().c_str(),
498 pRequest->GetObfuscatedDescription().c_str() );
499 pStatus = Status( stOK, suContinue );
500 HandleResponse();
501 return;
502 }
503
504 //------------------------------------------------------------------------
505 // kXR_error - we've got a problem
506 //------------------------------------------------------------------------
507 case kXR_error:
508 {
509 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
510 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
511 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
512 "[%d] %s", pUrl.GetHostId().c_str(),
513 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
514 errmsg );
515 delete [] errmsg;
516
517 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
518 return;
519 }
520
521 //------------------------------------------------------------------------
522 // kXR_redirect - they tell us to go elsewhere
523 //------------------------------------------------------------------------
524 case kXR_redirect:
525 {
526 if( rsp->hdr.dlen <= 4 )
527 {
528 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
529 pUrl.GetHostId().c_str() );
530 pStatus = Status( stError, errInvalidResponse );
531 HandleResponse();
532 return;
533 }
534
535 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
536 urlInfoBuff[rsp->hdr.dlen-4] = 0;
537 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
538 std::string urlInfo = urlInfoBuff;
539 delete [] urlInfoBuff;
540 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
541 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
542 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
543 rsp->body.redirect.port );
544
545 //----------------------------------------------------------------------
546 // Check if we can proceed
547 //----------------------------------------------------------------------
548 if( !pRedirectCounter )
549 {
550 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
551 "message %s, the last known error is: %s",
552 pUrl.GetHostId().c_str(),
553 pRequest->GetObfuscatedDescription().c_str(),
554 pLastError.ToString().c_str() );
555
556
557 pStatus = Status( stFatal, errRedirectLimit );
558 HandleResponse();
559 return;
560 }
561 --pRedirectCounter;
562
563 //----------------------------------------------------------------------
564 // Keep the info about this server if we still need to find a load
565 // balancer
566 //----------------------------------------------------------------------
567 uint32_t flags = pHosts->back().flags;
568 if( !pHasLoadBalancer )
569 {
570 if( flags & kXR_isManager )
571 {
572 //------------------------------------------------------------------
573 // If the current server is a meta manager then it supersedes
574 // any existing load balancer, otherwise we assign a load-balancer
575 // only if it has not been already assigned
576 //------------------------------------------------------------------
577 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
578 {
579 pLoadBalancer = pHosts->back();
580 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
581 "as a load-balancer for message %s",
582 pUrl.GetHostId().c_str(),
583 pRequest->GetObfuscatedDescription().c_str() );
584 HostList::iterator it;
585 for( it = pHosts->begin(); it != pHosts->end(); ++it )
586 it->loadBalancer = false;
587 pHosts->back().loadBalancer = true;
588 }
589 }
590 }
591
592 //----------------------------------------------------------------------
593 // If the redirect comes from a data server safe the URL because
594 // in case of a failure we will use it as the effective data server URL
595 // for the tried CGI opaque info
596 //----------------------------------------------------------------------
597 if( flags & kXR_isServer )
598 pEffectiveDataServerUrl = new URL( pHosts->back().url );
599
600 //----------------------------------------------------------------------
601 // Build the URL and check it's validity
602 //----------------------------------------------------------------------
603 std::vector<std::string> urlComponents;
604 std::string newCgi;
605 Utils::splitString( urlComponents, urlInfo, "?" );
606
607 std::ostringstream o;
608
609 o << urlComponents[0];
610 if( rsp->body.redirect.port > 0 )
611 o << ":" << rsp->body.redirect.port << "/";
612 else if( rsp->body.redirect.port < 0 )
613 {
614 //--------------------------------------------------------------------
615 // check if the manager wants to enforce write recovery at himself
616 // (beware we are dealing here with negative flags)
617 //--------------------------------------------------------------------
618 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
619 pHosts->back().flags |= kXR_recoverWrts;
620
621 //--------------------------------------------------------------------
622 // check if the manager wants to collapse the communication channel
623 // (the redirect host is to replace the current host)
624 //--------------------------------------------------------------------
625 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
626 {
627 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
628 pPostMaster->CollapseRedirect( pUrl, url );
629 }
630
631 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
632 {
633 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
634 if( Utils::CheckEC( pRequest, url ) )
635 pRedirectAsAnswer = true;
636 }
637 }
638
639 URL newUrl = URL( o.str() );
640 if( !newUrl.IsValid() )
641 {
643 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
644 pUrl.GetHostId().c_str(), urlInfo.c_str() );
645 HandleResponse();
646 return;
647 }
648
649 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
650 newUrl.SetUserName( pUrl.GetUserName() );
651
652 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
653 newUrl.SetPassword( pUrl.GetPassword() );
654
655 //----------------------------------------------------------------------
656 // Forward any "xrd.*" params from the original client request also to
657 // the new redirection url
658 // Also, we need to preserve any "xrdcl.*' as they are important for
659 // our internal workflows.
660 //----------------------------------------------------------------------
661 std::ostringstream ossXrd;
662 const URL::ParamsMap &urlParams = pUrl.GetParams();
663
664 for(URL::ParamsMap::const_iterator it = urlParams.begin();
665 it != urlParams.end(); ++it )
666 {
667 if( it->first.compare( 0, 4, "xrd." ) &&
668 it->first.compare( 0, 6, "xrdcl." ) )
669 continue;
670
671 ossXrd << it->first << '=' << it->second << '&';
672 }
673
674 std::string xrdCgi = ossXrd.str();
675 pRedirectUrl = newUrl.GetURL();
676
677 URL cgiURL;
678 if( urlComponents.size() > 1 )
679 {
680 pRedirectUrl += "?";
681 pRedirectUrl += urlComponents[1];
682 std::ostringstream o;
683 o << "fake://fake:111//fake?";
684 o << urlComponents[1];
685
686 if( urlComponents.size() == 3 )
687 o << '?' << urlComponents[2];
688
689 if (!xrdCgi.empty())
690 {
691 o << '&' << xrdCgi;
692 pRedirectUrl += '&';
693 pRedirectUrl += xrdCgi;
694 }
695
696 cgiURL = URL( o.str() );
697 }
698 else {
699 if (!xrdCgi.empty())
700 {
701 std::ostringstream o;
702 o << "fake://fake:111//fake?";
703 o << xrdCgi;
704 cgiURL = URL( o.str() );
705 pRedirectUrl += '?';
706 pRedirectUrl += xrdCgi;
707 }
708 }
709
710 //----------------------------------------------------------------------
711 // Check if we need to return the URL as a response
712 //----------------------------------------------------------------------
713 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
714 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
715 !newUrl.IsLocalFile() )
716 pRedirectAsAnswer = true;
717
718 if( pRedirectAsAnswer )
719 {
720 pStatus = Status( stError, errRedirect );
721 HandleResponse();
722 return;
723 }
724
725 //----------------------------------------------------------------------
726 // Rewrite the message in a way required to send it to another server
727 //----------------------------------------------------------------------
728 newUrl.SetParams( cgiURL.GetParams() );
729 Status st = RewriteRequestRedirect( newUrl );
730 if( !st.IsOK() )
731 {
732 pStatus = st;
733 HandleResponse();
734 return;
735 }
736
737 //----------------------------------------------------------------------
738 // Make sure we don't change the protocol by accident (root vs roots)
739 //----------------------------------------------------------------------
740 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
741 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
742 newUrl.SetProtocol( "roots" );
743
744 //----------------------------------------------------------------------
745 // Send the request to the new location
746 //----------------------------------------------------------------------
747 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
748 return;
749 }
750
751 //------------------------------------------------------------------------
752 // kXR_wait - we wait, and re-issue the request later
753 //------------------------------------------------------------------------
754 case kXR_wait:
755 {
756 uint32_t waitSeconds = 0;
757
758 if( rsp->hdr.dlen >= 4 )
759 {
760 char *infoMsg = new char[rsp->hdr.dlen-3];
761 infoMsg[rsp->hdr.dlen-4] = 0;
762 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
763 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
764 "message %s: %s", pUrl.GetHostId().c_str(),
765 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
766 infoMsg );
767 delete [] infoMsg;
768 waitSeconds = rsp->body.wait.seconds;
769 }
770 else
771 {
772 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
773 "message %s", pUrl.GetHostId().c_str(),
774 pRequest->GetObfuscatedDescription().c_str() );
775 }
776
777 pAggregatedWaitTime += waitSeconds;
778
779 // We need a special case if the data node comes from metalink
780 // redirector. In this case it might make more sense to try the
781 // next entry in the Metalink than wait.
782 if( OmitWait( *pRequest, pLoadBalancer.url ) )
783 {
784 int maxWait = DefaultMaxMetalinkWait;
785 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
786 if( pAggregatedWaitTime > maxWait )
787 {
788 UpdateTriedCGI();
789 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
790 return;
791 }
792 }
793
794 //----------------------------------------------------------------------
795 // Some messages require rewriting before they can be sent again
796 // after wait
797 //----------------------------------------------------------------------
798 Status st = RewriteRequestWait();
799 if( !st.IsOK() )
800 {
801 pStatus = st;
802 HandleResponse();
803 return;
804 }
805
806 //----------------------------------------------------------------------
807 // Register a task to resend the message in some seconds, if we still
808 // have time to do that, and report a timeout otherwise
809 //----------------------------------------------------------------------
810 time_t resendTime = ::time(0)+waitSeconds;
811
812 if( resendTime < pExpiration )
813 {
814 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
815 pUrl.GetHostId().c_str(), this,
816 pRequest->GetObfuscatedDescription().c_str() );
817
818 TaskManager *taskMgr = pPostMaster->GetTaskManager();
819 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
820 }
821 else
822 {
823 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
824 pUrl.GetHostId().c_str(),
825 pRequest->GetObfuscatedDescription().c_str() );
826 HandleError( Status( stError, errOperationExpired) );
827 }
828 return;
829 }
830
831 //------------------------------------------------------------------------
832 // kXR_waitresp - the response will be returned in some seconds as an
833 // unsolicited message. Currently all messages of this type are handled
834 // one step before in the XrdClStream::OnIncoming as they need to be
835 // processed synchronously.
836 //------------------------------------------------------------------------
837 case kXR_waitresp:
838 {
839 if( rsp->hdr.dlen < 4 )
840 {
841 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
842 pUrl.GetHostId().c_str() );
843 pStatus = Status( stError, errInvalidResponse );
844 HandleResponse();
845 return;
846 }
847
848 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
849 "message %s", pUrl.GetHostId().c_str(),
850 rsp->body.waitresp.seconds,
851 pRequest->GetObfuscatedDescription().c_str() );
852 return;
853 }
854
855 //------------------------------------------------------------------------
856 // Default - unrecognized/unsupported response, declare an error
857 //------------------------------------------------------------------------
858 default:
859 {
860 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
861 "message %s", pUrl.GetHostId().c_str(),
862 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
863 pStatus = Status( stError, errInvalidResponse );
864 HandleResponse();
865 return;
866 }
867 }
868
869 return;
870 }
871
872 //----------------------------------------------------------------------------
873 // Handle an event other that a message arrival - may be timeout
874 //----------------------------------------------------------------------------
876 XRootDStatus status )
877 {
878 Log *log = DefaultEnv::GetLog();
879 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
880 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
881
882 if( event == Ready )
883 return 0;
884
885 if( pTimeoutFence.load( std::memory_order_relaxed ) )
886 return 0;
887
888 HandleError( status );
889 return RemoveHandler;
890 }
891
892 //----------------------------------------------------------------------------
893 // Read message body directly from a socket
894 //----------------------------------------------------------------------------
896 Socket *socket,
897 uint32_t &bytesRead )
898 {
899 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
900 uint16_t reqId = ntohs( req->header.requestid );
901
902 if( reqId == kXR_pgread )
903 return pPageReader->Read( *socket, bytesRead );
904
905 return pBodyReader->Read( *socket, bytesRead );
906 }
907
908 //----------------------------------------------------------------------------
909 // We're here when we requested sending something over the wire
910 // and there has been a status update on this action
911 //----------------------------------------------------------------------------
913 XRootDStatus status )
914 {
915 Log *log = DefaultEnv::GetLog();
916
917 const int sst = pSendingState.fetch_or( kSendDone );
918
919 if( sst & kFinalResp )
920 {
921 log->Dump( XRootDMsg, "[%s] Got late notification that outgoing message %s was "
922 "sent, already have final response, queuing handler callback.",
923 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
924 HandleRspOrQueue();
925 return;
926 }
927
928 if( sst & kSawResp )
929 {
930 log->Dump( XRootDMsg, "[%s] Got late notification that message %s has "
931 "been successfully sent.",
932 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
933 return;
934 }
935
936 //--------------------------------------------------------------------------
937 // We were successful, so we now need to listen for a response
938 //--------------------------------------------------------------------------
939 if( status.IsOK() )
940 {
941 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
942 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
943
944 pMsgInFly = true;
945 return;
946 }
947
948 //--------------------------------------------------------------------------
949 // We have failed, recover if possible
950 //--------------------------------------------------------------------------
951 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
952 "recover.", pUrl.GetHostId().c_str(),
953 message->GetObfuscatedDescription().c_str() );
954 HandleError( status );
955 }
956
957 //----------------------------------------------------------------------------
958 // Are we a raw writer or not?
959 //----------------------------------------------------------------------------
961 {
962 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
963 uint16_t reqId = ntohs( req->header.requestid );
964 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
965 return true;
966 // checkpoint + execute
967 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
968 {
969 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
970 reqId = ntohs( xeq->header.requestid );
971 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
972 }
973
974 return false;
975 }
976
977 //----------------------------------------------------------------------------
978 // Write the message body
979 //----------------------------------------------------------------------------
981 uint32_t &bytesWritten )
982 {
983 //--------------------------------------------------------------------------
984 // First check if it is a PgWrite
985 //--------------------------------------------------------------------------
986 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
987 {
988 //------------------------------------------------------------------------
989 // PgWrite will have just one chunk
990 //------------------------------------------------------------------------
991 ChunkInfo chunk = pChunkList->front();
992 //------------------------------------------------------------------------
993 // Calculate the size of the first and last page (in case the chunk is not
994 // 4KB aligned)
995 //------------------------------------------------------------------------
996 int fLen = 0, lLen = 0;
997 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
998
999 //------------------------------------------------------------------------
1000 // Set the crc32c buffer if not ready yet
1001 //------------------------------------------------------------------------
1002 if( pPgWrtCksumBuff.GetCursor() == 0 )
1003 {
1004 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1005 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1006 }
1007
1008 uint32_t btsLeft = chunk.length - pAsyncOffset;
1009 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1010 if( pglen > btsLeft ) pglen = btsLeft;
1011 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1012
1013 while( btsLeft > 0 )
1014 {
1015 // first write the crc32c digest
1016 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1017 {
1018 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1019 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1020 int btswrt = 0;
1021 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1022 if( !st.IsOK() ) return st;
1023 bytesWritten += btswrt;
1024 pPgWrtCksumBuff.AdvanceCursor( btswrt );
1025 if( st.code == suRetry ) return st;
1026 }
1027 // then write the raw data (one page)
1028 int btswrt = 0;
1029 Status st = socket->Send( pgbuf, pglen, btswrt );
1030 if( !st.IsOK() ) return st;
1031 pgbuf += btswrt;
1032 pglen -= btswrt;
1033 btsLeft -= btswrt;
1034 bytesWritten += btswrt;
1035 pAsyncOffset += btswrt; // update the offset to the raw data
1036 if( st.code == suRetry ) return st;
1037 // if we managed to write all the data ...
1038 if( pglen == 0 )
1039 {
1040 // move to the next page
1041 ++pPgWrtCurrentPageNb;
1042 if( pPgWrtCurrentPageNb < nbpgs )
1043 {
1044 // set the digest buffer
1045 pPgWrtCksumBuff.SetCursor( 0 );
1046 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1047 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1048 }
1049 // set the page length
1050 pglen = XrdSys::PageSize;
1051 if( pglen > btsLeft ) pglen = btsLeft;
1052 // reset offset in the current page
1053 pPgWrtCurrentPageOffset = 0;
1054 }
1055 else
1056 // otherwise just adjust the offset in the current page
1057 pPgWrtCurrentPageOffset += btswrt;
1058
1059 }
1060 }
1061 else if( !pChunkList->empty() )
1062 {
1063 size_t size = pChunkList->size();
1064 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1065 {
1066 char *buffer = (char*)(*pChunkList)[i].buffer;
1067 uint32_t size = (*pChunkList)[i].length;
1068 size_t leftToBeWritten = size - pAsyncOffset;
1069
1070 while( leftToBeWritten )
1071 {
1072 int btswrt = 0;
1073 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1074 bytesWritten += btswrt;
1075 if( !st.IsOK() || st.code == suRetry ) return st;
1076 pAsyncOffset += btswrt;
1077 leftToBeWritten -= btswrt;
1078 }
1079 //----------------------------------------------------------------------
1080 // Remember that we have moved to the next chunk, also clear the offset
1081 // within the buffer as we are going to move to a new one
1082 //----------------------------------------------------------------------
1083 ++pAsyncChunkIndex;
1084 pAsyncOffset = 0;
1085 }
1086 }
1087 else
1088 {
1089 Log *log = DefaultEnv::GetLog();
1090
1091 //------------------------------------------------------------------------
1092 // If the socket is encrypted we cannot use a kernel buffer, we have to
1093 // convert to user space buffer
1094 //------------------------------------------------------------------------
1095 if( socket->IsEncrypted() )
1096 {
1097 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1098 pUrl.GetHostId().c_str() );
1099
1100 char *ubuff = 0;
1101 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1102 if( ret < 0 ) return Status( stError, errInternal );
1103 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1104 return WriteMessageBody( socket, bytesWritten );
1105 }
1106
1107 //------------------------------------------------------------------------
1108 // Send the data
1109 //------------------------------------------------------------------------
1110 while( !pKBuff->Empty() )
1111 {
1112 int btswrt = 0;
1113 Status st = socket->Send( *pKBuff, btswrt );
1114 bytesWritten += btswrt;
1115 if( !st.IsOK() || st.code == suRetry ) return st;
1116 }
1117
1118 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1119 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1120 }
1121
1122 return Status();
1123 }
1124
1125 //----------------------------------------------------------------------------
1126 // We're here when we got a time event. We needed to re-issue the request
1127 // in some time in the future, and that moment has arrived
1128 //----------------------------------------------------------------------------
1130 {
1131 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1132 }
1133
1134 //----------------------------------------------------------------------------
1135 // Bookkeeping after partial response has been received.
1136 //----------------------------------------------------------------------------
1138 {
1139 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1140 }
1141
1142 //----------------------------------------------------------------------------
1143 // Unpack the message and call the response handler
1144 //----------------------------------------------------------------------------
1145 void XRootDMsgHandler::HandleResponse()
1146 {
1147 //--------------------------------------------------------------------------
1148 // Is it a final response?
1149 //--------------------------------------------------------------------------
1150 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1151 if( finalrsp )
1152 {
1153 // Do not do final processing of the response if we haven't had
1154 // confirmation the original request was sent (via OnStatusReady).
1155 // The final processing will be triggered when we get the confirm.
1156 const int sst = pSendingState.fetch_or( kFinalResp );
1157 if( !( sst & kSendDone ) )
1158 return;
1159 }
1160
1161 //--------------------------------------------------------------------------
1162 // Process the response and notify the listener
1163 //--------------------------------------------------------------------------
1165 XRootDStatus *status = ProcessStatus();
1166 AnyObject *response = 0;
1167
1168 Log *log = DefaultEnv::GetLog();
1169 log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1170 "with status: %s.",
1171 pUrl.GetHostId().c_str(), this,
1172 pRequest->GetObfuscatedDescription().c_str(),
1173 status->ToString().c_str() );
1174
1175 if( status->IsOK() )
1176 {
1177 Status st = ParseResponse( response );
1178 if( !st.IsOK() )
1179 {
1180 delete status;
1181 delete response;
1182 status = new XRootDStatus( st );
1183 response = 0;
1184 }
1185 }
1186
1187 //--------------------------------------------------------------------------
1188 // Close the redirect entry if necessary
1189 //--------------------------------------------------------------------------
1190 if( pRdirEntry )
1191 {
1192 pRdirEntry->status = *status;
1193 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1194 }
1195
1196 //--------------------------------------------------------------------------
1197 // Release the stream id
1198 //--------------------------------------------------------------------------
1199 if( pSidMgr && finalrsp )
1200 {
1201 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1202 if( status->IsOK() || !pMsgInFly ||
1203 !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1204 pSidMgr->ReleaseSID( req->header.streamid );
1205 }
1206
1207 HostList *hosts = pHosts.release();
1208 if( !finalrsp )
1209 pHosts.reset( new HostList( *hosts ) );
1210
1211 pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1212
1213 //--------------------------------------------------------------------------
1214 // if it is the final response there is nothing more to do ...
1215 //--------------------------------------------------------------------------
1216 if( finalrsp )
1217 delete this;
1218 //--------------------------------------------------------------------------
1219 // on the other hand if it is not the final response, we have to keep the
1220 // MsgHandler and delete the current response
1221 //--------------------------------------------------------------------------
1222 else
1223 {
1224 XrdSysCondVarHelper lck( pCV );
1225 pResponse.reset();
1226 pTimeoutFence.store( false, std::memory_order_relaxed );
1227 pCV.Broadcast();
1228 }
1229 }
1230
1231
1232 //----------------------------------------------------------------------------
1233 // Extract the status information from the stuff that we got
1234 //----------------------------------------------------------------------------
1235 XRootDStatus *XRootDMsgHandler::ProcessStatus()
1236 {
1237 XRootDStatus *st = new XRootDStatus( pStatus );
1238 ServerResponse *rsp = 0;
1239 if( pResponse )
1240 rsp = (ServerResponse *)pResponse->GetBuffer();
1241
1242 if( !pStatus.IsOK() && rsp )
1243 {
1244 if( pStatus.code == errErrorResponse )
1245 {
1246 st->errNo = rsp->body.error.errnum;
1247 // omit the last character as the string returned from the server
1248 // (acording to protocol specs) should be null-terminated
1249 std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1250 if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1251 errmsg += " Last seen error: " + pLastError.ToString();
1252 st->SetErrorMessage( errmsg );
1253 }
1254 else if( pStatus.code == errRedirect )
1255 st->SetErrorMessage( pRedirectUrl );
1256 }
1257 return st;
1258 }
1259
1260 //------------------------------------------------------------------------
1261 // Parse the response and put it in an object that could be passed to
1262 // the user
1263 //------------------------------------------------------------------------
1264 Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1265 {
1266 if( !pResponse )
1267 return Status();
1268
1269 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1270 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1271 Log *log = DefaultEnv::GetLog();
1272
1273 //--------------------------------------------------------------------------
1274 // Handle redirect as an answer
1275 //--------------------------------------------------------------------------
1276 if( rsp->hdr.status == kXR_redirect )
1277 {
1278 log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1279 return 0;
1280 }
1281
1282 Buffer buff;
1283 uint32_t length = 0;
1284 char *buffer = 0;
1285
1286 //--------------------------------------------------------------------------
1287 // We don't have any partial answers so pass what we have
1288 //--------------------------------------------------------------------------
1289 if( pPartialResps.empty() )
1290 {
1291 buffer = rsp->body.buffer.data;
1292 length = rsp->hdr.dlen;
1293 }
1294 //--------------------------------------------------------------------------
1295 // Partial answers, we need to glue them together before parsing
1296 //--------------------------------------------------------------------------
1297 else if( req->header.requestid != kXR_read &&
1298 req->header.requestid != kXR_readv )
1299 {
1300 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1301 {
1302 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1303 length += part->hdr.dlen;
1304 }
1305 length += rsp->hdr.dlen;
1306
1307 buff.Allocate( length );
1308 uint32_t offset = 0;
1309 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1310 {
1311 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1312 buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1313 offset += part->hdr.dlen;
1314 }
1315 buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1316 buffer = buff.GetBuffer();
1317 }
1318
1319 //--------------------------------------------------------------------------
1320 // Right, but what was the question?
1321 //--------------------------------------------------------------------------
1322 switch( req->header.requestid )
1323 {
1324 //------------------------------------------------------------------------
1325 // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1326 // kXR_ping, kXR_close, kXR_write, kXR_sync
1327 //------------------------------------------------------------------------
1328 case kXR_mv:
1329 case kXR_truncate:
1330 case kXR_rm:
1331 case kXR_mkdir:
1332 case kXR_rmdir:
1333 case kXR_chmod:
1334 case kXR_ping:
1335 case kXR_close:
1336 case kXR_write:
1337 case kXR_writev:
1338 case kXR_sync:
1339 case kXR_chkpoint:
1340 return Status();
1341
1342 //------------------------------------------------------------------------
1343 // kXR_locate
1344 //------------------------------------------------------------------------
1345 case kXR_locate:
1346 {
1347 AnyObject *obj = new AnyObject();
1348
1349 char *nullBuffer = new char[length+1];
1350 nullBuffer[length] = 0;
1351 memcpy( nullBuffer, buffer, length );
1352
1353 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1354 "LocateInfo: %s", pUrl.GetHostId().c_str(),
1355 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1356 LocationInfo *data = new LocationInfo();
1357
1358 if( data->ParseServerResponse( nullBuffer ) == false )
1359 {
1360 delete obj;
1361 delete data;
1362 delete [] nullBuffer;
1363 return Status( stError, errInvalidResponse );
1364 }
1365 delete [] nullBuffer;
1366
1367 obj->Set( data );
1368 response = obj;
1369 return Status();
1370 }
1371
1372 //------------------------------------------------------------------------
1373 // kXR_stat
1374 //------------------------------------------------------------------------
1375 case kXR_stat:
1376 {
1377 AnyObject *obj = new AnyObject();
1378
1379 //----------------------------------------------------------------------
1380 // Virtual File System stat (kXR_vfs)
1381 //----------------------------------------------------------------------
1382 if( req->stat.options & kXR_vfs )
1383 {
1384 StatInfoVFS *data = new StatInfoVFS();
1385
1386 char *nullBuffer = new char[length+1];
1387 nullBuffer[length] = 0;
1388 memcpy( nullBuffer, buffer, length );
1389
1390 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1391 "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1392 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1393
1394 if( data->ParseServerResponse( nullBuffer ) == false )
1395 {
1396 delete obj;
1397 delete data;
1398 delete [] nullBuffer;
1399 return Status( stError, errInvalidResponse );
1400 }
1401 delete [] nullBuffer;
1402
1403 obj->Set( data );
1404 }
1405 //----------------------------------------------------------------------
1406 // Normal stat
1407 //----------------------------------------------------------------------
1408 else
1409 {
1410 StatInfo *data = new StatInfo();
1411
1412 char *nullBuffer = new char[length+1];
1413 nullBuffer[length] = 0;
1414 memcpy( nullBuffer, buffer, length );
1415
1416 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1417 "%s", pUrl.GetHostId().c_str(),
1418 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1419
1420 if( data->ParseServerResponse( nullBuffer ) == false )
1421 {
1422 delete obj;
1423 delete data;
1424 delete [] nullBuffer;
1425 return Status( stError, errInvalidResponse );
1426 }
1427 delete [] nullBuffer;
1428 obj->Set( data );
1429 }
1430
1431 response = obj;
1432 return Status();
1433 }
1434
1435 //------------------------------------------------------------------------
1436 // kXR_protocol
1437 //------------------------------------------------------------------------
1438 case kXR_protocol:
1439 {
1440 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1441 pUrl.GetHostId().c_str(),
1442 pRequest->GetObfuscatedDescription().c_str() );
1443
1444 if( rsp->hdr.dlen < 8 )
1445 {
1446 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1447 pUrl.GetHostId().c_str() );
1448 return Status( stError, errInvalidResponse );
1449 }
1450
1451 AnyObject *obj = new AnyObject();
1452 ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1453 rsp->body.protocol.flags );
1454 obj->Set( data );
1455 response = obj;
1456 return Status();
1457 }
1458
1459 //------------------------------------------------------------------------
1460 // kXR_dirlist
1461 //------------------------------------------------------------------------
1462 case kXR_dirlist:
1463 {
1464 AnyObject *obj = new AnyObject();
1465 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1466 "DirectoryList", pUrl.GetHostId().c_str(),
1467 pRequest->GetObfuscatedDescription().c_str() );
1468
1469 char *path = new char[req->dirlist.dlen+1];
1470 path[req->dirlist.dlen] = 0;
1471 memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1472
1473 DirectoryList *data = new DirectoryList();
1474 data->SetParentName( path );
1475 delete [] path;
1476
1477 char *nullBuffer = new char[length+1];
1478 nullBuffer[length] = 0;
1479 memcpy( nullBuffer, buffer, length );
1480
1481 bool invalidrsp = false;
1482
1483 if( !pDirListStarted )
1484 {
1485 pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1486 pDirListStarted = true;
1487
1488 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1489 }
1490 else
1491 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1492
1493 if( invalidrsp )
1494 {
1495 delete data;
1496 delete obj;
1497 delete [] nullBuffer;
1498 return Status( stError, errInvalidResponse );
1499 }
1500
1501 delete [] nullBuffer;
1502 obj->Set( data );
1503 response = obj;
1504 return Status();
1505 }
1506
1507 //------------------------------------------------------------------------
1508 // kXR_open - if we got the statistics, otherwise return 0
1509 //------------------------------------------------------------------------
1510 case kXR_open:
1511 {
1512 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1513 pUrl.GetHostId().c_str(),
1514 pRequest->GetObfuscatedDescription().c_str() );
1515
1516 if( rsp->hdr.dlen < 4 )
1517 {
1518 log->Error( XRootDMsg, "[%s] Got invalid open response.",
1519 pUrl.GetHostId().c_str() );
1520 return Status( stError, errInvalidResponse );
1521 }
1522
1523 AnyObject *obj = new AnyObject();
1524 StatInfo *statInfo = 0;
1525
1526 //----------------------------------------------------------------------
1527 // Handle StatInfo if requested
1528 //----------------------------------------------------------------------
1529 if( req->open.options & kXR_retstat )
1530 {
1531 log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1532 pUrl.GetHostId().c_str(),
1533 pRequest->GetObfuscatedDescription().c_str() );
1534
1535 if( rsp->hdr.dlen >= 12 )
1536 {
1537 char *nullBuffer = new char[rsp->hdr.dlen-11];
1538 nullBuffer[rsp->hdr.dlen-12] = 0;
1539 memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1540
1541 statInfo = new StatInfo();
1542 if( statInfo->ParseServerResponse( nullBuffer ) == false )
1543 {
1544 delete statInfo;
1545 statInfo = 0;
1546 }
1547 delete [] nullBuffer;
1548 }
1549
1550 if( rsp->hdr.dlen < 12 || !statInfo )
1551 {
1552 log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1553 "to %s", pUrl.GetHostId().c_str(),
1554 pRequest->GetObfuscatedDescription().c_str() );
1555 delete obj;
1556 return Status( stError, errInvalidResponse );
1557 }
1558 }
1559
1560 OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1561 pResponse->GetSessionId(),
1562 statInfo );
1563 obj->Set( data );
1564 response = obj;
1565 return Status();
1566 }
1567
1568 //------------------------------------------------------------------------
1569 // kXR_read
1570 //------------------------------------------------------------------------
1571 case kXR_read:
1572 {
1573 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1574 pUrl.GetHostId().c_str(),
1575 pRequest->GetObfuscatedDescription().c_str() );
1576
1577 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1578 {
1579 //--------------------------------------------------------------------
1580 // we are expecting to have only the header in the message, the raw
1581 // data have been readout into the user buffer
1582 //--------------------------------------------------------------------
1583 if( pPartialResps[i]->GetSize() > 8 )
1584 return Status( stOK, errInternal );
1585 }
1586 //----------------------------------------------------------------------
1587 // we are expecting to have only the header in the message, the raw
1588 // data have been readout into the user buffer
1589 //----------------------------------------------------------------------
1590 if( pResponse->GetSize() > 8 )
1591 return Status( stOK, errInternal );
1592 //----------------------------------------------------------------------
1593 // Get the response for the end user
1594 //----------------------------------------------------------------------
1595 return pBodyReader->GetResponse( response );
1596 }
1597
1598 //------------------------------------------------------------------------
1599 // kXR_pgread
1600 //------------------------------------------------------------------------
1601 case kXR_pgread:
1602 {
1603 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1604 pUrl.GetHostId().c_str(),
1605 pRequest->GetObfuscatedDescription().c_str() );
1606
1607 //----------------------------------------------------------------------
1608 // Glue in the cached responses if necessary
1609 //----------------------------------------------------------------------
1610 ChunkInfo chunk = pChunkList->front();
1611 bool sizeMismatch = false;
1612 uint32_t currentOffset = 0;
1613 char *cursor = (char*)chunk.buffer;
1614 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1615 {
1616 ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1617
1618 //--------------------------------------------------------------------
1619 // the actual size of the raw data without the crc32c checksums
1620 //--------------------------------------------------------------------
1621 size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1622 part->status.bdy.dlen ) * CksumSize;
1623
1624 if( currentOffset + datalen > chunk.length )
1625 {
1626 sizeMismatch = true;
1627 break;
1628 }
1629
1630 currentOffset += datalen;
1631 cursor += datalen;
1632 }
1633
1634 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1635 size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1636 rspst->status.bdy.dlen ) * CksumSize;
1637 if( currentOffset + datalen <= chunk.length )
1638 currentOffset += datalen;
1639 else
1640 sizeMismatch = true;
1641
1642 //----------------------------------------------------------------------
1643 // Overflow
1644 //----------------------------------------------------------------------
1645 if( pChunkStatus.front().sizeError || sizeMismatch )
1646 {
1647 log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1648 "buffer is too small for the received data.",
1649 pUrl.GetHostId().c_str(),
1650 pRequest->GetObfuscatedDescription().c_str() );
1651 return Status( stError, errInvalidResponse );
1652 }
1653
1654 AnyObject *obj = new AnyObject();
1655 PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1656 std::move( pCrc32cDigests) );
1657
1658 obj->Set( pgInfo );
1659 response = obj;
1660 return Status();
1661 }
1662
1663 //------------------------------------------------------------------------
1664 // kXR_pgwrite
1665 //------------------------------------------------------------------------
1666 case kXR_pgwrite:
1667 {
1668 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1669
1670 ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1671 if( rsp->status.bdy.dlen > 0 )
1672 {
1673 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1674 size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1675 retries.reserve( pgcnt );
1676 kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1677 sizeof( ServerResponseBody_pgWrCSE ) );
1678
1679 for( size_t i = 0; i < pgcnt; ++i )
1680 {
1681 uint32_t len = XrdSys::PageSize;
1682 if( i == 0 ) len = cse->dlFirst;
1683 else if( i == pgcnt - 1 ) len = cse->dlLast;
1684 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1685 }
1686 }
1687
1688 RetryInfo *info = new RetryInfo( std::move( retries ) );
1689 AnyObject *obj = new AnyObject();
1690 obj->Set( info );
1691 response = obj;
1692
1693 return Status();
1694 }
1695
1696
1697 //------------------------------------------------------------------------
1698 // kXR_readv - we need to pass the length of the buffer to the user code
1699 //------------------------------------------------------------------------
1700 case kXR_readv:
1701 {
1702 log->Dump( XRootDMsg, "[%s] Parsing the response to %p as "
1703 "VectorReadInfo", pUrl.GetHostId().c_str(),
1704 pRequest->GetObfuscatedDescription().c_str() );
1705
1706 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1707 {
1708 //--------------------------------------------------------------------
1709 // we are expecting to have only the header in the message, the raw
1710 // data have been readout into the user buffer
1711 //--------------------------------------------------------------------
1712 if( pPartialResps[i]->GetSize() > 8 )
1713 return Status( stOK, errInternal );
1714 }
1715 //----------------------------------------------------------------------
1716 // we are expecting to have only the header in the message, the raw
1717 // data have been readout into the user buffer
1718 //----------------------------------------------------------------------
1719 if( pResponse->GetSize() > 8 )
1720 return Status( stOK, errInternal );
1721 //----------------------------------------------------------------------
1722 // Get the response for the end user
1723 //----------------------------------------------------------------------
1724 return pBodyReader->GetResponse( response );
1725 }
1726
1727 //------------------------------------------------------------------------
1728 // kXR_fattr
1729 //------------------------------------------------------------------------
1730 case kXR_fattr:
1731 {
1732 int len = rsp->hdr.dlen;
1733 char* data = rsp->body.buffer.data;
1734
1735 return ParseXAttrResponse( data, len, response );
1736 }
1737
1738 //------------------------------------------------------------------------
1739 // kXR_query
1740 //------------------------------------------------------------------------
1741 case kXR_query:
1742 case kXR_set:
1743 case kXR_prepare:
1744 default:
1745 {
1746 AnyObject *obj = new AnyObject();
1747 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1748 pUrl.GetHostId().c_str(),
1749 pRequest->GetObfuscatedDescription().c_str() );
1750
1751 BinaryDataInfo *data = new BinaryDataInfo();
1752 data->Allocate( length );
1753 data->Append( buffer, length );
1754 obj->Set( data );
1755 response = obj;
1756 return Status();
1757 }
1758 };
1759 return Status( stError, errInvalidMessage );
1760 }
1761
1762 //------------------------------------------------------------------------
1763 // Parse the response to kXR_fattr request and put it in an object that
1764 // could be passed to the user
1765 //------------------------------------------------------------------------
1766 Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1767 AnyObject *&response )
1768 {
1769 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1770// Log *log = DefaultEnv::GetLog(); //TODO
1771
1772 switch( req->fattr.subcode )
1773 {
1774 case kXR_fattrDel:
1775 case kXR_fattrSet:
1776 {
1777 Status status;
1778
1779 kXR_char nerrs = 0;
1780 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1781 return status;
1782
1783 kXR_char nattr = 0;
1784 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1785 return status;
1786
1787 std::vector<XAttrStatus> resp;
1788 // read the namevec
1789 for( kXR_char i = 0; i < nattr; ++i )
1790 {
1791 kXR_unt16 rc = 0;
1792 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1793 return status;
1794 rc = ntohs( rc );
1795
1796 // count errors
1797 if( rc ) --nerrs;
1798
1799 std::string name;
1800 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1801 return status;
1802
1803 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1804 XRootDStatus();
1805 resp.push_back( XAttrStatus( name, st ) );
1806 }
1807
1808 // check if we read all the data and if the error count is OK
1809 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1810
1811 // set up the response object
1812 response = new AnyObject();
1813 response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1814
1815 return Status();
1816 }
1817
1818 case kXR_fattrGet:
1819 {
1820 Status status;
1821
1822 kXR_char nerrs = 0;
1823 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1824 return status;
1825
1826 kXR_char nattr = 0;
1827 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1828 return status;
1829
1830 std::vector<XAttr> resp;
1831 resp.reserve( nattr );
1832
1833 // read the name vec
1834 for( kXR_char i = 0; i < nattr; ++i )
1835 {
1836 kXR_unt16 rc = 0;
1837 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1838 return status;
1839 rc = ntohs( rc );
1840
1841 // count errors
1842 if( rc ) --nerrs;
1843
1844 std::string name;
1845 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1846 return status;
1847
1848 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1849 XRootDStatus();
1850 resp.push_back( XAttr( name, st ) );
1851 }
1852
1853 // read the value vec
1854 for( kXR_char i = 0; i < nattr; ++i )
1855 {
1856 kXR_int32 vlen = 0;
1857 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1858 return status;
1859 vlen = ntohl( vlen );
1860
1861 std::string value;
1862 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1863 return status;
1864
1865 resp[i].value.swap( value );
1866 }
1867
1868 // check if we read all the data and if the error count is OK
1869 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1870
1871 // set up the response object
1872 response = new AnyObject();
1873 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1874
1875 return Status();
1876 }
1877
1878 case kXR_fattrList:
1879 {
1880 Status status;
1881 std::vector<XAttr> resp;
1882
1883 while( len > 0 )
1884 {
1885 std::string name;
1886 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1887 return status;
1888
1889 kXR_int32 vlen = 0;
1890 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1891 return status;
1892 vlen = ntohl( vlen );
1893
1894 std::string value;
1895 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1896 return status;
1897
1898 resp.push_back( XAttr( name, value ) );
1899 }
1900
1901 // set up the response object
1902 response = new AnyObject();
1903 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1904
1905 return Status();
1906 }
1907
1908 default:
1909 return Status( stError, errDataError );
1910 }
1911 }
1912
1913 //----------------------------------------------------------------------------
1914 // Perform the changes to the original request needed by the redirect
1915 // procedure - allocate new streamid, append redirection data and such
1916 //----------------------------------------------------------------------------
1917 Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1918 {
1919 Log *log = DefaultEnv::GetLog();
1920
1921 Status st;
1922 // Append any "xrd.*" parameters present in newCgi so that any authentication
1923 // requirements are properly enforced
1924 const URL::ParamsMap &newCgi = newUrl.GetParams();
1925 std::string xrdCgi = "";
1926 std::ostringstream ossXrd;
1927 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1928 {
1929 if( it->first.compare( 0, 4, "xrd." ) )
1930 continue;
1931 ossXrd << it->first << '=' << it->second << '&';
1932 }
1933
1934 xrdCgi = ossXrd.str();
1935 // Redirection URL containing also any original xrd.* opaque parameters
1936 XrdCl::URL authUrl;
1937
1938 if (xrdCgi.empty())
1939 {
1940 authUrl = newUrl;
1941 }
1942 else
1943 {
1944 std::string surl = newUrl.GetURL();
1945 (surl.find('?') == std::string::npos) ? (surl += '?') :
1946 ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1947 surl += xrdCgi;
1948 if (!authUrl.FromString(surl))
1949 {
1950 std::string surlLog = surl;
1951 if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1952 surlLog = obfuscateAuth(surlLog);
1953 }
1954 log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1955 newUrl.GetHostId().c_str(), surl.c_str());
1956 return Status(stError, errInvalidRedirectURL);
1957 }
1958 }
1959
1960 //--------------------------------------------------------------------------
1961 // Rewrite particular requests
1962 //--------------------------------------------------------------------------
1964 MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1966 return Status();
1967 }
1968
1969 //----------------------------------------------------------------------------
1970 // Some requests need to be rewritten also after getting kXR_wait
1971 //----------------------------------------------------------------------------
1972 Status XRootDMsgHandler::RewriteRequestWait()
1973 {
1974 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1975
1976 XRootDTransport::UnMarshallRequest( pRequest );
1977
1978 //------------------------------------------------------------------------
1979 // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1980 // turned off after wait
1981 //------------------------------------------------------------------------
1982 switch( req->header.requestid )
1983 {
1984 case kXR_locate:
1985 {
1986 uint16_t refresh = kXR_refresh;
1987 req->locate.options &= (~refresh);
1988 break;
1989 }
1990
1991 case kXR_open:
1992 {
1993 uint16_t refresh = kXR_refresh;
1994 req->locate.options &= (~refresh);
1995 break;
1996 }
1997 }
1998
1999 XRootDTransport::SetDescription( pRequest );
2000 XRootDTransport::MarshallRequest( pRequest );
2001 return Status();
2002 }
2003
2004 //----------------------------------------------------------------------------
2005 // Recover error
2006 //----------------------------------------------------------------------------
2007 void XRootDMsgHandler::HandleError( XRootDStatus status )
2008 {
2009 //--------------------------------------------------------------------------
2010 // If there was no error then do nothing
2011 //--------------------------------------------------------------------------
2012 if( status.IsOK() )
2013 return;
2014
2015 if( pSidMgr && pMsgInFly && (
2016 status.code == errOperationExpired ||
2017 status.code == errOperationInterrupted ) )
2018 {
2019 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2020 pSidMgr->TimeOutSID( req->header.streamid );
2021 }
2022
2023 bool noreplicas = ( status.code == errErrorResponse &&
2024 status.errNo == kXR_noReplicas );
2025
2026 if( !noreplicas ) pLastError = status;
2027
2028 Log *log = DefaultEnv::GetLog();
2029 log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
2030 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2031 status.ToString().c_str() );
2032
2033 //--------------------------------------------------------------------------
2034 // Check if it is a fatal TLS error that has been marked as potentially
2035 // recoverable, if yes check if we can downgrade from fatal to error.
2036 //--------------------------------------------------------------------------
2037 if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2038 {
2039 if( pSslErrCnt < MaxSslErrRetry )
2040 {
2041 status.status &= ~stFatal; // switch off fatal&error bits
2042 status.status |= stError; // switch on error bit
2043 }
2044 ++pSslErrCnt; // count number of consecutive SSL errors
2045 }
2046 else
2047 pSslErrCnt = 0;
2048
2049 //--------------------------------------------------------------------------
2050 // We have got an error message, we can recover it at the load balancer if:
2051 // 1) we haven't got it from the load balancer
2052 // 2) we have a load balancer assigned
2053 // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2054 // kXR_NotFound
2055 // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2056 //--------------------------------------------------------------------------
2057 if( status.code == errErrorResponse )
2058 {
2059 if( RetriableErrorResponse( status ) )
2060 {
2061 UpdateTriedCGI(status.errNo);
2062 if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2063 SwitchOnRefreshFlag();
2064 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2065 return;
2066 }
2067 else
2068 {
2069 pStatus = status;
2070 HandleRspOrQueue();
2071 return;
2072 }
2073 }
2074
2075 //--------------------------------------------------------------------------
2076 // Nothing can be done if:
2077 // 1) a user timeout has occurred
2078 // 2) has a non-zero session id
2079 // 3) if another error occurred and the validity of the message expired
2080 //--------------------------------------------------------------------------
2081 if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2082 status.code == errOperationInterrupted || time(0) >= pExpiration )
2083 {
2084 log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2085 pUrl.GetHostId().c_str(),
2086 pRequest->GetObfuscatedDescription().c_str() );
2087 pStatus = status;
2088 HandleRspOrQueue();
2089 return;
2090 }
2091
2092 //--------------------------------------------------------------------------
2093 // At this point we're left with connection errors, we recover them
2094 // at a load balancer if we have one and if not on the current server
2095 // until we get a response, an unrecoverable error or a timeout
2096 //--------------------------------------------------------------------------
2097 if( pLoadBalancer.url.IsValid() &&
2098 pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2099 {
2100 UpdateTriedCGI( kXR_ServerError );
2101 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2102 return;
2103 }
2104 else
2105 {
2106 if( !status.IsFatal() && IsRetriable() )
2107 {
2108 log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2109 pUrl.GetHostId().c_str(),
2110 pRequest->GetObfuscatedDescription().c_str() );
2111
2112 UpdateTriedCGI( kXR_ServerError );
2113 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2114 return;
2115 }
2116 pStatus = status;
2117 HandleRspOrQueue();
2118 return;
2119 }
2120 }
2121
2122 //----------------------------------------------------------------------------
2123 // Retry the message at another server
2124 //----------------------------------------------------------------------------
2125 Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2126 {
2127 pResponse.reset();
2128 Log *log = DefaultEnv::GetLog();
2129
2130 //--------------------------------------------------------------------------
2131 // Set up a redirect entry
2132 //--------------------------------------------------------------------------
2133 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2134 pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2135
2136 if( pUrl.GetLocation() != url.GetLocation() )
2137 {
2138 pHosts->push_back( url );
2139
2140 //------------------------------------------------------------------------
2141 // Assign a new stream id to the message
2142 //------------------------------------------------------------------------
2143
2144 // first release the old stream id
2145 // (though it could be a redirect from a local
2146 // metalink file, in this case there's no SID)
2147 ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2148 if( pSidMgr )
2149 {
2150 pSidMgr->ReleaseSID( req->streamid );
2151 pSidMgr.reset();
2152 }
2153
2154 // then get the new SIDManager
2155 // (again this could be a redirect to a local
2156 // file and in this case there is no SID)
2157 if( !url.IsLocalFile() )
2158 {
2159 pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2160 Status st = pSidMgr->AllocateSID( req->streamid );
2161 if( !st.IsOK() )
2162 {
2163 log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2164 pUrl.GetHostId().c_str(),
2165 pRequest->GetObfuscatedDescription().c_str() );
2166 return st;
2167 }
2168 }
2169
2170 pUrl = url;
2171 }
2172
2173 if( pUrl.IsMetalink() && pFollowMetalink )
2174 {
2175 log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2176 pUrl.GetHostId().c_str(), this,
2177 pRequest->GetObfuscatedDescription().c_str() );
2178
2179 return pPostMaster->Redirect( pUrl, pRequest, this );
2180 }
2181 else if( pUrl.IsLocalFile() )
2182 {
2183 HandleLocalRedirect( &pUrl );
2184 return Status();
2185 }
2186 else
2187 {
2188 log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2189 pUrl.GetHostId().c_str(), this,
2190 pRequest->GetObfuscatedDescription().c_str() );
2191 return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2192 }
2193 }
2194
2195 //----------------------------------------------------------------------------
2196 // Update the "tried=" part of the CGI of the current message
2197 //----------------------------------------------------------------------------
2198 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2199 {
2200 URL::ParamsMap cgi;
2201 std::string tried;
2202
2203 //--------------------------------------------------------------------------
2204 // In case a data server responded with a kXR_redirect and we fail at the
2205 // node where we were redirected to, the original data server should be
2206 // included in the tried CGI opaque info (instead of the current one).
2207 //--------------------------------------------------------------------------
2208 if( pEffectiveDataServerUrl )
2209 {
2210 tried = pEffectiveDataServerUrl->GetHostName();
2211 delete pEffectiveDataServerUrl;
2212 pEffectiveDataServerUrl = 0;
2213 }
2214 //--------------------------------------------------------------------------
2215 // Otherwise use the current URL.
2216 //--------------------------------------------------------------------------
2217 else
2218 tried = pUrl.GetHostName();
2219
2220 // Report the reason for the failure to the next location
2221 //
2222 if (errNo)
2223 { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2224 else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2225 else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2226 else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2227 }
2228
2229 //--------------------------------------------------------------------------
2230 // If our current load balancer is a metamanager and we failed either
2231 // at a diskserver or at an unidentified node we also exclude the last
2232 // known manager
2233 //--------------------------------------------------------------------------
2234 if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2235 {
2236 HostList::reverse_iterator it;
2237 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2238 {
2239 if( it->loadBalancer )
2240 break;
2241
2242 tried += "," + it->url.GetHostName();
2243
2244 if( it->flags & kXR_isManager )
2245 break;
2246 }
2247 }
2248
2249 cgi["tried"] = tried;
2250 XRootDTransport::UnMarshallRequest( pRequest );
2251 MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2252 XRootDTransport::MarshallRequest( pRequest );
2253 }
2254
2255 //----------------------------------------------------------------------------
2256 // Switch on the refresh flag for some requests
2257 //----------------------------------------------------------------------------
2258 void XRootDMsgHandler::SwitchOnRefreshFlag()
2259 {
2260 XRootDTransport::UnMarshallRequest( pRequest );
2261 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2262 switch( req->header.requestid )
2263 {
2264 case kXR_locate:
2265 {
2266 req->locate.options |= kXR_refresh;
2267 break;
2268 }
2269
2270 case kXR_open:
2271 {
2272 req->locate.options |= kXR_refresh;
2273 break;
2274 }
2275 }
2276 XRootDTransport::SetDescription( pRequest );
2277 XRootDTransport::MarshallRequest( pRequest );
2278 }
2279
2280 //------------------------------------------------------------------------
2281 // If the current thread is a worker thread from our thread-pool
2282 // handle the response, otherwise submit a new task to the thread-pool
2283 //------------------------------------------------------------------------
2284 void XRootDMsgHandler::HandleRspOrQueue()
2285 {
2286 //--------------------------------------------------------------------------
2287 // Is it a final response?
2288 //--------------------------------------------------------------------------
2289 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2290 if( finalrsp )
2291 {
2292 // Do not do final processing of the response if we haven't had
2293 // confirmation the original request was sent (via OnStatusReady).
2294 // The final processing will be triggered when we get the confirm.
2295 const int sst = pSendingState.fetch_or( kFinalResp );
2296 if( !( sst & kSendDone ) )
2297 return;
2298 }
2299
2300 JobManager *jobMgr = pPostMaster->GetJobManager();
2301 if( jobMgr->IsWorker() )
2302 HandleResponse();
2303 else
2304 {
2305 Log *log = DefaultEnv::GetLog();
2306 log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2307 pUrl.GetHostId().c_str(), this,
2308 pRequest->GetObfuscatedDescription().c_str() );
2309 jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2310 }
2311 }
2312
2313 //------------------------------------------------------------------------
2314 // Notify the FileStateHandler to retry Open() with new URL
2315 //------------------------------------------------------------------------
2316 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2317 {
2318 Log *log = DefaultEnv::GetLog();
2319 log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2320 pUrl.GetHostId().c_str(), this,
2321 pRequest->GetObfuscatedDescription().c_str() );
2322
2323 if( !pLFileHandler )
2324 {
2325 HandleError( XRootDStatus( stFatal, errNotSupported ) );
2326 return;
2327 }
2328
2329 AnyObject *resp = 0;
2330 pLFileHandler->SetHostList( *pHosts );
2331 XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2332 if( !st.IsOK() )
2333 {
2334 HandleError( st );
2335 return;
2336 }
2337
2338 pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2339 resp,
2340 pHosts.release() );
2341 delete this;
2342
2343 return;
2344 }
2345
2346 //------------------------------------------------------------------------
2347 // Check if it is OK to retry this request
2348 //------------------------------------------------------------------------
2349 bool XRootDMsgHandler::IsRetriable()
2350 {
2351 std::string value;
2352 DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2353 if( value == "true" ) return true;
2354
2355 // check if it is a mutable open (open + truncate or open + create)
2356 ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2357 if( req->header.requestid == htons( kXR_open ) )
2358 {
2359 bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2360 ( req->open.options & htons( kXR_new ) );
2361
2362 if( _mutable )
2363 {
2364 Log *log = DefaultEnv::GetLog();
2365 log->Debug( XRootDMsg,
2366 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2367 pUrl.GetHostId().c_str(),
2368 pRequest->GetObfuscatedDescription().c_str() );
2369 // disallow retry if it is a mutable open
2370 return false;
2371 }
2372 }
2373
2374 return true;
2375 }
2376
2377 //------------------------------------------------------------------------
2378 // Check if for given request and Metalink redirector it is OK to omit
2379 // the kXR_wait and proceed straight to the next entry in the Metalink file
2380 //------------------------------------------------------------------------
2381 bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2382 {
2383 // we can omit kXR_wait only if we have a Metalink redirector
2384 if( !url.IsMetalink() )
2385 return false;
2386
2387 // we can omit kXR_wait only for requests that can be redirected
2388 // (kXR_read is the only stateful request that can be redirected)
2389 ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2390 if( pStateful && req->header.requestid != kXR_read )
2391 return false;
2392
2393 // we can only omit kXR_wait if the Metalink redirect has more
2394 // replicas
2395 RedirectorRegistry &registry = RedirectorRegistry::Instance();
2396 VirtualRedirector *redirector = registry.Get( url );
2397
2398 // we need more than one server as the current one is not reflected
2399 // in tried CGI
2400 if( redirector->Count( request ) > 1 )
2401 return true;
2402
2403 return false;
2404 }
2405
2406 //------------------------------------------------------------------------
2407 // Checks if the given error returned by server is retriable.
2408 //------------------------------------------------------------------------
2409 bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2410 {
2411 // we can only retry error response if we have a valid load-balancer and
2412 // it is not our current URL
2413 if( !( pLoadBalancer.url.IsValid() &&
2414 pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2415 return false;
2416
2417 // following errors are retriable at any load-balancer
2418 if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2419 status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2420 status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2421 return true;
2422
2423 // check if the load-balancer is a meta-manager, if yes there are
2424 // more errors that can be recovered
2425 if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2426
2427 // those errors are retriable for meta-managers
2428 if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2429 return true;
2430
2431 // in case of not-authorized error there is an imposed upper limit
2432 // on how many times we can retry this error
2433 if( status.errNo == kXR_NotAuthorized )
2434 {
2436 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2437 bool ret = pNotAuthorizedCounter < limit;
2438 ++pNotAuthorizedCounter;
2439 if( !ret )
2440 {
2441 Log *log = DefaultEnv::GetLog();
2442 log->Error( XRootDMsg,
2443 "[%s] Reached limit of NotAuthorized retries!",
2444 pUrl.GetHostId().c_str() );
2445 }
2446 return ret;
2447 }
2448
2449 // check if the load-balancer is a virtual (metalink) redirector,
2450 // if yes there are even more errors that can be recovered
2451 if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2452
2453 // those errors are retriable for virtual (metalink) redirectors
2454 if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2455 return true;
2456
2457 // otherwise it is a non-retriable error
2458 return false;
2459 }
2460
2461 //------------------------------------------------------------------------
2462 // Dump the redirect-trace-back into the log file
2463 //------------------------------------------------------------------------
2464 void XRootDMsgHandler::DumpRedirectTraceBack()
2465 {
2466 if( pRedirectTraceBack.empty() ) return;
2467
2468 std::stringstream sstrm;
2469
2470 sstrm << "Redirect trace-back:\n";
2471
2472 int counter = 0;
2473
2474 auto itr = pRedirectTraceBack.begin();
2475 sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2476
2477 auto prev = itr;
2478 ++itr;
2479 ++counter;
2480
2481 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2482 sstrm << '\t' << counter << ". "
2483 << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2484
2485 int authlimit = DefaultNotAuthorizedRetryLimit;
2486 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2487
2488 bool warn = !pStatus.IsOK() &&
2489 ( pStatus.code == errNotFound ||
2490 pStatus.code == errRedirectLimit ||
2491 ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2492
2493 Log *log = DefaultEnv::GetLog();
2494 if( warn )
2495 log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2496 else
2497 log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2498 }
2499
2500 // Read data from buffer
2501 //------------------------------------------------------------------------
2502 template<typename T>
2503 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2504 {
2505 if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2506
2507 memcpy(&result, buffer, sizeof(T));
2508
2509 buffer += sizeof( T );
2510 buflen -= sizeof( T );
2511
2512 return Status();
2513 }
2514
2515 //------------------------------------------------------------------------
2516 // Read a string from buffer
2517 //------------------------------------------------------------------------
2518 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2519 {
2520 Status status;
2521 char c = 0;
2522
2523 while( true )
2524 {
2525 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2526 return status;
2527
2528 if( c == 0 ) break;
2529 result += c;
2530 }
2531
2532 return status;
2533 }
2534
2535 //------------------------------------------------------------------------
2536 // Read a string from buffer
2537 //------------------------------------------------------------------------
2538 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2539 size_t size, std::string &result )
2540 {
2541 Status status;
2542
2543 if( size > buflen ) return Status( stError, errDataError );
2544
2545 result.append( buffer, size );
2546 buffer += size;
2547 buflen -= size;
2548
2549 return status;
2550 }
2551
2552}
@ kXR_NotAuthorized
@ kXR_NotFound
@ kXR_FileLocked
Definition XProtocol.hh:993
@ kXR_noReplicas
@ kXR_Unsupported
@ kXR_ServerError
@ kXR_Overloaded
@ kXR_ArgTooLong
Definition XProtocol.hh:992
@ kXR_noserver
@ kXR_IOError
Definition XProtocol.hh:997
@ kXR_FSError
Definition XProtocol.hh:995
@ kXR_NoMemory
Definition XProtocol.hh:998
#define kXR_isManager
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
struct ClientFattrRequest fattr
Definition XProtocol.hh:854
#define kXR_collapseRedir
ServerResponseStatus status
#define kXR_attrMeta
union ServerResponse::@040373375333017131300127053271011057331004327334 body
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 options
Definition XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition XProtocol.hh:852
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientOpenRequest open
Definition XProtocol.hh:860
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:846
#define kXR_recoverWrts
union ServerResponseV2::@207342300141235315373173036347114307032363217365 info
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
#define kXR_isServer
#define kXR_attrVirtRdr
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
@ kXR_vfs
Definition XProtocol.hh:763
struct ClientStatRequest stat
Definition XProtocol.hh:873
struct ClientProtocolRequest protocol
Definition XProtocol.hh:865
#define kXR_ecRedir
struct ClientLocateRequest locate
Definition XProtocol.hh:856
ServerResponseHeader hdr
long long kXR_int64
Definition XPtypes.hh:98
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
A network socket.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:153
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition XrdClURL.cc:62
void SetPassword(const std::string &password)
Set the password.
Definition XrdClURL.hh:161
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:402
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:135
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
bool IsLocalFile() const
Definition XrdClURL.cc:474
void SetProtocol(const std::string &protocol)
Set protocol.
Definition XrdClURL.hh:126
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
Response NullRef< Response >::value
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
XrdSysError Log
Definition XrdConfig.cc:113
@ kXR_PartialResult
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version