XRootD
Loading...
Searching...
No Matches
XrdClOperations.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#ifndef __XRD_CL_OPERATIONS_HH__
27#define __XRD_CL_OPERATIONS_HH__
28
29#include <memory>
30#include <stdexcept>
31#include <sstream>
32#include <tuple>
33#include <future>
36#include "XrdCl/XrdClArg.hh"
40
45
46namespace XrdCl
47{
48
49 class Pipeline;
50 class PipelineHandler;
51
52 //----------------------------------------------------------------------------
58 //----------------------------------------------------------------------------
59 template<bool HasHndl>
61 {
62 // Declare friendship between templates
63 template<bool>
64 friend class Operation;
65
66 friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
67
68 friend class Pipeline;
69 friend class PipelineHandler;
70
71 public:
72
73 //------------------------------------------------------------------------
75 //------------------------------------------------------------------------
76 Operation() : valid( true )
77 {
78 }
79
80 //------------------------------------------------------------------------
82 //------------------------------------------------------------------------
83 template<bool from>
85 handler( std::move( op.handler ) ), valid( true )
86 {
87 if( !op.valid ) throw std::invalid_argument( "Cannot construct "
88 "Operation from an invalid Operation!" );
89 op.valid = false;
90 }
91
92 //------------------------------------------------------------------------
94 //------------------------------------------------------------------------
95 virtual ~Operation()
96 {
97 }
98
99 //------------------------------------------------------------------------
101 //------------------------------------------------------------------------
102 virtual std::string ToString() = 0;
103
104 //------------------------------------------------------------------------
108 //------------------------------------------------------------------------
109 virtual Operation<HasHndl>* Move() = 0;
110
111 //------------------------------------------------------------------------
116 //------------------------------------------------------------------------
118
119 protected:
120
121 //------------------------------------------------------------------------
126 //------------------------------------------------------------------------
127 void Run( Timeout timeout,
128 std::promise<XRootDStatus> prms,
129 std::function<void(const XRootDStatus&)> final );
130
131 //------------------------------------------------------------------------
137 //------------------------------------------------------------------------
138 virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
139
140 //------------------------------------------------------------------------
144 //------------------------------------------------------------------------
146
147 //------------------------------------------------------------------------
149 //------------------------------------------------------------------------
150 std::unique_ptr<PipelineHandler> handler;
151;
152 //------------------------------------------------------------------------
154 //------------------------------------------------------------------------
155 bool valid;
156 };
157
158 //----------------------------------------------------------------------------
160 //----------------------------------------------------------------------------
161 typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func;
162
163 //----------------------------------------------------------------------------
166 //----------------------------------------------------------------------------
168 {
169 template<bool> friend class Operation;
170
171 public:
172
173 //------------------------------------------------------------------------
177 //------------------------------------------------------------------------
179
180 //------------------------------------------------------------------------
182 //------------------------------------------------------------------------
184 {
185 }
186
187 //------------------------------------------------------------------------
189 //------------------------------------------------------------------------
190 void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
191 HostList *hostList );
192
193 //------------------------------------------------------------------------
195 //------------------------------------------------------------------------
196 void HandleResponse( XRootDStatus *status, AnyObject *response );
197
198 //------------------------------------------------------------------------
200 //------------------------------------------------------------------------
202 {
203 }
204
205 //------------------------------------------------------------------------
209 //------------------------------------------------------------------------
210 void AddOperation( Operation<true> *operation );
211
212 //------------------------------------------------------------------------
219 //------------------------------------------------------------------------
220 void Assign( const Timeout &timeout,
221 std::promise<XRootDStatus> prms,
222 std::function<void(const XRootDStatus&)> final,
223 Operation<true> *opr );
224
225 //------------------------------------------------------------------------
227 //------------------------------------------------------------------------
228 void Assign( std::function<void(const XRootDStatus&)> final );
229
230 //------------------------------------------------------------------------
232 //------------------------------------------------------------------------
234
235 private:
236
237 //------------------------------------------------------------------------
239 //------------------------------------------------------------------------
240 void HandleResponseImpl( XRootDStatus *status, AnyObject *response,
241 HostList *hostList = nullptr );
242
243 inline void dealloc( XRootDStatus *status, AnyObject *response,
244 HostList *hostList )
245 {
246 delete status;
247 delete response;
248 delete hostList;
249 }
250
251 //------------------------------------------------------------------------
253 //------------------------------------------------------------------------
254 std::unique_ptr<ResponseHandler> responseHandler;
255
256 //------------------------------------------------------------------------
258 //------------------------------------------------------------------------
259 std::unique_ptr<Operation<true>> currentOperation;
260
261 //------------------------------------------------------------------------
263 //------------------------------------------------------------------------
264 std::unique_ptr<Operation<true>> nextOperation;
265
266 //------------------------------------------------------------------------
268 //------------------------------------------------------------------------
269 Timeout timeout;
270
271 //------------------------------------------------------------------------
273 //------------------------------------------------------------------------
274 std::promise<XRootDStatus> prms;
275
276 //------------------------------------------------------------------------
279 //------------------------------------------------------------------------
280 std::function<void(const XRootDStatus&)> final;
281 };
282
283 //----------------------------------------------------------------------------
289 //----------------------------------------------------------------------------
291 {
292 template<bool> friend class ParallelOperation;
293 friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
294 friend class PipelineHandler;
295
296 public:
297
298 //------------------------------------------------------------------------
300 //------------------------------------------------------------------------
302 {
303 }
304
305 //------------------------------------------------------------------------
307 //------------------------------------------------------------------------
309 operation( op->Move() )
310 {
311 }
312
313 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
317 operation( op.Move() )
318 {
319 }
320
321 //------------------------------------------------------------------------
323 //------------------------------------------------------------------------
325 operation( op.Move() )
326 {
327 }
328
330 operation( op->ToHandled() )
331 {
332 }
333
334 //------------------------------------------------------------------------
336 //------------------------------------------------------------------------
338 operation( op.ToHandled() )
339 {
340 }
341
342 //------------------------------------------------------------------------
344 //------------------------------------------------------------------------
346 operation( op.ToHandled() )
347 {
348 }
349
350 Pipeline( Pipeline &&pipe ) :
351 operation( std::move( pipe.operation ) )
352 {
353 }
354
355 //------------------------------------------------------------------------
357 //------------------------------------------------------------------------
359 {
360 operation = std::move( pipe.operation );
361 return *this;
362 }
363
364 //------------------------------------------------------------------------
366 //------------------------------------------------------------------------
368 {
369 operation->AddOperation( op.Move() );
370 return *this;
371 }
372
373 //------------------------------------------------------------------------
375 //------------------------------------------------------------------------
377 {
378 operation->AddOperation( op.ToHandled() );
379 return *this;
380 }
381
382 //------------------------------------------------------------------------
386 //------------------------------------------------------------------------
387 operator Operation<true>&()
388 {
389 if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
390 return *operation.get();
391 }
392
393 //------------------------------------------------------------------------
397 //------------------------------------------------------------------------
398 operator bool()
399 {
400 return bool( operation );
401 }
402
403 //------------------------------------------------------------------------
407 //------------------------------------------------------------------------
408 static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
409
410 //------------------------------------------------------------------------
412 //------------------------------------------------------------------------
413 static void Repeat();
414
415 //------------------------------------------------------------------------
417 //------------------------------------------------------------------------
418 static void Replace( Operation<false> &&opr );
419
420 //------------------------------------------------------------------------
422 //------------------------------------------------------------------------
423 static void Replace( Pipeline p );
424
425 //------------------------------------------------------------------------
427 //------------------------------------------------------------------------
428 static void Ignore();
429
430 private:
431
432 //------------------------------------------------------------------------
437 //------------------------------------------------------------------------
438 Operation<true>* operator->()
439 {
440 return operation.get();
441 }
442
443 //------------------------------------------------------------------------
448 //------------------------------------------------------------------------
449 void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
450 {
451 if( ftr.valid() )
452 throw std::logic_error( "Pipeline is already running!" );
453
454 // a promise that the pipe will have a result
455 std::promise<XRootDStatus> prms;
456 ftr = prms.get_future();
457
458 if( !operation ) std::logic_error( "Empty pipeline!" );
459
460 Operation<true> *opr = operation.release();
461 PipelineHandler *h = opr->handler.get();
462 if( h )
463 h->PreparePipelineStart();
464
465 opr->Run( timeout, std::move( prms ), std::move( final ) );
466 }
467
468 //------------------------------------------------------------------------
470 //------------------------------------------------------------------------
471 std::unique_ptr<Operation<true>> operation;
472
473 //------------------------------------------------------------------------
475 //------------------------------------------------------------------------
476 std::future<XRootDStatus> ftr;
477
478 };
479
480 //----------------------------------------------------------------------------
487 //----------------------------------------------------------------------------
488 inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
489 {
490 pipeline.Run( timeout );
491 return std::move( pipeline.ftr );
492 }
493
494 //----------------------------------------------------------------------------
502 //----------------------------------------------------------------------------
503 inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
504 {
505 return Async( std::move( pipeline ), timeout ).get();
506 }
507
508 //----------------------------------------------------------------------------
515 //----------------------------------------------------------------------------
516 template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
517 class ConcreteOperation: public Operation<HasHndl>
518 {
519 template<template<bool> class, bool, typename, typename ...>
520 friend class ConcreteOperation;
521
522 public:
523
524 //------------------------------------------------------------------------
528 //------------------------------------------------------------------------
529 ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
530 timeout( 0 )
531 {
532 static_assert( !HasHndl, "It is only possible to construct operation without handler" );
533 }
534
535 //------------------------------------------------------------------------
541 //------------------------------------------------------------------------
542 template<bool from>
544 Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
545 {
546 }
547
548 //------------------------------------------------------------------------
556 //------------------------------------------------------------------------
557 template<typename Hdlr>
558 Derived<true> operator>>( Hdlr &&hdlr )
559 {
560 return this->StreamImpl( HdlrFactory::Create( hdlr ) );
561 }
562
563 //------------------------------------------------------------------------
569 //------------------------------------------------------------------------
570 Derived<true> operator|( Operation<true> &op )
571 {
572 return PipeImpl( *this, op );
573 }
574
575 //------------------------------------------------------------------------
581 //------------------------------------------------------------------------
582 Derived<true> operator|( Operation<true> &&op )
583 {
584 return PipeImpl( *this, op );
585 }
586
587 //------------------------------------------------------------------------
593 //------------------------------------------------------------------------
594 Derived<true> operator|( Operation<false> &op )
595 {
596 return PipeImpl( *this, op );
597 }
598
599 //------------------------------------------------------------------------
605 //------------------------------------------------------------------------
606 Derived<true> operator|( Operation<false> &&op )
607 {
608 return PipeImpl( *this, op );
609 }
610
611 //------------------------------------------------------------------------
613 //------------------------------------------------------------------------
614 Derived<true> operator|( FinalOperation &&fo )
615 {
616 AllocHandler( *this );
617 this->handler->Assign( fo.final );
618 return this->template Transform<true>();
619 }
620
621 //------------------------------------------------------------------------
625 //------------------------------------------------------------------------
627 {
628 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
629 return new Derived<HasHndl>( std::move( *me ) );
630 }
631
632 //------------------------------------------------------------------------
636 //------------------------------------------------------------------------
638 {
639 this->handler.reset( new PipelineHandler() );
640 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
641 return new Derived<true>( std::move( *me ) );
642 }
643
644 //------------------------------------------------------------------------
646 //------------------------------------------------------------------------
647 Derived<HasHndl> Timeout( uint16_t timeout )
648 {
649 this->timeout = timeout;
650 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
651 return std::move( *me );
652 }
653
654 protected:
655
656 //------------------------------------------------------------------------
660 //------------------------------------------------------------------------
661 template<bool to>
662 inline Derived<to> Transform()
663 {
664 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
665 return Derived<to>( std::move( *me ) );
666 }
667
668 //------------------------------------------------------------------------
674 //------------------------------------------------------------------------
675 inline Derived<true> StreamImpl( ResponseHandler *handler )
676 {
677 static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
678 this->handler.reset( new PipelineHandler( handler ) );
679 return Transform<true>();
680 }
681
682 //------------------------------------------------------------------------
683 // Allocate handler if necessary
684 //------------------------------------------------------------------------
685 inline static
687 {
688 // nothing to do
689 }
690
691 //------------------------------------------------------------------------
692 // Allocate handler if necessary
693 //------------------------------------------------------------------------
694 inline static
699
700 //------------------------------------------------------------------------
707 //------------------------------------------------------------------------
708 inline static
709 Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
710 Args...> &me, Operation<true> &op )
711 {
712 AllocHandler( me ); // if HasHndl is false allocate handler
713 me.AddOperation( op.Move() );
714 return me.template Transform<true>();
715 }
716
717 //------------------------------------------------------------------------
724 //------------------------------------------------------------------------
725 inline static
726 Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
727 Args...> &me, Operation<false> &op )
728 {
729 AllocHandler( me ); // if HasHndl is false allocate handler
730 me.AddOperation( op.ToHandled() );
731 return me.template Transform<true>();
732 }
733
734 //------------------------------------------------------------------------
736 //------------------------------------------------------------------------
737 std::tuple<Args...> args;
738
739 //------------------------------------------------------------------------
741 //------------------------------------------------------------------------
742 uint16_t timeout;
743 };
744
745 // Out-of-line methods for class Operation
746
747 template <bool HasHndl>
748 void Operation<HasHndl>::Run(Timeout timeout, std::promise<XRootDStatus> prms,
749 std::function<void(const XRootDStatus &)> f)
750 {
751 static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
752
753 XRootDStatus st;
754 handler->Assign(timeout, std::move(prms), std::move(f), this);
755 PipelineHandler *h = handler.release();
756
757 try {
758 st = RunImpl(h, timeout);
759 } catch (const operation_expired &ex) {
761 } catch (const PipelineException &ex) { // probably not needed
762 st = ex.GetError();
763 } catch (const std::exception &ex) {
764 st = XRootDStatus(stError, errInternal, 0, ex.what());
765 }
766
767 if (!st.IsOK()) {
768 ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
770 }
771 }
772
773 template <bool HasHndl>
775 {
776 if (handler)
777 handler->AddOperation(op);
778 }
779}
780
781#endif // __XRD_CL_OPERATIONS_HH__
Derived< true > operator|(Operation< true > &&op)
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
std::tuple< Args... > args
Operation arguments.
Derived< true > StreamImpl(ResponseHandler *handler)
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Derived< true > operator>>(Hdlr &&hdlr)
uint16_t timeout
Operation timeout.
ConcreteOperation(Args &&... args)
Derived< true > operator|(Operation< false > &op)
Operation< HasHndl > * Move()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Derived< true > operator|(Operation< false > &&op)
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Operation< true > * ToHandled()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
Derived< true > operator|(Operation< true > &op)
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
virtual ~Operation()
Destructor.
virtual Operation< HasHndl > * Move()=0
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
friend class PipelineHandler
Operation()
Constructor.
void AddOperation(Operation< true > *op)
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
bool valid
Flag indicating if it is a valid object.
virtual std::string ToString()=0
Name of the operation.
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
virtual Operation< true > * ToHandled()=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
friend class Operation
Operation(Operation< from > &&op)
Move constructor between template instances.
Pipeline exception, wraps an XRootDStatus.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
PipelineHandler(ResponseHandler *handler)
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
Pipeline(Operation< true > *op)
Constructor.
Pipeline(Operation< true > &&op)
Constructor.
Pipeline & operator|=(Operation< true > &&op)
Extend pipeline.
static void Repeat()
Repeat current operation.
Pipeline(Operation< true > &op)
Constructor.
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
friend class PipelineHandler
Pipeline(Pipeline &&pipe)
Pipeline & operator=(Pipeline &&pipe)
Move assignment operator.
Pipeline(Operation< false > *op)
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< false > &&op)
Constructor.
static void Replace(Operation< false > &&opr)
Replace current operation.
friend class ParallelOperation
static void Ignore()
Ignore error and proceed with the pipeline.
Pipeline(Operation< false > &op)
Constructor.
Pipeline()
Default constructor.
Pipeline & operator|=(Operation< false > &&op)
Extend pipeline.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
Call the user callback.
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
std::vector< HostInfo > HostList
const uint16_t errInternal
Internal error.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
bool IsOK() const
We're fine.