XRootD
XrdPfcFile.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_FILE_HH__
2 #define __XRDPFC_FILE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 
22 #include "XrdCl/XrdClDefaultEnv.hh"
23 
24 #include "XrdOuc/XrdOucCache.hh"
25 #include "XrdOuc/XrdOucIOVec.hh"
26 
27 #include "XrdPfcInfo.hh"
28 #include "XrdPfcStats.hh"
29 
30 #include <functional>
31 #include <map>
32 #include <set>
33 #include <string>
34 
35 class XrdJob;
36 class XrdOucIOVec;
37 
38 namespace XrdCl
39 {
40 class Log;
41 }
42 
43 namespace XrdPfc
44 {
45 class BlockResponseHandler;
46 class DirectResponseHandler;
47 class IO;
48 
49 struct ReadVBlockListRAM;
50 struct ReadVChunkListRAM;
51 struct ReadVBlockListDisk;
52 struct ReadVChunkListDisk;
53 }
54 
55 
56 namespace XrdPfc
57 {
58 class File;
59 
// Response-handler object that travels with one read request. It derives from
// XrdOucCacheIOCB and pairs a sequence id with the caller-supplied callback.
// File::Read() and File::ReadV() take a pointer to it as their `rh` argument.
60 struct ReadReqRH : public XrdOucCacheIOCB
61 {
62  int m_expected_size = 0; // Starts at 0; presumably the byte count the request should deliver -- TODO confirm.
63  int m_n_chunks = 0; // Only set for ReadV().
64  unsigned short m_seq_id; // Sequence id handed to the constructor (no in-class default).
65  XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
66 
    // Stores the sequence id and the external callback; m_expected_size and
    // m_n_chunks keep their in-class defaults of 0.
67  ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
68  m_seq_id(sid), m_iocb(iocb)
69  {}
70 };
71 
72 // -------------------------------------------------------------
73 
// Body of ReadRequest: bookkeeping for one read while its parts are outstanding.
// NOTE(review): the `struct` header (listing line 74) and line 81 are not present
// in this extract.
75 {
76  IO *m_io; // IO object passed to the constructor.
77  ReadReqRH *m_rh; // Internal callback created in IO::Read().
78 
79  long long m_bytes_read = 0; // Running byte total; reported by return_value() when no error is recorded.
80  int m_error_cond = 0; // to be set to -errno
82 
83  int m_n_chunk_reqs = 0; // Must be back at 0 for is_complete() to hold.
84  bool m_sync_done = false; // Must be true for is_complete() to hold.
85  bool m_direct_done = true; // Defaults to true; presumably cleared while a direct read is in flight -- TODO confirm.
86 
    // Binds the request to its IO and response handler; counters and flags
    // keep their in-class defaults.
87  ReadRequest(IO *io, ReadReqRH *rh) :
88  m_io(io), m_rh(rh)
89  {}
90 
    // Records only the first error: once m_error_cond is non-zero, later codes are ignored.
91  void update_error_cond(int ec) { if (m_error_cond == 0 ) m_error_cond = ec; }
92 
    // True when no chunk requests remain and both the sync and direct flags are set.
93  bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
    // Returns the recorded error if there is one, otherwise the byte count.
    // NOTE(review): m_bytes_read is long long but the return type is int, so the
    // value is implicitly narrowed -- confirm totals cannot exceed INT_MAX.
94  int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
95 };
96 
97 // -------------------------------------------------------------
98 
// Body of ChunkRequest: one piece of a read request that is served from a single block.
// NOTE(review): the `struct` header (listing line 99) and the declaration of
// m_read_req (listing line 101) are not present in this extract; m_read_req is
// nevertheless initialised by the constructor below.
100 {
102  char *m_buf; // Where to place the data chunk.
103  long long m_off; // Offset *within* the corresponding block.
104  int m_size; // Size of the data chunk.
105 
     // Plain member-wise initialisation from the arguments; no validation is done here.
106  ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
107  m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
108  {}
109 };
110 
111 using vChunkRequest_t = std::vector<ChunkRequest>;
112 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
113 
114 // ================================================================
115 
// Describes one block of a cached file: buffer pointer, offset, size, reference
// count, error/download state and the identity of the request that created it.
// NOTE(review): this extract omits listing lines 119, 126, 129-133, 135, 141 and
// 171-173, so several members used below (m_file, m_req_size, m_downloaded,
// m_req_cksum_net, m_cksum_vec, m_n_cksum_errors) have no visible declaration.
116 class Block
117 {
118 public:
120  IO *m_io; // IO that handled current request, used for == / != comparisons only
121  void *m_req_id; // Identity of requestor -- used for stats.
122 
123  char *m_buff; // Data buffer pointer supplied to the constructor.
124  long long m_offset; // Offset supplied to the constructor (presumably within the file -- TODO confirm).
125  int m_size; // Size supplied to the constructor.
127  int m_refcnt; // Changed by File::inc_ref_count() / File::dec_ref_count().
128  int m_errno; // stores negative errno
134 
136 
     // Member-wise initialisation from the arguments; m_n_cksum_errors starts at 0.
     // NOTE(review): the parameter `m_prefetch` is named like a member, and the
     // initialiser line that would consume it (listing line 141) is not visible here.
137  Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
138  bool m_prefetch, bool cks_net) :
139  m_file(f), m_io(io), m_req_id(rid),
140  m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
142  m_req_cksum_net(cks_net), m_n_cksum_errors(0)
143  {}
144 
     // Trivial accessors for the buffer description.
145  char* get_buff() const { return m_buff; }
146  int get_size() const { return m_size; }
147  int get_req_size() const { return m_req_size; }
148  long long get_offset() const { return m_offset; }
149 
     // Trivial accessors for the owning file and the requestor identity.
150  File* get_file() const { return m_file; }
151  IO* get_io() const { return m_io; }
152  void* get_req_id() const { return m_req_id; }
153 
     // State queries: "finished" means downloaded or failed; "ok" means downloaded;
     // "failed" means an error code has been stored.
154  bool is_finished() const { return m_downloaded || m_errno != 0; }
155  bool is_ok() const { return m_downloaded; }
156  bool is_failed() const { return m_errno != 0; }
157 
     // State setters; set_error() stores the value as given (expected to be a negative errno).
158  void set_downloaded() { m_downloaded = true; }
159  void set_error(int err) { m_errno = err; }
160  int get_error() const { return m_errno; }
161 
     // Clears the stored error and rebinds the block to a new IO and requestor id.
     // m_downloaded is not touched.
162  void reset_error_and_set_io(IO *io, void *rid)
163  {
164  m_errno = 0;
165  m_io = io;
166  m_req_id = rid;
167  }
168 
     // Checksum-related queries: the flag given at construction, and whether
     // the checksum vector currently holds any entries.
169  bool req_cksum_net() const { return m_req_cksum_net; }
170  bool has_cksums() const { return ! m_cksum_vec.empty(); }
174 };
175 
176 using BlockList_t = std::list<Block*>;
177 using BlockList_i = std::list<Block*>::iterator;
178 
179 // ================================================================
180 
// Body of a response-handler class -- presumably BlockResponseHandler, which is
// forward-declared earlier and befriended by File; TODO confirm.
// NOTE(review): the class header (listing line 181) and the members/constructor
// (listing lines 184 and 186) are not present in this extract.
182 {
183 public:
185 
187 
     // Completion callback overriding a base-class virtual; only declared here.
188  void Done(int result) override;
189 };
190 
191 // ----------------------------------------------------------------
192 
// Body of DirectResponseHandler: collects the outcome of a direct read issued on
// behalf of a ReadRequest.
// NOTE(review): the class header (listing line 193) and the declarations of
// m_file, m_read_req and m_to_wait (listing lines 196-199) are not present in
// this extract; the constructor below initialises all three.
194 {
195 public:
200  int m_bytes_read = 0; // Starts at 0.
201  int m_errno = 0; // Starts at 0 (no error).
202 
     // Stores the file, the read request and the to_wait count as given.
     // Presumably to_wait is the number of responses still expected -- TODO confirm.
203  DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
204  m_file(file), m_read_req(rreq), m_to_wait(to_wait)
205  {}
206 
     // Completion callback overriding a base-class virtual; only declared here.
207  void Done(int result) override;
208 };
209 
210 // ================================================================
211 
// Cache-side representation of one file: holds the data/info file handles, the
// set of attached IO objects, the map of in-flight blocks, prefetch state and
// statistics, and exposes the Read/ReadV entry points.
// NOTE(review): this extract omits a number of listing lines (among them 226,
// 252, 277, 282-283, 286 and 301), so some public declarations are not visible here.
212 class File
213 {
214  friend class BlockResponseHandler;
215  friend class DirectResponseHandler;
216 public:
217  // Constructor and Open() are private.
218 
     //! Static constructor that also does Open. Returns null ptr if Open fails.
220  static File* FileOpen(const std::string &path, long long offset, long long fileSize);
221 
     //! Destructor.
223  ~File();
224 
227 
     //! Handle removal of a set of blocks from Cache's write queue.
229  void BlocksRemovedFromWriteQ(std::list<Block*>&);
230 
     //! Normal read.
232  int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
233 
     //! Vector read.
235  int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
236 
237  //----------------------------------------------------------------------
     //! Notification from IO that it has been updated (remote open).
239  //----------------------------------------------------------------------
240  void ioUpdated(IO *io);
241 
242  //----------------------------------------------------------------------
     //! Initiate close. Returns true if IO is still active.
     //! Used in XrdPosixXrootd::Close().
245  //----------------------------------------------------------------------
246  bool ioActive(IO *io);
247 
248  //----------------------------------------------------------------------
251  //----------------------------------------------------------------------
253 
254  //----------------------------------------------------------------------
     //! Returns true if any of the blocks need sync.
     //! Called from Cache::dec_ref_cnt on zero ref cnt.
257  //----------------------------------------------------------------------
258  bool FinalizeSyncBeforeExit();
259 
260  //----------------------------------------------------------------------
     //! Sync file cache info and output data with disk.
262  //----------------------------------------------------------------------
263  void Sync();
264 
265  void WriteBlockToDisk(Block* b);
266 
267  void Prefetch();
268 
269  float GetPrefetchScore() const;
270 
     //! Log path.
272  const char* lPath() const;
273 
     // Returns a mutable reference to the stored file name.
274  std::string& GetLocalPath() { return m_filename; }
275 
276  XrdSysError* GetLog();
278 
     // Returns m_file_size (not declared const).
279  long long GetFileSize() { return m_file_size; }
280 
281  void AddIO(IO *io);
284  void RemoveIO(IO *io);
285 
287 
288  std::string GetRemoteLocations() const;
     // Thin forwarders to the cache-info object m_cfi and to the stats member.
     // NOTE(review): GetBlockSize() returns int, while Info::GetBufferSize() is
     // listed in the cross-reference index as returning long long -- implicit narrowing; confirm range.
289  const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
290  size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
291  int GetBlockSize() const { return m_cfi.GetBufferSize(); }
292  int GetNBlocks() const { return m_cfi.GetNBlocks(); }
293  int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
294  const Stats& RefStats() const { return m_stats; }
295 
296  // These three methods are called under Cache's m_active lock
297  int get_ref_cnt() { return m_ref_cnt; }
298  int inc_ref_cnt() { return ++m_ref_cnt; }
299  int dec_ref_cnt() { return --m_ref_cnt; }
300 
     // Reports the m_in_shutdown flag.
302  bool is_in_emergency_shutdown() { return m_in_shutdown; }
303 
304 private:
     // Private constructor; instances are obtained through FileOpen().
306  File(const std::string &path, long long offset, long long fileSize);
307 
     // Private open step; FileOpen() returns null when this fails.
309  bool Open();
310 
311  static const char *m_traceID;
312 
313  int m_ref_cnt; // Accessed through get_ref_cnt() / inc_ref_cnt() / dec_ref_cnt().
314 
315  XrdOssDF *m_data_file;
316  XrdOssDF *m_info_file;
317  Info m_cfi; // Cache-info object queried by the Get* forwarders above.
318 
319  std::string m_filename; // Returned by GetLocalPath().
320  long long m_offset;
321  long long m_file_size; // Returned by GetFileSize().
322 
323  // IO objects attached to this file.
324 
325  typedef std::set<IO*> IoSet_t;
326  typedef IoSet_t::iterator IoSet_i;
327 
328  IoSet_t m_io_set;
329  IoSet_i m_current_io;
330  int m_ios_in_detach;
331 
332  // FSync
333 
334  std::vector<int> m_writes_during_sync;
335  int m_non_flushed_cnt;
336  bool m_in_sync;
337  bool m_detach_time_logged;
338  bool m_in_shutdown; // Reported by is_in_emergency_shutdown().
339 
340  // Block state and management
341 
342  typedef std::list<int> IntList_t;
343  typedef IntList_t::iterator IntList_i;
344 
345  typedef std::map<int, Block*> BlockMap_t;
346  typedef BlockMap_t::iterator BlockMap_i;
347 
348  BlockMap_t m_block_map; // Maps an int key (presumably the block index -- TODO confirm) to its Block.
349  XrdSysCondVar m_state_cond;
350  long long m_block_size;
351  int m_num_blocks;
352 
353  // Stats
354 
355  Stats m_stats; // Exposed read-only through RefStats().
356  Stats m_last_stats;
357 
358  std::set<std::string> m_remote_locations;
359  void insert_remote_location(const std::string &loc);
360 
361  // Prefetch
362 
363  enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
364 
365  PrefetchState_e m_prefetch_state;
366 
367  int m_prefetch_read_cnt;
368  int m_prefetch_hit_cnt;
369  float m_prefetch_score; // cached
370 
     // Each counter update with a non-zero delta also refreshes the cached score,
     // which is hit count divided by read count.
     // NOTE(review): calc_prefetch_score() divides by m_prefetch_read_cnt without a
     // zero check; confirm the hit counter can never be bumped while the read counter is 0.
371  void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
372  void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
373  void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
374 
375  // Helpers
376 
     // Relates a user request to one block and reports the result through the three
     // reference parameters; the bool presumably says whether they intersect -- TODO confirm.
377  bool overlap(int blk, // block to query
378  long long blk_size, // block size (presumably in bytes -- TODO confirm)
379  long long req_off, // offset of user request
380  int req_size, // size of user request
381  // output:
382  long long &off, // offset in user buffer
383  long long &blk_off, // offset in block
384  int &size);
385 
386  // Read & ReadV
387 
388  Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
389 
390  void ProcessBlockRequest (Block *b);
391  void ProcessBlockRequests(BlockList_t& blks);
392 
393  void RequestBlocksDirect(IO *io, DirectResponseHandler *handler, std::vector<XrdOucIOVec>& ioVec, int expected_size);
394 
395  int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
396 
     // Takes the same vector-style arguments as ReadV() plus a trace prefix;
     // presumably the shared implementation behind Read() and ReadV() -- TODO confirm.
397  int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
398  ReadReqRH *rh, const char *tpfx);
399 
400  void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
401  void ProcessBlockError(Block *b, ReadRequest *rreq);
402  void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
403  void FinalizeReadRequest(ReadRequest *rreq);
404 
405  void ProcessBlockResponse(Block *b, int res);
406 
407  // Block management
408 
     // inc_ref_count() / dec_ref_count() are defined inline after the class.
