XRootD
Loading...
Searching...
No Matches
XrdCephOssReadVFile.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3// Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include <sys/types.h>
26#include <unistd.h>
27#include <sstream>
28#include <iostream>
29#include <memory>
30#include <algorithm>
31#include <fcntl.h>
32
34#include "XrdOuc/XrdOucEnv.hh"
35#include "XrdSys/XrdSysError.hh"
36#include "XrdOuc/XrdOucTrace.hh"
37#include "XrdSfs/XrdSfsAio.hh"
39
43
44using namespace XrdCephBuffer;
45
48
49XrdCephOssReadVFile::XrdCephOssReadVFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF,const std::string& algname):
50XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),m_algname(algname)
51{
52 if (!m_xrdOssDF) XrdCephEroute.Say("XrdCephOssReadVFile::Null m_xrdOssDF");
53
54 if (m_algname == "passthrough") { // #TODO consider to use a factory method. but this is simple enough for now
55 m_readVAdapter = std::unique_ptr<XrdCephBuffer::IXrdCephReadVAdapter>(new XrdCephBuffer::XrdCephReadVNoOp());
56 } else if (m_algname == "basic") {
57 m_readVAdapter = std::unique_ptr<XrdCephBuffer::IXrdCephReadVAdapter>(new XrdCephBuffer::XrdCephReadVBasic());
58 } else {
59 XrdCephEroute.Say("XrdCephOssReadVFile::ERROR Invalid ReadV algorthm passed; defaulting to passthrough");
60 m_algname = "passthrough";
61 m_readVAdapter = std::unique_ptr<XrdCephBuffer::IXrdCephReadVAdapter>(new XrdCephBuffer::XrdCephReadVNoOp());
62 }
63 LOGCEPH("XrdCephOssReadVFile Algorithm type: " << m_algname);
64}
65
67 if (m_xrdOssDF) {
68 delete m_xrdOssDF;
69 m_xrdOssDF = nullptr;
70 }
71
72}
73
74int XrdCephOssReadVFile::Open(const char *path, int flags, mode_t mode, XrdOucEnv &env) {
75 int rc = m_xrdOssDF->Open(path, flags, mode, env);
76 if (rc < 0) {
77 return rc;
78 }
79 m_fd = m_xrdOssDF->getFileDescriptor();
80 LOGCEPH("XrdCephOssReadVFile::Open: fd: " << m_fd << " " << path );
81 return rc;
82}
83
84int XrdCephOssReadVFile::Close(long long *retsz) {
85 LOGCEPH("XrdCephOssReadVFile::Close: retsz: " << retsz << " Time_ceph_s: " << m_timer_read_ns.load()*1e-9 << " count: "
86 << m_timer_count.load() << " size_B: " << m_timer_size.load()
87 << " longest_s:" << m_timer_longest.load()*1e-9);
88
89 return m_xrdOssDF->Close(retsz);
90}
91
92
93ssize_t XrdCephOssReadVFile::ReadV(XrdOucIOVec *readV, int rnum) {
94 int fd = m_xrdOssDF->getFileDescriptor();
95 LOGCEPH("XrdCephOssReadVFile::ReadV: fd: " << fd << " " << rnum );
96
97 std::stringstream msg_extents;
98 msg_extents << "XrdCephOssReadVFile::Extentslist={\"fd\": " << fd << ", \"EXTENTS\":[";
99
100 ExtentHolder extents(rnum);
101 for (int i = 0; i < rnum; i++) {
102 extents.push_back(Extent(readV[i].offset, readV[i].size));
103 msg_extents << "[" << readV[i].offset << "," << readV[i].size << "]," ;
104 }
105 msg_extents << "]}";
106 //XrdCephEroute.Say(msg_extents.str().c_str()); msg_extents.clear();
107 if (m_extraLogging) {
108 // improve this so no wasted calls if logging is disabled
109 LOGCEPH(msg_extents.str());
110 msg_extents.clear();
111 }
112
113 LOGCEPH("XrdCephOssReadVFile::Extents: fd: "<< fd << " " << extents.size() << " " << extents.len() << " "
114 << extents.begin() << " " << extents.end() << " " << extents.bytesContained()
115 << " " << extents.bytesMissing());
116
117 // take the input set of extents and return a vector of merged extents (covering the range to read)
118 std::vector<ExtentHolder> mappedExtents = m_readVAdapter->convert(extents);
119
120
121 // counter is the iterator to the original readV elements, and is incremented for each chunk that's returned
122 int nbytes = 0, curCount = 0, counter(0);
123 size_t totalBytesRead(0), totalBytesUseful(0);
124
125 // extract the largest range of the extents, and create a buffer.
126 size_t buffersize{0};
127 for (std::vector<ExtentHolder>::const_iterator ehit = mappedExtents.cbegin(); ehit!= mappedExtents.cend(); ++ehit ) {
128 buffersize = std::max(buffersize, ehit->len());
129 }
130 std::vector<char> buffer;
131 buffer.reserve(buffersize);
132
133
134 //LOGCEPH("mappedExtents: len: " << mappedExtents.size() );
135 for (std::vector<ExtentHolder>::const_iterator ehit = mappedExtents.cbegin(); ehit!= mappedExtents.cend(); ++ehit ) {
136 off_t off = ehit->begin();
137 size_t len = ehit->len();
138
139 //LOGCEPH("outerloop: " << off << " " << len << " " << ehit->end() << " " << " " << ehit->size() );
140
141 // read the full extent into the buffer
142 long timed_read_ns{0};
143 {Timer_ns ts(timed_read_ns);
144 curCount = m_xrdOssDF->Read(buffer.data(), off, len);
145 } // timer scope
147 auto l = m_timer_longest.load();
148 m_timer_longest.store(std::max(l,timed_read_ns)); // doesn't quite prevent race conditions
149 m_timer_read_ns.fetch_add(timed_read_ns);
150 m_timer_size.fetch_add(curCount);
151
152 // check that the correct amount of data was read.
153 // std:: clog << "buf Read " << curCount << std::endl;
154 if (curCount != (ssize_t)len) {
155 return (curCount < 0 ? curCount : -ESPIPE);
156 }
157 totalBytesRead += curCount;
158 totalBytesUseful += ehit->bytesContained();
159
160
161 // now read out into the original readV requests for each of the held inner extents
162 const char* data = buffer.data();
163 const ExtentContainer& innerExtents = ehit->extents();
164 for (ExtentContainer::const_iterator it = innerExtents.cbegin(); it != innerExtents.cend(); ++it) {
165 off_t innerBegin = it->begin() - off;
166 off_t innerEnd = it->end() - off;
167 //LOGCEPH( "innerloop: " << innerBegin << " " << innerEnd << " " << off << " "
168 // << it->begin() << " " << it-> end() << " "
169 // << readV[counter].offset << " " << readV[counter].size);
170 std::copy(data+innerBegin, data+innerEnd, readV[counter].data );
171 nbytes += it->len();
172 ++counter; // next element
173 } // inner extents
174
175 } // outer extents
176 LOGCEPH( "readV returning " << nbytes << " bytes: " << "Read: " <<totalBytesRead << " Useful: " << totalBytesUseful );
177 return nbytes;
178
179}
180
181ssize_t XrdCephOssReadVFile::Read(off_t offset, size_t blen) {
182 return m_xrdOssDF->Read(offset,blen);
183}
184
185ssize_t XrdCephOssReadVFile::Read(void *buff, off_t offset, size_t blen) {
186 return m_xrdOssDF->Read(buff,offset,blen);
187}
188
190 return m_xrdOssDF->Read(aiop);
191}
192
193ssize_t XrdCephOssReadVFile::ReadRaw(void *buff, off_t offset, size_t blen) {
194 return m_xrdOssDF->ReadRaw(buff, offset, blen);
195}
196
198 return m_xrdOssDF->Fstat(buff);
199}
200
201ssize_t XrdCephOssReadVFile::Write(const void *buff, off_t offset, size_t blen) {
202 return m_xrdOssDF->Write(buff,offset,blen);
203}
204
206 return m_xrdOssDF->Write(aiop);
207}
208
210 return m_xrdOssDF->Fsync();
211}
212
213int XrdCephOssReadVFile::Ftruncate(unsigned long long len) {
214 return m_xrdOssDF->Ftruncate(len);
215}
XrdOucTrace XrdCephTrace
static std::string ts()
timestamp output for logging messages
Definition XrdCephOss.cc:53
XrdSysError XrdCephEroute(0)
Definition XrdCephOss.cc:50
#define LOGCEPH(x)
#define stat(a, b)
Definition XrdPosix.hh:101
if(Avsz)
Designed to hold individual extents, but itself provide Extent-like capabilities Useful in cases of c...
size_t size() const
number of extent elements
void push_back(const Extent &in)
Combine requests into single reads accoriding to some basic rules. Read a minimum amount of data (2Mi...
Passthrough implementation. Convertes the ReadV requests to extents and makes the request....
XrdCephOssFile(XrdCephOss *cephoss)
virtual int Close(long long *retsz=0)
std::atomic< long > m_timer_longest
number of reads
virtual ssize_t Read(off_t offset, size_t blen)
std::atomic< long > m_timer_size
number of reads
std::unique_ptr< XrdCephBuffer::IXrdCephReadVAdapter > m_readVAdapter
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
std::atomic< long > m_timer_read_ns
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
std::atomic< long > m_timer_count
timer for the reads against ceph
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
XrdCephOssReadVFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF, const std::string &algname)
virtual int Ftruncate(unsigned long long)
virtual ssize_t ReadRaw(void *, off_t, size_t)
XrdCephOssFile * m_xrdOssDF
virtual int Fstat(struct stat *buff)
int fd
Definition XrdOss.hh:455
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...
std::vector< Extent > ExtentContainer
Container defintion for Extents Typedef to provide a container of extents as a simple stl vector cont...
long long offset