82 XrdXrootdPgwFob *fobP = fP->
pgwFob;
83 int numErrs, numFixes, numLeft;
87 if (!fobP)
return true;
91 numLeft = fobP->
numOffs(&numErrs, &numFixes);
99 snprintf(ebuff,
sizeof(ebuff),
"%d uncorrected checksum errors",numLeft);
113int XrdXrootdProtocol::do_PgRead()
116 XrdXrootdFHandle fh(
Request.pgread.fhandle);
131 if (!
FTab || !(
IO.File =
FTab->Get(fh.handle)))
133 "pgread does not refer to an open file");
138 if (!
Request.header.dlen) pathID = 0;
139 else {ClientPgReadReqArgs *rargs=(ClientPgReadReqArgs *)(
argp->buff);
140 pathID =
static_cast<int>(rargs->
pathid);
142 IO.Flags =
static_cast<unsigned short>(rargs->
reqflags);
147 TRACEP(FSIO,pathID<<
" pgread "<<
IO.IOLen<<
'@'<<
IO.Offset
148 <<
" fn=" <<
IO.File->FileKey);
165 if (
IO.File->AsyncMode &&
IO.IOLen >= pgAioMin
166 &&
IO.Offset+
IO.IOLen <=
IO.File->Stats.fSize+pgAioHalf
170 XrdXrootdPgrwAio *aioP=0;
173 if (!pathID) pP =
this;
174 else {
if (!(pP =
VerifyStream(rc, pathID,
false)))
return rc;
184 XrdXrootdResponse TmpRsp;
190 {
if (!
IO.File->aioFob)
IO.File->aioFob =
new XrdXrootdAioFob;
199 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgRIO, pathID);
214int XrdXrootdProtocol::do_PgRIO()
220 static const int maxCSSZ = maxIOVZ/2 - 1;
221 static const int maxPGRD = maxCSSZ*pgPageSize;
222 static const int infoLen =
sizeof(
kXR_int64);
224 struct pgReadResponse
225 {ServerResponseStatus rsp;
229 XrdSfsFile *sfsP =
IO.File->XrdSfsp;
230 uint64_t pgrOpts = 0;
231 int dlen, fLen, lLen, rc, xframt, Quantum;
232 uint32_t csVec[maxCSSZ];
233 struct iovec iov[maxIOVZ];
243 memset(pgrResp.rsp.bdy.reserved, 0,
sizeof(pgrResp.rsp.bdy.reserved));
248 int pgOff, rPages, rLen =
IO.IOLen;
254 if (rPages < Quantum) Quantum = rPages;
259 if (!
argp || Quantum < halfBSize || Quantum >
argp->bsize)
260 {
if ((rc = getBuff(1, Quantum)) <= 0)
return rc;}
262 if (
argp->bsize > maxPGRD) Quantum = maxPGRD;
268 int items = Quantum / pgPageSize;
274 uint32_t *csVP = csVec;
275 char *buff =
argp->buff;
276 int i = 1, n = items * 2;
278 {iov[i ].iov_base = csVP++;
279 iov[i++].iov_len =
sizeof(uint32_t);
280 iov[i ].iov_base = buff;
281 iov[i++].iov_len = pgPageSize;
289 if ((pgOff =
IO.Offset & pgPageMask))
290 {rLen = pgPageSize - pgOff;
291 buff =
argp->buff + pgOff;
292 iov[2].iov_base = buff;
293 iov[2].iov_len = rLen;
294 rLen += Quantum - pgPageSize;
299 if (
IO.IOLen < rLen) rLen =
IO.IOLen;
305 long long ioOffset =
IO.Offset;
306 do {
if ((xframt = sfsP->
pgRead(
IO.Offset, buff, rLen, csVec, pgrOpts)) <= 0)
310 iov[2].iov_len = fLen;
311 if (items > 1) iov[items<<1].iov_len = lLen;
313 if (xframt < rLen || xframt ==
IO.IOLen)
317 IO.IOLen -= xframt;
IO.Offset += xframt;
318 rLen = (
IO.IOLen < Quantum ?
IO.IOLen : Quantum);
321 for (
int i = 0; i < items; i++) csVec[i] = htonl(csVec[i]);
323 pgrResp.ofs = htonll(ioOffset);
328 dlen = xframt + (items *
sizeof(uint32_t));
329 if ((rc =
Response.Send(pgrResp.rsp, infoLen, iov, items*2+1, dlen)) < 0)
333 {iov[2].iov_base =
argp->buff;
334 iov[2].iov_len = pgPageSize;
339 ioOffset =
IO.Offset;
340 }
while(
IO.IOLen > 0);
344 if (xframt < 0)
return fsError(xframt, 0, sfsP->
error, 0, 0);
350 pgrResp.rsp.bdy.dlen = 0;
351 pgrResp.ofs = htonll(
IO.Offset);
352 return Response.Send(pgrResp.rsp, infoLen);
361int XrdXrootdProtocol::do_PgWrite()
363 XrdXrootdFHandle fh(
Request.pgwrite.fhandle);
370 n2hll(
Request.pgwrite.offset,
IO.Offset);
371 pathID =
Request.pgwrite.pathid;
372 IO.Flags =
static_cast<unsigned short>(
Request.pgwrite.reqflags);
378 return Link->setEtext(
"pgwrite protocol violation");
389 if (!
FTab || !(
IO.File =
FTab->Get(fh.handle)))
391 return do_WriteNone(pathID);
396 if (
IO.File->pgwFob == 0)
IO.File->pgwFob =
new XrdXrootdPgwFob(
IO.File);
400 TRACEP(FSIO, pathID<<
" pgwrite "
402 <<
IO.IOLen<<
'@'<<
IO.Offset<<
" fn=" <<
IO.File->FileKey);
418 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgWIO, pathID);
422 return do_PgWIO(
true);
434bool XrdXrootdProtocol::do_PgWAIO(
int &rc)
436 XrdXrootdPgrwAio *aioP;
468int XrdXrootdProtocol::do_PgWIO() {
return do_PgWIO(
true);}
470int XrdXrootdProtocol::do_PgWIO(
bool isFresh)
473 XrdSfsFile *sfsP =
IO.File->XrdSfsp;
477 int n, rc, Quantum, iovLen, iovNum, csNum;
483 if (!
IO.File->pgwFob)
491 {
if (
IO.File->AsyncMode &&
IO.IOLen >= pgAioMin
493 && !isRetry && do_PgWAIO(rc))
return rc;
494 if (isRetry && !do_PgWIORetry(rc))
return rc;
495 if (!do_PgWIOSetup(
pgwCtl))
return -1;
503 {
if (!(ioV =
pgwCtl->FrameInfo(iovNum, iovLen)))
break;
505 if ((rc =
getData(
this,
"pgwrite", ioV, iovNum)))
return rc;
510 if (!(csVec =
pgwCtl->FrameInfo(csNum, buff, Quantum,
argp)))
515 for (
int i = 0; i < csNum; i++) csVec[i] = ntohl(csVec[i]);
519 XrdOucPgrwUtils::dataInfo dInfo(buff, csVec,
IO.Offset, Quantum);
532 if ((rc = sfsP->
pgWrite(
IO.Offset, buff, Quantum, csVec)) <= 0)
533 {
IO.EInfo[0] = rc;
IO.EInfo[1] = 0;
534 return do_WriteNone();
540 IO.File->pgwFob->delOffs(
IO.Offset, Quantum);
544 IO.Offset += Quantum;
547 }
while(
pgwCtl->Advance());
565bool XrdXrootdProtocol::do_PgWIORetry(
int &rc)
567 static const int csLen =
sizeof(
kXR_unt32);
574 if (
IO.Offset & pgPageMask)
575 {
int n = pgPageSize - (
IO.Offset & pgPageMask);
576 isBad =
IO.IOLen > (n + csLen);
577 }
else isBad =
IO.IOLen > pgUnitSize;
583 "pgwrite retry of more than one page not allowed");
590 if (!
IO.File->pgwFob->hasOffs(
IO.Offset,
IO.IOLen - csLen))
592 snprintf(buff,
sizeof(buff),
"retry %d@%lld",
IO.IOLen-csLen,
IO.Offset);
593 eDest.Emsg(
"pgwRetry", buff,
"not in error; fn=",
IO.File->FileKey);
594 IO.Flags &= ~XrdProto::kXR_pgRetry;
619 Quantum = (
IO.IOLen < pgPageSize ? pgPageSize :
IO.IOLen);
624 if (!
argp || Quantum < halfBSize || argp->bsize < Quantum
626 {
if (getBuff(0, Quantum) <= 0)
return -1;}
635 Link->setEtext(
"pgwrite protocol violation");
XrdOucTrace * XrdXrootdTrace
static bool csVer(dataInfo &dInfo, off_t &bado, int &badc)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static const uint64_t Verify
Options for pgRead() and pgWrite() as noted below.
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize wrlen, uint32_t *csvec, uint64_t opts=0)
void pgUpdt(int wErrs, int wFixd, int wUnc)
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
static const int maxBSize
int numOffs(int *errs=0, int *fixs=0)
static XrdXrootdStats * SI
XrdXrootdProtocol * VerifyStream(int &rc, int pID, bool lok=true)
XrdXrootdProtocol * Stream[maxStreams]
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
XrdXrootdMonitor::User Monitor
XrdXrootdResponse Response
static const int maxStreams
static RAtomic_int srvrAioOps
static const int kXR_pgUnitSZ
static const int kXR_pgPageSZ
static const int kXR_pgRetry