409  void inc_ref_count(Block* b);
410  void dec_ref_count(Block* b, int count = 1);
411  void free_block(Block*);
412 
413  bool select_current_io_or_disable_prefetching(bool skip_current);
414 
415  int offsetIdx(int idx) const;
416 };
417 
418 //------------------------------------------------------------------------------
419 
// Adds one reference to block `b`. No locking is done here; the caller is
// expected to hold the relevant lock already.
420 inline void File::inc_ref_count(Block* b)
421 {
422  // Method always called under lock.
423  b->m_refcnt++;
424 }
425 
426 //------------------------------------------------------------------------------
427 
// Drops `count` references from block `b` (the in-class declaration defaults
// count to 1) and hands the block to free_block() once the count reaches zero.
// No locking is done here; the caller is expected to hold the relevant lock.
// NOTE(review): assert() is used, but <cassert> is not among the includes
// visible in this listing -- presumably pulled in transitively; confirm.
428 inline void File::dec_ref_count(Block* b, int count)
429 {
430  // Method always called under lock.
431  assert(b->is_finished()); // Only blocks that are downloaded or failed may be released.
432  b->m_refcnt -= count;
433  assert(b->m_refcnt >= 0); // Guards against releasing more references than were taken.
434 
435  if (b->m_refcnt == 0)
436  {
437  free_block(b); // Last reference gone.
438  }
439 }
440 
441 }
442 
443 #endif
XrdOucString File
Definition: XrdJob.hh:43
void Done(int result) override
Definition: XrdPfcFile.cc:1536
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:173
int get_size() const
Definition: XrdPfcFile.hh:146
int get_error() const
Definition: XrdPfcFile.hh:160
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:172
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
Definition: XrdPfcFile.hh:137
void * get_req_id() const
Definition: XrdPfcFile.hh:152
long long get_offset() const
Definition: XrdPfcFile.hh:148
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:135
bool is_finished() const
Definition: XrdPfcFile.hh:154
bool is_ok() const
Definition: XrdPfcFile.hh:155
void set_error(int err)
Definition: XrdPfcFile.hh:159
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:171
int m_n_cksum_errors
Definition: XrdPfcFile.hh:133
char * get_buff() const
Definition: XrdPfcFile.hh:145
IO * get_io() const
Definition: XrdPfcFile.hh:151
void set_downloaded()
Definition: XrdPfcFile.hh:158
bool req_cksum_net() const
Definition: XrdPfcFile.hh:169
void * m_req_id
Definition: XrdPfcFile.hh:121
bool has_cksums() const
Definition: XrdPfcFile.hh:170
File * get_file() const
Definition: XrdPfcFile.hh:150
bool is_failed() const
Definition: XrdPfcFile.hh:156
long long m_offset
Definition: XrdPfcFile.hh:124
vCkSum_t m_cksum_vec
Definition: XrdPfcFile.hh:132
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:162
bool m_req_cksum_net
Definition: XrdPfcFile.hh:131
int get_req_size() const
Definition: XrdPfcFile.hh:147
void Done(int result) override
Definition: XrdPfcFile.cc:1544
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
Definition: XrdPfcFile.hh:203
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:271
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1392
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:674
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1490
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:939
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:99
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1480
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1485
int GetNBlocks() const
Definition: XrdPfcFile.hh:292
void Prefetch()
Definition: XrdPfcFile.cc:1407
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1504
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:289
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:290
void AddIO(IO *io)
Definition: XrdPfcFile.cc:295
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:265
int GetBlockSize() const
Definition: XrdPfcFile.hh:291
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:293
long long GetFileSize()
Definition: XrdPfcFile.hh:279
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:165
friend class DirectResponseHandler
Definition: XrdPfcFile.hh:215
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:112
int inc_ref_cnt()
Definition: XrdPfcFile.hh:298
int GetPrefetchCountOnIO(IO *io)
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1022
int dec_ref_cnt()
Definition: XrdPfcFile.hh:299
int get_ref_cnt()
Definition: XrdPfcFile.hh:297
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:641
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:179
const Stats & RefStats() const
Definition: XrdPfcFile.hh:294
~File()
Destructor.
Definition: XrdPfcFile.cc:76
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:332
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:274
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:157
Stats DeltaStatsFromLastCall()
Definition: XrdPfcFile.cc:142
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:302
bool ioActive(IO *io)
Initiate close. Returns true if IO is still active. Used in XrdPosixXrootd::Close().
Definition: XrdPfcFile.cc:188
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
const AStat * GetLastAccessStats() const
Get latest access stats.
Definition: XrdPfcInfo.cc:491
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:473
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
Definition: XrdPfcInfo.hh:402
size_t GetAccessCnt() const
Get number of accesses.
Definition: XrdPfcInfo.hh:265
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:441
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XrdSysError Log
Definition: XrdConfig.cc:111
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:111
std::vector< ChunkRequest >::iterator vChunkRequest_i
Definition: XrdPfcFile.hh:112
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:176
std::vector< uint32_t > vCkSum_t
Definition: XrdPfcTypes.hh:27
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:177
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition: XrdPfcFile.hh:106
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:101
Access statistics.
Definition: XrdPfcInfo.hh:61
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:65
unsigned short m_seq_id
Definition: XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:67
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:91
ReadRequest(IO *io, ReadReqRH *rh)
Definition: XrdPfcFile.hh:87
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:77
bool is_complete() const
Definition: XrdPfcFile.hh:93
int return_value() const
Definition: XrdPfcFile.hh:94
long long m_bytes_read
Definition: XrdPfcFile.hh:79