XRootD
XrdCl::XCpCtx Class Reference

#include <XrdClXCpCtx.hh>

+ Collaboration diagram for XrdCl::XCpCtx:

Public Member Functions

 XCpCtx (const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
 
bool AllDone ()
 
void Delete ()
 
std::pair< uint64_t, uint64_t > GetBlock ()
 
XRootDStatus GetChunk (XrdCl::PageInfo &ci)
 
bool GetNextUrl (std::string &url)
 
int64_t GetSize ()
 
XRootDStatus Initialize ()
 
void NotifyIdleSrc ()
 
void NotifyInitExpectant ()
 
void PutChunk (PageInfo *chunk)
 
void Release ()
 
void RemoveSrc (XCpSrc *src)
 
XCpCtxSelf ()
 
void SetFileSize (int64_t size)
 
XCpSrcWeakestLink (XCpSrc *exclude)
 

Detailed Description

Definition at line 40 of file XrdClXCpCtx.hh.

Constructor & Destructor Documentation

◆ XCpCtx()

XrdCl::XCpCtx::XCpCtx ( const std::vector< std::string > &  urls,
uint64_t  blockSize,
uint8_t  parallelSrc,
uint64_t  chunkSize,
uint64_t  parallelChunks,
int64_t  fileSize 
)

Constructor

Parameters
urls: list of replica urls
blockSize: the default block size
parallelSrc: maximum number of parallel sources
chunkSize: the default chunk size
parallelChunks: the default number of parallel chunks per source
fileSize: the file size if specified in the metalink file (-1 indicates that the file size is not known and a stat should be done)

Definition at line 36 of file XrdClXCpCtx.cc.

36  :
37  pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38  pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39  pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40  pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
41 {
42  SetFileSize( fileSize );
43 }
void SetFileSize(int64_t size)
Definition: XrdClXCpCtx.cc:107

References SetFileSize().

+ Here is the call graph for this function:

Member Function Documentation

◆ AllDone()

bool XrdCl::XCpCtx::AllDone ( )

Returns true if all chunks have been transferred, otherwise blocks until NotifyIdleSrc is called, or a 1 minute timeout occurs.

Returns
: true is all chunks have been transferred, false otherwise.

Definition at line 193 of file XrdClXCpCtx.cc.

194 {
195  XrdSysCondVarHelper lck( pDoneCV );
196 
197  if( !pDone )
198  pDoneCV.Wait( 60 );
199 
200  return pDone;
201 }

References XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ Delete()

void XrdCl::XCpCtx::Delete ( )
inline

Decrements the reference count and then waits for it to reach zero, then deletes the instance. Should only be called once.

Definition at line 62 of file XrdClXCpCtx.hh.

63  {
64  XrdSysMutexHelper lckmtx( pMtx );
65  --pRefCount;
66  if( !pRefCount )
67  {
68  lckmtx.UnLock();
69  delete this;
70  return;
71  }
72  lckmtx.UnLock();
73 
74  XrdSysCondVarHelper lckcv( pDoneCV );
75  pDone = true;
76  pDoneCV.Broadcast();
77  lckcv.UnLock();
78 
79  lckcv.Lock( &pDeleteCV );
80  while( !pDelete ) pDeleteCV.Wait();
81  lckcv.UnLock();
82  delete this;
83  }

References XrdSysCondVar::Broadcast(), XrdSysCondVarHelper::Lock(), XrdSysCondVarHelper::UnLock(), XrdSysMutexHelper::UnLock(), and XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ GetBlock()

std::pair< uint64_t, uint64_t > XrdCl::XCpCtx::GetBlock ( )

Get next block that has to be transferred

Returns
: pair of offset and block size

Definition at line 95 of file XrdClXCpCtx.cc.

96 {
97  XrdSysMutexHelper lck( pMtx );
98 
99  uint64_t blkSize = pBlockSize, offset = pOffset;
100  if( pOffset + blkSize > uint64_t( pFileSize ) )
101  blkSize = pFileSize - pOffset;
102  pOffset += blkSize;
103 
104  return std::make_pair( offset, blkSize );
105 }

◆ GetChunk()

XRootDStatus XrdCl::XCpCtx::GetChunk ( XrdCl::PageInfo ci)

Gets the next chunk from the sink, if the sink is empty blocks.

Parameters
ci: the chunk retrieved from sink (output parameter)
Returns
: stError if we failed to transfer the file, stOK otherwise, with one of the following codes:
  • suDone : the whole file has been transferred, we are done
  • suContinue : a chunk has been written into ci, continue calling GetChunk in order to retrieve remaining chunks
  • suRetry : a chunk has not been written into ci, try again.

Definition at line 156 of file XrdClXCpCtx.cc.

157 {
158  // if we received all the data we are done here
159  if( pDataReceived == uint64_t( pFileSize ) )
160  {
161  XrdSysCondVarHelper lck( pDoneCV );
162  pDone = true;
163  pDoneCV.Broadcast();
164  return XRootDStatus( stOK, suDone );
165  }
166 
167  // if we don't have active sources it means we failed
168  if( GetRunning() == 0 )
169  {
170  XrdSysCondVarHelper lck( pDoneCV );
171  pDone = true;
172  pDoneCV.Broadcast();
173  return XRootDStatus( stError, errNoMoreReplicas );
174  }
175 
176  PageInfo *chunk = pSink.Get();
177  if( chunk )
178  {
179  pDataReceived += chunk->GetLength();
180  ci = std::move( *chunk );
181  delete chunk;
182  return XRootDStatus( stOK, suContinue );
183  }
184 
185  return XRootDStatus( stOK, suRetry );
186 }
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errNoMoreReplicas
No more replicas to try.
Definition: XrdClStatus.hh:65

References XrdSysCondVar::Broadcast(), XrdCl::errNoMoreReplicas, XrdCl::PageInfo::GetLength(), XrdCl::stError, XrdCl::stOK, XrdCl::suContinue, XrdCl::suDone, and XrdCl::suRetry.

+ Here is the call graph for this function:

◆ GetNextUrl()

bool XrdCl::XCpCtx::GetNextUrl ( std::string &  url)

Gets the next URL from the list of file replicas

Parameters
url: the output parameter
Returns
: true if a url has been written to the url parameter, false otherwise

Definition at line 57 of file XrdClXCpCtx.cc.

58 {
59  XrdSysMutexHelper lck( pMtx );
60  if( pUrls.empty() ) return false;
61  url = pUrls.front();
62  pUrls.pop();
63  return true;
64 }

◆ GetSize()

int64_t XrdCl::XCpCtx::GetSize ( )
inline

Get file size. The call blocks until the file size is being set using SetFileSize.

Definition at line 157 of file XrdClXCpCtx.hh.

158  {
159  XrdSysCondVarHelper lck( pFileSizeCV );
160  while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait();
161  return pFileSize;
162  }

References XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ Initialize()

XRootDStatus XrdCl::XCpCtx::Initialize ( )

Starts one thread per source, each thread tries to open a file, stat the file if necessary, and then starts reading the file, all chunks read go to the sink.

Returns
Error if we were not able to create any threads

Definition at line 124 of file XrdClXCpCtx.cc.

125 {
126  for( uint8_t i = 0; i < pParallelSrc; ++i )
127  {
128  XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
129  pSources.push_back( src );
130  }
131 
132  auto scpy = pSources;
133  bool ok = false;
134  for(auto src: scpy) {
135  if( src->Start() )
136  {
137  // src destructor will remove src from pSources
138  src->Delete();
139  }
140  else
141  {
142  ok = true;
143  }
144  }
145 
146  if( !ok )
147  {
148  Log *log = DefaultEnv::GetLog();
149  log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
150  return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." );
151  }
152 
153  return XRootDStatus();
154 }
static Log * GetLog()
Get default log.
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint64_t UtilityMsg
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::stError, and XrdCl::UtilityMsg.

+ Here is the call graph for this function:

◆ NotifyIdleSrc()

void XrdCl::XCpCtx::NotifyIdleSrc ( )

Notify idle sources, used in two case:

  • if one of the sources failed and an idle source needs to take over
  • or if we are done and all idle source should be stopped

Definition at line 188 of file XrdClXCpCtx.cc.

189 {
190  pDoneCV.Broadcast();
191 }

References XrdSysCondVar::Broadcast().

+ Here is the call graph for this function:

◆ NotifyInitExpectant()

void XrdCl::XCpCtx::NotifyInitExpectant ( )
inline

Notify those who are waiting for initialization. In particular the GetSize() caller will be waiting on the result of initialization.

Definition at line 225 of file XrdClXCpCtx.hh.

226  {
227  pFileSizeCV.Broadcast();
228  }

References XrdSysCondVar::Broadcast().

+ Here is the call graph for this function:

◆ PutChunk()

void XrdCl::XCpCtx::PutChunk ( PageInfo chunk)

Put a chunk into the sink

Parameters
chunk: the chunk

Definition at line 90 of file XrdClXCpCtx.cc.

91 {
92  pSink.Put( chunk );
93 }

◆ Release()

void XrdCl::XCpCtx::Release ( )
inline

Decrements the reference count and signal when we reach 0

Definition at line 88 of file XrdClXCpCtx.hh.

89  {
90  XrdSysMutexHelper lck( pMtx );
91  --pRefCount;
92  if( !pRefCount )
93  {
94  XrdSysCondVarHelper lckcv( pDeleteCV );
95  pDelete = true;
96  pDeleteCV.Broadcast();
97  }
98  }

References XrdSysCondVar::Broadcast().

+ Here is the call graph for this function:

◆ RemoveSrc()

void XrdCl::XCpCtx::RemoveSrc ( XCpSrc src)
inline

Remove given source

Parameters
src: the source to be removed

Definition at line 195 of file XrdClXCpCtx.hh.

196  {
197  XrdSysMutexHelper lck( pMtx );
198  pSources.remove( src );
199  }

◆ Self()

XCpCtx* XrdCl::XCpCtx::Self ( )
inline

Increments the reference counter.

Returns
: myself.

Definition at line 105 of file XrdClXCpCtx.hh.

106  {
107  XrdSysMutexHelper lck( pMtx );
108  ++pRefCount;
109  return this;
110  }

◆ SetFileSize()

void XrdCl::XCpCtx::SetFileSize ( int64_t  size)

Set the file size (GetSize will block until SetFileSize will be called). Also calculates the block size.

Parameters
size: file size

Definition at line 107 of file XrdClXCpCtx.cc.

108 {
109  XrdSysCondVarHelper lckcv( pFileSizeCV );
110  XrdSysMutexHelper lckmtx( pMtx );
111  if( pFileSize < 0 && size >= 0 )
112  {
113  pFileSize = size;
114  pFileSizeCV.Broadcast();
115 
116  if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
117  pBlockSize = pFileSize / pParallelSrc;
118 
119  if( pBlockSize < pChunkSize )
120  pBlockSize = pChunkSize;
121  }
122 }

References XrdSysCondVar::Broadcast().

Referenced by XCpCtx().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WeakestLink()

XCpSrc * XrdCl::XCpCtx::WeakestLink ( XCpSrc exclude)

Get the 'weakest' sources

Parameters
exclude: the source that is excluded from the search
Returns
: the weakest source

Definition at line 66 of file XrdClXCpCtx.cc.

67 {
68  uint64_t transferRate = -1; // set transferRate to max uint64 value
69  XCpSrc *ret = 0;
70 
71  std::list<XCpSrc*>::iterator itr;
72  XrdSysMutexHelper lck( pMtx );
73 
74  for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
75  {
76  XCpSrc *src = *itr;
77  if( src == exclude ) continue;
78  uint64_t tmp = src->TransferRate();
79  if( src->HasData() && tmp < transferRate )
80  {
81  ret = src;
82  transferRate = tmp;
83  }
84  }
85 
86  if( !ret ) return ret;
87  return ret->Self();
88 }

References XrdCl::XCpSrc::HasData(), XrdCl::XCpSrc::Self(), and XrdCl::XCpSrc::TransferRate().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: