XRootD
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClURL.hh"
32 #include "XrdCl/XrdClUtils.hh"
34 #include "XrdCl/XrdClJobManager.hh"
35 #include "XrdCl/XrdClSIDManager.hh"
39 #include "XrdCl/XrdClSocket.hh"
40 #include "XrdCl/XrdClTls.hh"
41 #include "XrdCl/XrdClOptimizers.hh"
42 
43 #include "XrdOuc/XrdOucCRC.hh"
45 
46 #include "XrdSys/XrdSysPlatform.hh" // same as above
47 #include "XrdSys/XrdSysAtomics.hh"
48 #include "XrdSys/XrdSysPthread.hh"
49 #include <memory>
50 #include <sstream>
51 #include <numeric>
52 
53 namespace
54 {
55  //----------------------------------------------------------------------------
56  // We need an extra task what will run the handler in the future, because
57  // tasks get deleted and we need the handler
58  //----------------------------------------------------------------------------
59  class WaitTask: public XrdCl::Task
60  {
61  public:
62  WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63  {
64  std::ostringstream o;
65  o << "WaitTask for: 0x" << handler->GetRequest();
66  SetName( o.str() );
67  }
68 
69  virtual time_t Run( time_t now )
70  {
71  pHandler->WaitDone( now );
72  return 0;
73  }
74  private:
75  XrdCl::XRootDMsgHandler *pHandler;
76  };
77 }
78 
79 namespace XrdCl
80 {
81  //----------------------------------------------------------------------------
82  // Delegate the response handling to the thread-pool
83  //----------------------------------------------------------------------------
84  class HandleRspJob: public XrdCl::Job
85  {
86  public:
87  HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88  {
89 
90  }
91 
92  virtual ~HandleRspJob()
93  {
94 
95  }
96 
97  virtual void Run( void *arg )
98  {
99  pHandler->HandleResponse();
100  delete this;
101  }
102  private:
103  XrdCl::XRootDMsgHandler *pHandler;
104  };
105 
106  //----------------------------------------------------------------------------
107  // Examine an incoming message, and decide on the action to be taken
108  //----------------------------------------------------------------------------
109  uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110  {
111  const int sst = pSendingState.fetch_or( kSawResp );
112 
113  if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114  {
115  // we must have been sent although we haven't got the OnStatusReady
116  // notification yet. Set the inflight notice.
117 
118  Log *log = DefaultEnv::GetLog();
119  log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120  "that it was sent, assuming it was sent ok.",
121  pUrl.GetHostId().c_str(),
122  pRequest->GetObfuscatedDescription().c_str() );
123 
124  pMsgInFly = true;
125  }
126 
127  //--------------------------------------------------------------------------
128  // if the MsgHandler is already being used to process another request
129  // (kXR_oksofar) we need to wait
130  //--------------------------------------------------------------------------
131  if( pOksofarAsAnswer )
132  {
133  XrdSysCondVarHelper lck( pCV );
134  while( pResponse ) pCV.Wait();
135  }
136  else
137  {
138  if( pResponse )
139  {
140  Log *log = DefaultEnv::GetLog();
141  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
142  "it already owns a response: %p (message: %s ).",
143  pUrl.GetHostId().c_str(), (void*)this,
144  pRequest->GetObfuscatedDescription().c_str() );
145  }
146  }
147 
148  if( msg->GetSize() < 8 )
149  return Ignore;
150 
151  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
152  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
153  uint16_t status = 0;
154  uint32_t dlen = 0;
155 
156  //--------------------------------------------------------------------------
157  // We only care about async responses, but those are extracted now
158  // in the SocketHandler.
159  //--------------------------------------------------------------------------
160  if( rsp->hdr.status == kXR_attn )
161  {
162  return Ignore;
163  }
164  //--------------------------------------------------------------------------
165  // We got a sync message - check if it belongs to us
166  //--------------------------------------------------------------------------
167  else
168  {
169  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
170  rsp->hdr.streamid[1] != req->header.streamid[1] )
171  return Ignore;
172 
173  status = rsp->hdr.status;
174  dlen = rsp->hdr.dlen;
175  }
176 
177  //--------------------------------------------------------------------------
178  // We take the ownership of the message and decide what we will do
179  // with the handler itself, the options are:
180  // 1) we want to either read in raw mode (the Raw flag) or have the message
181  // body reconstructed for us by the TransportHandler by the time
182  // Process() is called (default, no extra flag)
183  // 2) we either got a full response in which case we don't want to be
184  // notified about anything anymore (RemoveHandler) or we got a partial
185  // answer and we need to wait for more (default, no extra flag)
186  //--------------------------------------------------------------------------
187  pResponse = msg;
188  pBodyReader->SetDataLength( dlen );
189 
190  Log *log = DefaultEnv::GetLog();
191  switch( status )
192  {
193  //------------------------------------------------------------------------
194  // Handle the cached cases
195  //------------------------------------------------------------------------
196  case kXR_error:
197  case kXR_redirect:
198  case kXR_wait:
199  return RemoveHandler;
200 
201  case kXR_waitresp:
202  {
203  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
204  "message %s", pUrl.GetHostId().c_str(),
205  pRequest->GetObfuscatedDescription().c_str() );
206 
207  pResponse.reset();
208  return Ignore; // This must be handled synchronously!
209  }
210 
211  //------------------------------------------------------------------------
212  // Handle the potential raw cases
213  //------------------------------------------------------------------------
214  case kXR_ok:
215  {
216  //----------------------------------------------------------------------
217  // For kXR_read we read in raw mode
218  //----------------------------------------------------------------------
219  uint16_t reqId = ntohs( req->header.requestid );
220  if( reqId == kXR_read )
221  {
222  return Raw | RemoveHandler;
223  }
224 
225  //----------------------------------------------------------------------
226  // kXR_readv is the same as kXR_read
227  //----------------------------------------------------------------------
228  if( reqId == kXR_readv )
229  {
230  return Raw | RemoveHandler;
231  }
232 
233  //----------------------------------------------------------------------
234  // For everything else we just take what we got
235  //----------------------------------------------------------------------
236  return RemoveHandler;
237  }
238 
239  //------------------------------------------------------------------------
240  // kXR_oksofars are special, they are not full responses, so we reset
241  // the response pointer to 0 and add the message to the partial list
242  //------------------------------------------------------------------------
243  case kXR_oksofar:
244  {
245  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
246  "%s", pUrl.GetHostId().c_str(),
247  pRequest->GetObfuscatedDescription().c_str() );
248 
249  if( !pOksofarAsAnswer )
250  {
251  pPartialResps.emplace_back( std::move( pResponse ) );
252  }
253 
254  //----------------------------------------------------------------------
255  // For kXR_read we either read in raw mode if the message has not
256  // been fully reconstructed already, if it has, we adjust
257  // the buffer offset to prepare for the next one
258  //----------------------------------------------------------------------
259  uint16_t reqId = ntohs( req->header.requestid );
260  if( reqId == kXR_read )
261  {
262  pTimeoutFence.store( true, std::memory_order_relaxed );
263  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
264  }
265 
266  //----------------------------------------------------------------------
267  // kXR_readv is similar to read, except that the payload is different
268  //----------------------------------------------------------------------
269  if( reqId == kXR_readv )
270  {
271  pTimeoutFence.store( true, std::memory_order_relaxed );
272  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
273  }
274 
275  return ( pOksofarAsAnswer ? None : NoProcess );
276  }
277 
278  case kXR_status:
279  {
280  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
281  "%s", pUrl.GetHostId().c_str(),
282  pRequest->GetObfuscatedDescription().c_str() );
283 
284  uint16_t reqId = ntohs( req->header.requestid );
285  if( reqId == kXR_pgwrite )
286  {
287  //--------------------------------------------------------------------
288  // In case of pgwrite by definition this wont be a partial response
289  // so we can already remove the handler from the in-queue
290  //--------------------------------------------------------------------
291  return RemoveHandler;
292  }
293 
294  //----------------------------------------------------------------------
295  // Otherwise (pgread), first of all we need to read the body of the
296  // kXR_status response, we can handle the raw data (if any) only after
297  // we have the whole kXR_status body
298  //----------------------------------------------------------------------
299  pTimeoutFence.store( true, std::memory_order_relaxed );
300  return None;
301  }
302 
303  //------------------------------------------------------------------------
304  // Default
305  //------------------------------------------------------------------------
306  default:
307  return RemoveHandler;
308  }
309  return RemoveHandler;
310  }
311 
312  //----------------------------------------------------------------------------
313  // Reexamine the incoming message, and decide on the action to be taken
314  //----------------------------------------------------------------------------
316  {
317  if( !pResponse )
318  return 0;
319 
320  Log *log = DefaultEnv::GetLog();
321  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
322 
323  //--------------------------------------------------------------------------
324  // Additional action is only required for kXR_status
325  //--------------------------------------------------------------------------
326  if( rsp->hdr.status != kXR_status ) return 0;
327 
328  //--------------------------------------------------------------------------
329  // Ignore malformed status response
330  //--------------------------------------------------------------------------
331  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
332  {
333  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
334  return Corrupted;
335  }
336 
337  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
338  uint16_t reqId = ntohs( req->header.requestid );
339  //--------------------------------------------------------------------------
340  // Unmarshal the status body
341  //--------------------------------------------------------------------------
342  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
343 
344  if( !st.IsOK() && st.code == errDataError )
345  {
346  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
347  st.GetErrorMessage().c_str() );
348  return Corrupted;
349  }
350 
351  if( !st.IsOK() )
352  {
353  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
354  pUrl.GetHostId().c_str() );
355  pStatus = st;
356  HandleRspOrQueue();
357  return Ignore;
358  }
359 
360  //--------------------------------------------------------------------------
361  // Common handling for partial results
362  //--------------------------------------------------------------------------
363  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
365  {
366  pPartialResps.push_back( std::move( pResponse ) );
367  }
368 
369  //--------------------------------------------------------------------------
370  // Decide the actions that we need to take
371  //--------------------------------------------------------------------------
372  uint16_t action = 0;
373  if( reqId == kXR_pgread )
374  {
375  //----------------------------------------------------------------------
376  // The message contains only Status header and body but no raw data
377  //----------------------------------------------------------------------
378  if( !pPageReader )
379  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380  pPageReader->SetRsp( rspst );
381 
382  action |= Raw;
383 
385  action |= NoProcess;
386  else
387  action |= RemoveHandler;
388  }
389  else if( reqId == kXR_pgwrite )
390  {
391  // if data corruption has been detected on the server side we will
392  // send some additional data pointing to the pages that need to be
393  // retransmitted
394  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
395  pResponse->GetCursor() )
396  action |= More;
397  }
398 
399  return action;
400  }
401 
402  //----------------------------------------------------------------------------
403  // Get handler sid
404  //----------------------------------------------------------------------------
405  uint16_t XRootDMsgHandler::GetSid() const
406  {
407  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
408  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
409  }
410 
411  //----------------------------------------------------------------------------
413  //----------------------------------------------------------------------------
415  {
416  Log *log = DefaultEnv::GetLog();
417 
418  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
419 
420  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
421 
422  //--------------------------------------------------------------------------
423  // If it is a local file, it can be only a metalink redirector
424  //--------------------------------------------------------------------------
425  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
426  pHosts->back().protocol = kXR_PROTOCOLVERSION;
427 
428  //--------------------------------------------------------------------------
429  // We got an answer, check who we were talking to
430  //--------------------------------------------------------------------------
431  else
432  {
433  AnyObject qryResult;
434  int *qryResponse = nullptr;
435  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
436  qryResult.Get( qryResponse );
437  if (qryResponse) {
438  pHosts->back().flags = *qryResponse;
439  delete qryResponse;
440  qryResponse = nullptr;
441  }
442  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
443  qryResult.Get( qryResponse );
444  if (qryResponse) {
445  pHosts->back().protocol = *qryResponse;
446  delete qryResponse;
447  }
448  }
449 
450  //--------------------------------------------------------------------------
451  // Process the message
452  //--------------------------------------------------------------------------
453  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
454  if( !st.IsOK() )
455  {
456  pStatus = Status( stFatal, errInvalidMessage );
457  HandleResponse();
458  return;
459  }
460 
461  //--------------------------------------------------------------------------
462  // we have an response for the message so it's not in fly anymore
463  //--------------------------------------------------------------------------
464  pMsgInFly = false;
465 
466  //--------------------------------------------------------------------------
467  // Reset the aggregated wait (used to omit wait response in case of Metalink
468  // redirector)
469  //--------------------------------------------------------------------------
470  if( rsp->hdr.status != kXR_wait )
471  pAggregatedWaitTime = 0;
472 
473  switch( rsp->hdr.status )
474  {
475  //------------------------------------------------------------------------
476  // kXR_ok - we're done here
477  //------------------------------------------------------------------------
478  case kXR_ok:
479  {
480  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
481  pUrl.GetHostId().c_str(),
482  pRequest->GetObfuscatedDescription().c_str() );
483  pStatus = Status();
484  HandleResponse();
485  return;
486  }
487 
488  case kXR_status:
489  {
490  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
491  pUrl.GetHostId().c_str(),
492  pRequest->GetObfuscatedDescription().c_str() );
493  pStatus = Status();
494  HandleResponse();
495  return;
496  }
497 
498  //------------------------------------------------------------------------
499  // kXR_ok - we're serving partial result to the user
500  //------------------------------------------------------------------------
501  case kXR_oksofar:
502  {
503  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
504  pUrl.GetHostId().c_str(),
505  pRequest->GetObfuscatedDescription().c_str() );
506  pStatus = Status( stOK, suContinue );
507  HandleResponse();
508  return;
509  }
510 
511  //------------------------------------------------------------------------
512  // kXR_error - we've got a problem
513  //------------------------------------------------------------------------
514  case kXR_error:
515  {
516  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
517  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
518  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
519  "[%d] %s", pUrl.GetHostId().c_str(),
520  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
521  errmsg );
522  delete [] errmsg;
523 
524  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
525  return;
526  }
527 
528  //------------------------------------------------------------------------
529  // kXR_redirect - they tell us to go elsewhere
530  //------------------------------------------------------------------------
531  case kXR_redirect:
532  {
533  if( rsp->hdr.dlen <= 4 )
534  {
535  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
536  pUrl.GetHostId().c_str() );
537  pStatus = Status( stError, errInvalidResponse );
538  HandleResponse();
539  return;
540  }
541 
542  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
543  urlInfoBuff[rsp->hdr.dlen-4] = 0;
544  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
545  std::string urlInfo = urlInfoBuff;
546  delete [] urlInfoBuff;
547  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
548  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
549  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
550  rsp->body.redirect.port );
551 
552  //----------------------------------------------------------------------
553  // Check if we can proceed
554  //----------------------------------------------------------------------
555  if( !pRedirectCounter )
556  {
557  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
558  "message %s, the last known error is: %s",
559  pUrl.GetHostId().c_str(),
560  pRequest->GetObfuscatedDescription().c_str(),
561  pLastError.ToString().c_str() );
562 
563 
564  pStatus = Status( stFatal, errRedirectLimit );
565  HandleResponse();
566  return;
567  }
568  --pRedirectCounter;
569 
570  //----------------------------------------------------------------------
571  // Keep the info about this server if we still need to find a load
572  // balancer
573  //----------------------------------------------------------------------
574  uint32_t flags = pHosts->back().flags;
575  if( !pHasLoadBalancer )
576  {
577  if( flags & kXR_isManager )
578  {
579  //------------------------------------------------------------------
580  // If the current server is a meta manager then it supersedes
581  // any existing load balancer, otherwise we assign a load-balancer
582  // only if it has not been already assigned
583  //------------------------------------------------------------------
584  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
585  {
586  pLoadBalancer = pHosts->back();
587  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
588  "as a load-balancer for message %s",
589  pUrl.GetHostId().c_str(),
590  pRequest->GetObfuscatedDescription().c_str() );
591  HostList::iterator it;
592  for( it = pHosts->begin(); it != pHosts->end(); ++it )
593  it->loadBalancer = false;
594  pHosts->back().loadBalancer = true;
595  }
596  }
597  }
598 
599  //----------------------------------------------------------------------
600  // If the redirect comes from a data server safe the URL because
601  // in case of a failure we will use it as the effective data server URL
602  // for the tried CGI opaque info
603  //----------------------------------------------------------------------
604  if( flags & kXR_isServer )
605  pEffectiveDataServerUrl = new URL( pHosts->back().url );
606 
607  //----------------------------------------------------------------------
608  // Build the URL and check it's validity
609  //----------------------------------------------------------------------
610  std::vector<std::string> urlComponents;
611  std::string newCgi;
612  Utils::splitString( urlComponents, urlInfo, "?" );
613 
614  std::ostringstream o;
615 
616  o << urlComponents[0];
617  if( rsp->body.redirect.port > 0 )
618  o << ":" << rsp->body.redirect.port << "/";
619  else if( rsp->body.redirect.port < 0 )
620  {
621  //--------------------------------------------------------------------
622  // check if the manager wants to enforce write recovery at himself
623  // (beware we are dealing here with negative flags)
624  //--------------------------------------------------------------------
625  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
626  pHosts->back().flags |= kXR_recoverWrts;
627 
628  //--------------------------------------------------------------------
629  // check if the manager wants to collapse the communication channel
630  // (the redirect host is to replace the current host)
631  //--------------------------------------------------------------------
632  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
633  {
634  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
635  pPostMaster->CollapseRedirect( pUrl, url );
636  }
637 
638  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
639  {
640  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
641  if( Utils::CheckEC( pRequest, url ) )
642  pRedirectAsAnswer = true;
643  }
644  }
645 
646  URL newUrl = URL( o.str() );
647  if( !newUrl.IsValid() )
648  {
649  pStatus = Status( stError, errInvalidRedirectURL );
650  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
651  pUrl.GetHostId().c_str(), urlInfo.c_str() );
652  HandleResponse();
653  return;
654  }
655 
656  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
657  newUrl.SetUserName( pUrl.GetUserName() );
658 
659  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
660  newUrl.SetPassword( pUrl.GetPassword() );
661 
662  //----------------------------------------------------------------------
663  // Forward any "xrd.*" params from the original client request also to
664  // the new redirection url
665  // Also, we need to preserve any "xrdcl.*' as they are important for
666  // our internal workflows.
667  //----------------------------------------------------------------------
668  std::ostringstream ossXrd;
669  const URL::ParamsMap &urlParams = pUrl.GetParams();
670 
671  for(URL::ParamsMap::const_iterator it = urlParams.begin();
672  it != urlParams.end(); ++it )
673  {
674  if( it->first.compare( 0, 4, "xrd." ) &&
675  it->first.compare( 0, 6, "xrdcl." ) )
676  continue;
677 
678  ossXrd << it->first << '=' << it->second << '&';
679  }
680 
681  std::string xrdCgi = ossXrd.str();
682  pRedirectUrl = newUrl.GetURL();
683 
684  URL cgiURL;
685  if( urlComponents.size() > 1 )
686  {
687  pRedirectUrl += "?";
688  pRedirectUrl += urlComponents[1];
689  std::ostringstream o;
690  o << "fake://fake:111//fake?";
691  o << urlComponents[1];
692 
693  if( urlComponents.size() == 3 )
694  o << '?' << urlComponents[2];
695 
696  if (!xrdCgi.empty())
697  {
698  o << '&' << xrdCgi;
699  pRedirectUrl += '&';
700  pRedirectUrl += xrdCgi;
701  }
702 
703  cgiURL = URL( o.str() );
704  }
705  else {
706  if (!xrdCgi.empty())
707  {
708  std::ostringstream o;
709  o << "fake://fake:111//fake?";
710  o << xrdCgi;
711  cgiURL = URL( o.str() );
712  pRedirectUrl += '?';
713  pRedirectUrl += xrdCgi;
714  }
715  }
716 
717  //----------------------------------------------------------------------
718  // Check if we need to return the URL as a response
719  //----------------------------------------------------------------------
720  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
721  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
722  !newUrl.IsLocalFile() )
723  pRedirectAsAnswer = true;
724 
725  if( pRedirectAsAnswer )
726  {
727  pStatus = Status( stError, errRedirect );
728  HandleResponse();
729  return;
730  }
731 
732  //----------------------------------------------------------------------
733  // Rewrite the message in a way required to send it to another server
734  //----------------------------------------------------------------------
735  newUrl.SetParams( cgiURL.GetParams() );
736  Status st = RewriteRequestRedirect( newUrl );
737  if( !st.IsOK() )
738  {
739  pStatus = st;
740  HandleResponse();
741  return;
742  }
743 
744  //----------------------------------------------------------------------
745  // Make sure we don't change the protocol by accident (root vs roots)
746  //----------------------------------------------------------------------
747  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
748  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
749  newUrl.SetProtocol( "roots" );
750 
751  //----------------------------------------------------------------------
752  // Send the request to the new location
753  //----------------------------------------------------------------------
754  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
755  return;
756  }
757 
758  //------------------------------------------------------------------------
759  // kXR_wait - we wait, and re-issue the request later
760  //------------------------------------------------------------------------
761  case kXR_wait:
762  {
763  uint32_t waitSeconds = 0;
764 
765  if( rsp->hdr.dlen >= 4 )
766  {
767  char *infoMsg = new char[rsp->hdr.dlen-3];
768  infoMsg[rsp->hdr.dlen-4] = 0;
769  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
770  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
771  "message %s: %s", pUrl.GetHostId().c_str(),
772  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
773  infoMsg );
774  delete [] infoMsg;
775  waitSeconds = rsp->body.wait.seconds;
776  }
777  else
778  {
779  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
780  "message %s", pUrl.GetHostId().c_str(),
781  pRequest->GetObfuscatedDescription().c_str() );
782  }
783 
784  pAggregatedWaitTime += waitSeconds;
785 
786  // We need a special case if the data node comes from metalink
787  // redirector. In this case it might make more sense to try the
788  // next entry in the Metalink than wait.
789  if( OmitWait( *pRequest, pLoadBalancer.url ) )
790  {
791  int maxWait = DefaultMaxMetalinkWait;
792  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
793  if( pAggregatedWaitTime > maxWait )
794  {
795  UpdateTriedCGI();
796  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
797  return;
798  }
799  }
800 
801  //----------------------------------------------------------------------
802  // Some messages require rewriting before they can be sent again
803  // after wait
804  //----------------------------------------------------------------------
805  Status st = RewriteRequestWait();
806  if( !st.IsOK() )
807  {
808  pStatus = st;
809  HandleResponse();
810  return;
811  }
812 
813  //----------------------------------------------------------------------
814  // Register a task to resend the message in some seconds, if we still
815  // have time to do that, and report a timeout otherwise
816  //----------------------------------------------------------------------
817  time_t resendTime = ::time(0)+waitSeconds;
818 
819  if( resendTime < pExpiration )
820  {
821  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
822  pUrl.GetHostId().c_str(), (void*)this,
823  pRequest->GetObfuscatedDescription().c_str() );
824 
825  TaskManager *taskMgr = pPostMaster->GetTaskManager();
826  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
827  }
828  else
829  {
830  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
831  pUrl.GetHostId().c_str(),
832  pRequest->GetObfuscatedDescription().c_str() );
833  HandleError( Status( stError, errOperationExpired) );
834  }
835  return;
836  }
837 
838  //------------------------------------------------------------------------
839  // kXR_waitresp - the response will be returned in some seconds as an
840  // unsolicited message. Currently all messages of this type are handled
841  // one step before in the XrdClStream::OnIncoming as they need to be
842  // processed synchronously.
843  //------------------------------------------------------------------------
844  case kXR_waitresp:
845  {
846  if( rsp->hdr.dlen < 4 )
847  {
848  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
849  pUrl.GetHostId().c_str() );
850  pStatus = Status( stError, errInvalidResponse );
851  HandleResponse();
852  return;
853  }
854 
855  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
856  "message %s", pUrl.GetHostId().c_str(),
857  rsp->body.waitresp.seconds,
858  pRequest->GetObfuscatedDescription().c_str() );
859  return;
860  }
861 
862  //------------------------------------------------------------------------
863  // Default - unrecognized/unsupported response, declare an error
864  //------------------------------------------------------------------------
865  default:
866  {
867  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
868  "message %s", pUrl.GetHostId().c_str(),
869  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
870  pStatus = Status( stError, errInvalidResponse );
871  HandleResponse();
872  return;
873  }
874  }
875 
876  return;
877  }
878 
879  //----------------------------------------------------------------------------
880  // Handle an event other that a message arrival - may be timeout
881  //----------------------------------------------------------------------------
883  XRootDStatus status )
884  {
885  Log *log = DefaultEnv::GetLog();
886  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
887  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
888 
889  if( event == Ready )
890  return 0;
891 
892  if( pTimeoutFence.load( std::memory_order_relaxed ) )
893  return 0;
894 
895  HandleError( status );
896  return RemoveHandler;
897  }
898 
899  //----------------------------------------------------------------------------
900  // Read message body directly from a socket
901  //----------------------------------------------------------------------------
903  Socket *socket,
904  uint32_t &bytesRead )
905  {
906  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
907  uint16_t reqId = ntohs( req->header.requestid );
908 
909  if( reqId == kXR_pgread )
910  return pPageReader->Read( *socket, bytesRead );
911 
912  return pBodyReader->Read( *socket, bytesRead );
913  }
914 
915  //----------------------------------------------------------------------------
916  // We're here when we requested sending something over the wire
917  // or other status update on this action.
918  // We can be called when message is still in out-queue, with an
919  // error status indicating message will not be sent.
920  //----------------------------------------------------------------------------
922  XRootDStatus status )
923  {
924  Log *log = DefaultEnv::GetLog();
925 
926  const int sst = pSendingState.fetch_or( kSendDone );
927 
928  // ignore if we're already in this state
929  if( status.IsOK() && ( sst & kSendDone ) ) return;
930 
931  // if we have already seen a response we should be getting notified
932  // of a successful send. But if not, log and do our best to recover.
933  if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
934  {
935  log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
936  "recover.", pUrl.GetHostId().c_str(),
937  message->GetObfuscatedDescription().c_str() );
938  HandleError( status );
939  return;
940  }
941 
942  if( sst & kFinalResp )
943  {
944  log->Dump( XRootDMsg, "[%s] Got late notification that outgoing message %s was "
945  "sent, already have final response, queuing handler callback.",
946  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
947  HandleRspOrQueue();
948  return;
949  }
950 
951  if( sst & kSawResp )
952  {
953  log->Dump( XRootDMsg, "[%s] Got late notification that message %s has "
954  "been successfully sent.",
955  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
956  return;
957  }
958 
959  //--------------------------------------------------------------------------
960  // We were successful, so we now need to listen for a response
961  //--------------------------------------------------------------------------
962  if( status.IsOK() )
963  {
964  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
965  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
966 
967  pMsgInFly = true;
968  return;
969  }
970 
971  //--------------------------------------------------------------------------
972  // We have failed, recover if possible
973  //--------------------------------------------------------------------------
974  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
975  "recover.", pUrl.GetHostId().c_str(),
976  message->GetObfuscatedDescription().c_str() );
977  HandleError( status );
978  }
979 
980  //----------------------------------------------------------------------------
981  // Are we a raw writer or not?
982  //----------------------------------------------------------------------------
984  {
985  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
986  uint16_t reqId = ntohs( req->header.requestid );
987  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
988  return true;
989  // checkpoint + execute
990  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
991  {
992  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
993  reqId = ntohs( xeq->header.requestid );
994  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
995  }
996 
997  return false;
998  }
999 
1000  //----------------------------------------------------------------------------
1001  // Write the message body
1002  //----------------------------------------------------------------------------
1004  uint32_t &bytesWritten )
1005  {
1006  //--------------------------------------------------------------------------
1007  // First check if it is a PgWrite
1008  //--------------------------------------------------------------------------
1009  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1010  {
1011  //------------------------------------------------------------------------
1012  // PgWrite will have just one chunk
1013  //------------------------------------------------------------------------
1014  ChunkInfo chunk = pChunkList->front();
1015  //------------------------------------------------------------------------
1016  // Calculate the size of the first and last page (in case the chunk is not
1017  // 4KB aligned)
1018  //------------------------------------------------------------------------
1019  int fLen = 0, lLen = 0;
1020  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1021 
1022  //------------------------------------------------------------------------
1023  // Set the crc32c buffer if not ready yet
1024  //------------------------------------------------------------------------
1025  if( pPgWrtCksumBuff.GetCursor() == 0 )
1026  {
1027  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1028  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1029  }
1030 
1031  uint32_t btsLeft = chunk.length - pAsyncOffset;
1032  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1033  if( pglen > btsLeft ) pglen = btsLeft;
1034  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1035 
1036  while( btsLeft > 0 )
1037  {
1038  // first write the crc32c digest
1039  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1040  {
1041  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1042  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1043  int btswrt = 0;
1044  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1045  if( !st.IsOK() ) return st;
1046  bytesWritten += btswrt;
1047  pPgWrtCksumBuff.AdvanceCursor( btswrt );
1048  if( st.code == suRetry ) return st;
1049  }
1050  // then write the raw data (one page)
1051  int btswrt = 0;
1052  Status st = socket->Send( pgbuf, pglen, btswrt );
1053  if( !st.IsOK() ) return st;
1054  pgbuf += btswrt;
1055  pglen -= btswrt;
1056  btsLeft -= btswrt;
1057  bytesWritten += btswrt;
1058  pAsyncOffset += btswrt; // update the offset to the raw data
1059  if( st.code == suRetry ) return st;
1060  // if we managed to write all the data ...
1061  if( pglen == 0 )
1062  {
1063  // move to the next page
1064  ++pPgWrtCurrentPageNb;
1065  if( pPgWrtCurrentPageNb < nbpgs )
1066  {
1067  // set the digest buffer
1068  pPgWrtCksumBuff.SetCursor( 0 );
1069  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1070  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1071  }
1072  // set the page length
1073  pglen = XrdSys::PageSize;
1074  if( pglen > btsLeft ) pglen = btsLeft;
1075  // reset offset in the current page
1076  pPgWrtCurrentPageOffset = 0;
1077  }
1078  else
1079  // otherwise just adjust the offset in the current page
1080  pPgWrtCurrentPageOffset += btswrt;
1081 
1082  }
1083  }
1084  else if( !pChunkList->empty() )
1085  {
1086  size_t size = pChunkList->size();
1087  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1088  {
1089  char *buffer = (char*)(*pChunkList)[i].buffer;
1090  uint32_t size = (*pChunkList)[i].length;
1091  size_t leftToBeWritten = size - pAsyncOffset;
1092 
1093  while( leftToBeWritten )
1094  {
1095  int btswrt = 0;
1096  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1097  bytesWritten += btswrt;
1098  if( !st.IsOK() || st.code == suRetry ) return st;
1099  pAsyncOffset += btswrt;
1100  leftToBeWritten -= btswrt;
1101  }
1102  //----------------------------------------------------------------------
1103  // Remember that we have moved to the next chunk, also clear the offset
1104  // within the buffer as we are going to move to a new one
1105  //----------------------------------------------------------------------
1106  ++pAsyncChunkIndex;
1107  pAsyncOffset = 0;
1108  }
1109  }
1110  else
1111  {
1112  Log *log = DefaultEnv::GetLog();
1113 
1114  //------------------------------------------------------------------------
1115  // If the socket is encrypted we cannot use a kernel buffer, we have to
1116  // convert to user space buffer
1117  //------------------------------------------------------------------------
1118  if( socket->IsEncrypted() )
1119  {
1120  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1121  pUrl.GetHostId().c_str() );
1122 
1123  char *ubuff = 0;
1124  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1125  if( ret < 0 ) return Status( stError, errInternal );
1126  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1127  return WriteMessageBody( socket, bytesWritten );
1128  }
1129 
1130  //------------------------------------------------------------------------
1131  // Send the data
1132  //------------------------------------------------------------------------
1133  while( !pKBuff->Empty() )
1134  {
1135  int btswrt = 0;
1136  Status st = socket->Send( *pKBuff, btswrt );
1137  bytesWritten += btswrt;
1138  if( !st.IsOK() || st.code == suRetry ) return st;
1139  }
1140 
1141  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1142  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1143  }
1144 
1145  return Status();
1146  }
1147 
1148  //----------------------------------------------------------------------------
1149  // We're here when we got a time event. We needed to re-issue the request
1150  // in some time in the future, and that moment has arrived
1151  //----------------------------------------------------------------------------
1153  {
1154  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1155  }
1156 
1157  //----------------------------------------------------------------------------
1158  // Bookkeeping after partial response has been received.
1159  //----------------------------------------------------------------------------
1161  {
1162  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1163  }
1164 
1165  //----------------------------------------------------------------------------
1166  // Unpack the message and call the response handler
1167  //----------------------------------------------------------------------------
1168  void XRootDMsgHandler::HandleResponse()
1169  {
1170  //--------------------------------------------------------------------------
1171  // Is it a final response?
1172  //--------------------------------------------------------------------------
1173  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1174  if( finalrsp )
1175  {
1176  // Do not do final processing of the response if we haven't had
1177  // confirmation the original request was sent (via OnStatusReady).
1178  // The final processing will be triggered when we get the confirm.
1179  const int sst = pSendingState.fetch_or( kFinalResp );
1180  if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
1181  return;
1182  }
1183 
1184  //--------------------------------------------------------------------------
1185  // Process the response and notify the listener
1186  //--------------------------------------------------------------------------
1188  XRootDStatus *status = ProcessStatus();
1189  AnyObject *response = 0;
1190 
1191  Log *log = DefaultEnv::GetLog();
1192  log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1193  "with status: %s.",
1194  pUrl.GetHostId().c_str(), (void*)this,
1195  pRequest->GetObfuscatedDescription().c_str(),
1196  status->ToString().c_str() );
1197 
1198  if( status->IsOK() )
1199  {
1200  Status st = ParseResponse( response );
1201  if( !st.IsOK() )
1202  {
1203  delete status;
1204  delete response;
1205  status = new XRootDStatus( st );
1206  response = 0;
1207  }
1208  }
1209 
1210  //--------------------------------------------------------------------------
1211  // Close the redirect entry if necessary
1212  //--------------------------------------------------------------------------
1213  if( pRdirEntry )
1214  {
1215  pRdirEntry->status = *status;
1216  pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1217  }
1218 
1219  //--------------------------------------------------------------------------
1220  // Release the stream id
1221  //--------------------------------------------------------------------------
1222  if( pSidMgr && finalrsp )
1223  {
1224  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1225  if( status->IsOK() || !pMsgInFly ||
1226  !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1227  pSidMgr->ReleaseSID( req->header.streamid );
1228  }
1229 
1230  HostList *hosts = pHosts.release();
1231  if( !finalrsp )
1232  pHosts.reset( new HostList( *hosts ) );
1233 
1234  pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1235 
1236  //--------------------------------------------------------------------------
1237  // if it is the final response there is nothing more to do ...
1238  //--------------------------------------------------------------------------
1239  if( finalrsp )
1240  delete this;
1241  //--------------------------------------------------------------------------
1242  // on the other hand if it is not the final response, we have to keep the
1243  // MsgHandler and delete the current response
1244  //--------------------------------------------------------------------------
1245  else
1246  {
1247  XrdSysCondVarHelper lck( pCV );
1248  pResponse.reset();
1249  pTimeoutFence.store( false, std::memory_order_relaxed );
1250  pCV.Broadcast();
1251  }
1252  }
1253 
1254 
1255  //----------------------------------------------------------------------------
1256  // Extract the status information from the stuff that we got
1257  //----------------------------------------------------------------------------
1258  XRootDStatus *XRootDMsgHandler::ProcessStatus()
1259  {
1260  XRootDStatus *st = new XRootDStatus( pStatus );
1261  ServerResponse *rsp = 0;
1262  if( pResponse )
1263  rsp = (ServerResponse *)pResponse->GetBuffer();
1264 
1265  if( !pStatus.IsOK() && rsp )
1266  {
1267  if( pStatus.code == errErrorResponse )
1268  {
1269  st->errNo = rsp->body.error.errnum;
1270  // omit the last character as the string returned from the server
1271  // (acording to protocol specs) should be null-terminated
1272  std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1273  if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1274  errmsg += " Last seen error: " + pLastError.ToString();
1275  st->SetErrorMessage( errmsg );
1276  }
1277  else if( pStatus.code == errRedirect )
1278  st->SetErrorMessage( pRedirectUrl );
1279  }
1280  return st;
1281  }
1282 
1283  //------------------------------------------------------------------------
1284  // Parse the response and put it in an object that could be passed to
1285  // the user
1286  //------------------------------------------------------------------------
1287  Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1288  {
1289  if( !pResponse )
1290  return Status();
1291 
1292  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1293  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1294  Log *log = DefaultEnv::GetLog();
1295 
1296  //--------------------------------------------------------------------------
1297  // Handle redirect as an answer
1298  //--------------------------------------------------------------------------
1299  if( rsp->hdr.status == kXR_redirect )
1300  {
1301  log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1302  return 0;
1303  }
1304 
1305  Buffer buff;
1306  uint32_t length = 0;
1307  char *buffer = 0;
1308 
1309  //--------------------------------------------------------------------------
1310  // We don't have any partial answers so pass what we have
1311  //--------------------------------------------------------------------------
1312  if( pPartialResps.empty() )
1313  {
1314  buffer = rsp->body.buffer.data;
1315  length = rsp->hdr.dlen;
1316  }
1317  //--------------------------------------------------------------------------
1318  // Partial answers, we need to glue them together before parsing
1319  //--------------------------------------------------------------------------
1320  else if( req->header.requestid != kXR_read &&
1321  req->header.requestid != kXR_readv )
1322  {
1323  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1324  {
1325  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1326  length += part->hdr.dlen;
1327  }
1328  length += rsp->hdr.dlen;
1329 
1330  buff.Allocate( length );
1331  uint32_t offset = 0;
1332  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1333  {
1334  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1335  buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1336  offset += part->hdr.dlen;
1337  }
1338  buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1339  buffer = buff.GetBuffer();
1340  }
1341 
1342  //--------------------------------------------------------------------------
1343  // Right, but what was the question?
1344  //--------------------------------------------------------------------------
1345  switch( req->header.requestid )
1346  {
1347  //------------------------------------------------------------------------
1348  // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1349  // kXR_ping, kXR_close, kXR_write, kXR_sync
1350  //------------------------------------------------------------------------
1351  case kXR_mv:
1352  case kXR_truncate:
1353  case kXR_rm:
1354  case kXR_mkdir:
1355  case kXR_rmdir:
1356  case kXR_chmod:
1357  case kXR_ping:
1358  case kXR_close:
1359  case kXR_write:
1360  case kXR_writev:
1361  case kXR_sync:
1362  case kXR_chkpoint:
1363  return Status();
1364 
1365  //------------------------------------------------------------------------
1366  // kXR_locate
1367  //------------------------------------------------------------------------
1368  case kXR_locate:
1369  {
1370  AnyObject *obj = new AnyObject();
1371 
1372  char *nullBuffer = new char[length+1];
1373  nullBuffer[length] = 0;
1374  memcpy( nullBuffer, buffer, length );
1375 
1376  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1377  "LocateInfo: %s", pUrl.GetHostId().c_str(),
1378  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1379  LocationInfo *data = new LocationInfo();
1380 
1381  if( data->ParseServerResponse( nullBuffer ) == false )
1382  {
1383  delete obj;
1384  delete data;
1385  delete [] nullBuffer;
1386  return Status( stError, errInvalidResponse );
1387  }
1388  delete [] nullBuffer;
1389 
1390  obj->Set( data );
1391  response = obj;
1392  return Status();
1393  }
1394 
1395  //------------------------------------------------------------------------
1396  // kXR_stat
1397  //------------------------------------------------------------------------
1398  case kXR_stat:
1399  {
1400  AnyObject *obj = new AnyObject();
1401 
1402  //----------------------------------------------------------------------
1403  // Virtual File System stat (kXR_vfs)
1404  //----------------------------------------------------------------------
1405  if( req->stat.options & kXR_vfs )
1406  {
1407  StatInfoVFS *data = new StatInfoVFS();
1408 
1409  char *nullBuffer = new char[length+1];
1410  nullBuffer[length] = 0;
1411  memcpy( nullBuffer, buffer, length );
1412 
1413  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1414  "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1415  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1416 
1417  if( data->ParseServerResponse( nullBuffer ) == false )
1418  {
1419  delete obj;
1420  delete data;
1421  delete [] nullBuffer;
1422  return Status( stError, errInvalidResponse );
1423  }
1424  delete [] nullBuffer;
1425 
1426  obj->Set( data );
1427  }
1428  //----------------------------------------------------------------------
1429  // Normal stat
1430  //----------------------------------------------------------------------
1431  else
1432  {
1433  StatInfo *data = new StatInfo();
1434 
1435  char *nullBuffer = new char[length+1];
1436  nullBuffer[length] = 0;
1437  memcpy( nullBuffer, buffer, length );
1438 
1439  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1440  "%s", pUrl.GetHostId().c_str(),
1441  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1442 
1443  if( data->ParseServerResponse( nullBuffer ) == false )
1444  {
1445  delete obj;
1446  delete data;
1447  delete [] nullBuffer;
1448  return Status( stError, errInvalidResponse );
1449  }
1450  delete [] nullBuffer;
1451  obj->Set( data );
1452  }
1453 
1454  response = obj;
1455  return Status();
1456  }
1457 
1458  //------------------------------------------------------------------------
1459  // kXR_protocol
1460  //------------------------------------------------------------------------
1461  case kXR_protocol:
1462  {
1463  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1464  pUrl.GetHostId().c_str(),
1465  pRequest->GetObfuscatedDescription().c_str() );
1466 
1467  if( rsp->hdr.dlen < 8 )
1468  {
1469  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1470  pUrl.GetHostId().c_str() );
1471  return Status( stError, errInvalidResponse );
1472  }
1473 
1474  AnyObject *obj = new AnyObject();
1475  ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1476  rsp->body.protocol.flags );
1477  obj->Set( data );
1478  response = obj;
1479  return Status();
1480  }
1481 
1482  //------------------------------------------------------------------------
1483  // kXR_dirlist
1484  //------------------------------------------------------------------------
1485  case kXR_dirlist:
1486  {
1487  AnyObject *obj = new AnyObject();
1488  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1489  "DirectoryList", pUrl.GetHostId().c_str(),
1490  pRequest->GetObfuscatedDescription().c_str() );
1491 
1492  char *path = new char[req->dirlist.dlen+1];
1493  path[req->dirlist.dlen] = 0;
1494  memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1495 
1496  DirectoryList *data = new DirectoryList();
1497  data->SetParentName( path );
1498  delete [] path;
1499 
1500  char *nullBuffer = new char[length+1];
1501  nullBuffer[length] = 0;
1502  memcpy( nullBuffer, buffer, length );
1503 
1504  bool invalidrsp = false;
1505 
1506  if( !pDirListStarted )
1507  {
1508  pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1509  pDirListStarted = true;
1510 
1511  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1512  }
1513  else
1514  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1515 
1516  if( invalidrsp )
1517  {
1518  delete data;
1519  delete obj;
1520  delete [] nullBuffer;
1521  return Status( stError, errInvalidResponse );
1522  }
1523 
1524  delete [] nullBuffer;
1525  obj->Set( data );
1526  response = obj;
1527  return Status();
1528  }
1529 
1530  //------------------------------------------------------------------------
1531  // kXR_open - if we got the statistics, otherwise return 0
1532  //------------------------------------------------------------------------
1533  case kXR_open:
1534  {
1535  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1536  pUrl.GetHostId().c_str(),
1537  pRequest->GetObfuscatedDescription().c_str() );
1538 
1539  if( rsp->hdr.dlen < 4 )
1540  {
1541  log->Error( XRootDMsg, "[%s] Got invalid open response.",
1542  pUrl.GetHostId().c_str() );
1543  return Status( stError, errInvalidResponse );
1544  }
1545 
1546  AnyObject *obj = new AnyObject();
1547  StatInfo *statInfo = 0;
1548 
1549  //----------------------------------------------------------------------
1550  // Handle StatInfo if requested
1551  //----------------------------------------------------------------------
1552  if( req->open.options & kXR_retstat )
1553  {
1554  log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1555  pUrl.GetHostId().c_str(),
1556  pRequest->GetObfuscatedDescription().c_str() );
1557 
1558  if( rsp->hdr.dlen >= 12 )
1559  {
1560  char *nullBuffer = new char[rsp->hdr.dlen-11];
1561  nullBuffer[rsp->hdr.dlen-12] = 0;
1562  memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1563 
1564  statInfo = new StatInfo();
1565  if( statInfo->ParseServerResponse( nullBuffer ) == false )
1566  {
1567  delete statInfo;
1568  statInfo = 0;
1569  }
1570  delete [] nullBuffer;
1571  }
1572 
1573  if( rsp->hdr.dlen < 12 || !statInfo )
1574  {
1575  log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1576  "to %s", pUrl.GetHostId().c_str(),
1577  pRequest->GetObfuscatedDescription().c_str() );
1578  delete obj;
1579  return Status( stError, errInvalidResponse );
1580  }
1581  }
1582 
1583  OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1584  pResponse->GetSessionId(),
1585  statInfo );
1586  obj->Set( data );
1587  response = obj;
1588  return Status();
1589  }
1590 
1591  //------------------------------------------------------------------------
1592  // kXR_read
1593  //------------------------------------------------------------------------
1594  case kXR_read:
1595  {
1596  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1597  pUrl.GetHostId().c_str(),
1598  pRequest->GetObfuscatedDescription().c_str() );
1599 
1600  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1601  {
1602  //--------------------------------------------------------------------
1603  // we are expecting to have only the header in the message, the raw
1604  // data have been readout into the user buffer
1605  //--------------------------------------------------------------------
1606  if( pPartialResps[i]->GetSize() > 8 )
1607  return Status( stOK, errInternal );
1608  }
1609  //----------------------------------------------------------------------
1610  // we are expecting to have only the header in the message, the raw
1611  // data have been readout into the user buffer
1612  //----------------------------------------------------------------------
1613  if( pResponse->GetSize() > 8 )
1614  return Status( stOK, errInternal );
1615  //----------------------------------------------------------------------
1616  // Get the response for the end user
1617  //----------------------------------------------------------------------
1618  return pBodyReader->GetResponse( response );
1619  }
1620 
1621  //------------------------------------------------------------------------
1622  // kXR_pgread
1623  //------------------------------------------------------------------------
1624  case kXR_pgread:
1625  {
1626  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1627  pUrl.GetHostId().c_str(),
1628  pRequest->GetObfuscatedDescription().c_str() );
1629 
1630  //----------------------------------------------------------------------
1631  // Glue in the cached responses if necessary
1632  //----------------------------------------------------------------------
1633  ChunkInfo chunk = pChunkList->front();
1634  bool sizeMismatch = false;
1635  uint32_t currentOffset = 0;
1636  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1637  {
1638  ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1639 
1640  //--------------------------------------------------------------------
1641  // the actual size of the raw data without the crc32c checksums
1642  //--------------------------------------------------------------------
1643  size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1644  part->status.bdy.dlen ) * CksumSize;
1645 
1646  if( currentOffset + datalen > chunk.length )
1647  {
1648  sizeMismatch = true;
1649  break;
1650  }
1651 
1652  currentOffset += datalen;
1653  }
1654 
1655  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1656  size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1657  rspst->status.bdy.dlen ) * CksumSize;
1658  if( currentOffset + datalen <= chunk.length )
1659  currentOffset += datalen;
1660  else
1661  sizeMismatch = true;
1662 
1663  //----------------------------------------------------------------------
1664  // Overflow
1665  //----------------------------------------------------------------------
1666  if( pChunkStatus.front().sizeError || sizeMismatch )
1667  {
1668  log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1669  "buffer is too small for the received data.",
1670  pUrl.GetHostId().c_str(),
1671  pRequest->GetObfuscatedDescription().c_str() );
1672  return Status( stError, errInvalidResponse );
1673  }
1674 
1675  AnyObject *obj = new AnyObject();
1676  PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1677  std::move( pCrc32cDigests) );
1678 
1679  obj->Set( pgInfo );
1680  response = obj;
1681  return Status();
1682  }
1683 
1684  //------------------------------------------------------------------------
1685  // kXR_pgwrite
1686  //------------------------------------------------------------------------
1687  case kXR_pgwrite:
1688  {
1689  std::vector<std::tuple<uint64_t, uint32_t>> retries;
1690 
1691  ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1692  if( rsp->status.bdy.dlen > 0 )
1693  {
1694  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1695  size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1696  retries.reserve( pgcnt );
1697  kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1698  sizeof( ServerResponseBody_pgWrCSE ) );
1699 
1700  for( size_t i = 0; i < pgcnt; ++i )
1701  {
1702  uint32_t len = XrdSys::PageSize;
1703  if( i == 0 ) len = cse->dlFirst;
1704  else if( i == pgcnt - 1 ) len = cse->dlLast;
1705  retries.push_back( std::make_tuple( pgoffs[i], len ) );
1706  }
1707  }
1708 
1709  RetryInfo *info = new RetryInfo( std::move( retries ) );
1710  AnyObject *obj = new AnyObject();
1711  obj->Set( info );
1712  response = obj;
1713 
1714  return Status();
1715  }
1716 
1717 
1718  //------------------------------------------------------------------------
1719  // kXR_readv - we need to pass the length of the buffer to the user code
1720  //------------------------------------------------------------------------
1721  case kXR_readv:
1722  {
1723  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1724  "VectorReadInfo", pUrl.GetHostId().c_str(),
1725  pRequest->GetObfuscatedDescription().c_str() );
1726 
1727  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1728  {
1729  //--------------------------------------------------------------------
1730  // we are expecting to have only the header in the message, the raw
1731  // data have been readout into the user buffer
1732  //--------------------------------------------------------------------
1733  if( pPartialResps[i]->GetSize() > 8 )
1734  return Status( stOK, errInternal );
1735  }
1736  //----------------------------------------------------------------------
1737  // we are expecting to have only the header in the message, the raw
1738  // data have been readout into the user buffer
1739  //----------------------------------------------------------------------
1740  if( pResponse->GetSize() > 8 )
1741  return Status( stOK, errInternal );
1742  //----------------------------------------------------------------------
1743  // Get the response for the end user
1744  //----------------------------------------------------------------------
1745  return pBodyReader->GetResponse( response );
1746  }
1747 
1748  //------------------------------------------------------------------------
1749  // kXR_fattr
1750  //------------------------------------------------------------------------
1751  case kXR_fattr:
1752  {
1753  int len = rsp->hdr.dlen;
1754  char* data = rsp->body.buffer.data;
1755 
1756  return ParseXAttrResponse( data, len, response );
1757  }
1758 
1759  //------------------------------------------------------------------------
1760  // kXR_query
1761  //------------------------------------------------------------------------
1762  case kXR_query:
1763  case kXR_set:
1764  case kXR_prepare:
1765  default:
1766  {
1767  AnyObject *obj = new AnyObject();
1768  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1769  pUrl.GetHostId().c_str(),
1770  pRequest->GetObfuscatedDescription().c_str() );
1771 
1772  BinaryDataInfo *data = new BinaryDataInfo();
1773  data->Allocate( length );
1774  data->Append( buffer, length );
1775  obj->Set( data );
1776  response = obj;
1777  return Status();
1778  }
1779  };
1780  return Status( stError, errInvalidMessage );
1781  }
1782 
1783  //------------------------------------------------------------------------
1784  // Parse the response to kXR_fattr request and put it in an object that
1785  // could be passed to the user
1786  //------------------------------------------------------------------------
1787  Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1788  AnyObject *&response )
1789  {
1790  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1791 // Log *log = DefaultEnv::GetLog(); //TODO
1792 
1793  switch( req->fattr.subcode )
1794  {
1795  case kXR_fattrDel:
1796  case kXR_fattrSet:
1797  {
1798  Status status;
1799 
1800  kXR_char nerrs = 0;
1801  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1802  return status;
1803 
1804  kXR_char nattr = 0;
1805  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1806  return status;
1807 
1808  std::vector<XAttrStatus> resp;
1809  // read the namevec
1810  for( kXR_char i = 0; i < nattr; ++i )
1811  {
1812  kXR_unt16 rc = 0;
1813  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1814  return status;
1815  rc = ntohs( rc );
1816 
1817  // count errors
1818  if( rc ) --nerrs;
1819 
1820  std::string name;
1821  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1822  return status;
1823 
1824  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1825  XRootDStatus();
1826  resp.push_back( XAttrStatus( name, st ) );
1827  }
1828 
1829  // check if we read all the data and if the error count is OK
1830  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1831 
1832  // set up the response object
1833  response = new AnyObject();
1834  response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1835 
1836  return Status();
1837  }
1838 
1839  case kXR_fattrGet:
1840  {
1841  Status status;
1842 
1843  kXR_char nerrs = 0;
1844  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1845  return status;
1846 
1847  kXR_char nattr = 0;
1848  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1849  return status;
1850 
1851  std::vector<XAttr> resp;
1852  resp.reserve( nattr );
1853 
1854  // read the name vec
1855  for( kXR_char i = 0; i < nattr; ++i )
1856  {
1857  kXR_unt16 rc = 0;
1858  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1859  return status;
1860  rc = ntohs( rc );
1861 
1862  // count errors
1863  if( rc ) --nerrs;
1864 
1865  std::string name;
1866  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1867  return status;
1868 
1869  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1870  XRootDStatus();
1871  resp.push_back( XAttr( name, st ) );
1872  }
1873 
1874  // read the value vec
1875  for( kXR_char i = 0; i < nattr; ++i )
1876  {
1877  kXR_int32 vlen = 0;
1878  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1879  return status;
1880  vlen = ntohl( vlen );
1881 
1882  std::string value;
1883  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1884  return status;
1885 
1886  resp[i].value.swap( value );
1887  }
1888 
1889  // check if we read all the data and if the error count is OK
1890  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1891 
1892  // set up the response object
1893  response = new AnyObject();
1894  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1895 
1896  return Status();
1897  }
1898 
1899  case kXR_fattrList:
1900  {
1901  Status status;
1902  std::vector<XAttr> resp;
1903 
1904  while( len > 0 )
1905  {
1906  std::string name;
1907  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1908  return status;
1909 
1910  kXR_int32 vlen = 0;
1911  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1912  return status;
1913  vlen = ntohl( vlen );
1914 
1915  std::string value;
1916  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1917  return status;
1918 
1919  resp.push_back( XAttr( name, value ) );
1920  }
1921 
1922  // set up the response object
1923  response = new AnyObject();
1924  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1925 
1926  return Status();
1927  }
1928 
1929  default:
1930  return Status( stError, errDataError );
1931  }
1932  }
1933 
1934  //----------------------------------------------------------------------------
1935  // Perform the changes to the original request needed by the redirect
1936  // procedure - allocate new streamid, append redirection data and such
1937  //----------------------------------------------------------------------------
1938  Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1939  {
1940  Log *log = DefaultEnv::GetLog();
1941 
1942  Status st;
1943  // Append any "xrd.*" parameters present in newCgi so that any authentication
1944  // requirements are properly enforced
1945  const URL::ParamsMap &newCgi = newUrl.GetParams();
1946  std::string xrdCgi = "";
1947  std::ostringstream ossXrd;
1948  for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1949  {
1950  if( it->first.compare( 0, 4, "xrd." ) )
1951  continue;
1952  ossXrd << it->first << '=' << it->second << '&';
1953  }
1954 
1955  xrdCgi = ossXrd.str();
1956  // Redirection URL containing also any original xrd.* opaque parameters
1957  XrdCl::URL authUrl;
1958 
1959  if (xrdCgi.empty())
1960  {
1961  authUrl = newUrl;
1962  }
1963  else
1964  {
1965  std::string surl = newUrl.GetURL();
1966  (surl.find('?') == std::string::npos) ? (surl += '?') :
1967  ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1968  surl += xrdCgi;
1969  if (!authUrl.FromString(surl))
1970  {
1971  std::string surlLog = surl;
1972  if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1973  surlLog = obfuscateAuth(surlLog);
1974  }
1975  log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1976  newUrl.GetHostId().c_str(), surl.c_str());
1977  return Status(stError, errInvalidRedirectURL);
1978  }
1979  }
1980 
1981  //--------------------------------------------------------------------------
1982  // Rewrite particular requests
1983  //--------------------------------------------------------------------------
1985  MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1987  return Status();
1988  }
1989 
1990  //----------------------------------------------------------------------------
1991  // Some requests need to be rewritten also after getting kXR_wait
1992  //----------------------------------------------------------------------------
1993  Status XRootDMsgHandler::RewriteRequestWait()
1994  {
1995  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1996 
1998 
1999  //------------------------------------------------------------------------
2000  // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
2001  // turned off after wait
2002  //------------------------------------------------------------------------
2003  switch( req->header.requestid )
2004  {
2005  case kXR_locate:
2006  {
2007  uint16_t refresh = kXR_refresh;
2008  req->locate.options &= (~refresh);
2009  break;
2010  }
2011 
2012  case kXR_open:
2013  {
2014  uint16_t refresh = kXR_refresh;
2015  req->locate.options &= (~refresh);
2016  break;
2017  }
2018  }
2019 
2020  XRootDTransport::SetDescription( pRequest );
2022  return Status();
2023  }
2024 
2025  //----------------------------------------------------------------------------
2026  // Recover error
2027  //----------------------------------------------------------------------------
2028  void XRootDMsgHandler::HandleError( XRootDStatus status )
2029  {
2030  //--------------------------------------------------------------------------
2031  // If there was no error then do nothing
2032  //--------------------------------------------------------------------------
2033  if( status.IsOK() )
2034  return;
2035 
2036  if( pSidMgr && pMsgInFly && (
2037  status.code == errOperationExpired ||
2038  status.code == errOperationInterrupted ) )
2039  {
2040  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2041  pSidMgr->TimeOutSID( req->header.streamid );
2042  }
2043 
2044  bool noreplicas = ( status.code == errErrorResponse &&
2045  status.errNo == kXR_noReplicas );
2046 
2047  if( !noreplicas ) pLastError = status;
2048 
2049  Log *log = DefaultEnv::GetLog();
2050  log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
2051  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2052  status.ToString().c_str() );
2053 
2054  //--------------------------------------------------------------------------
2055  // Check if it is a fatal TLS error that has been marked as potentially
2056  // recoverable, if yes check if we can downgrade from fatal to error.
2057  //--------------------------------------------------------------------------
2058  if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2059  {
2060  if( pSslErrCnt < MaxSslErrRetry )
2061  {
2062  status.status &= ~stFatal; // switch off fatal&error bits
2063  status.status |= stError; // switch on error bit
2064  }
2065  ++pSslErrCnt; // count number of consecutive SSL errors
2066  }
2067  else
2068  pSslErrCnt = 0;
2069 
2070  //--------------------------------------------------------------------------
2071  // We have got an error message, we can recover it at the load balancer if:
2072  // 1) we haven't got it from the load balancer
2073  // 2) we have a load balancer assigned
2074  // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2075  // kXR_NotFound
2076  // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2077  //--------------------------------------------------------------------------
2078  if( status.code == errErrorResponse )
2079  {
2080  if( RetriableErrorResponse( status ) )
2081  {
2082  UpdateTriedCGI(status.errNo);
2083  if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2084  SwitchOnRefreshFlag();
2085  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2086  return;
2087  }
2088  else
2089  {
2090  pStatus = status;
2091  HandleRspOrQueue();
2092  return;
2093  }
2094  }
2095 
2096  //--------------------------------------------------------------------------
2097  // Nothing can be done if:
2098  // 1) a user timeout has occurred
2099  // 2) has a non-zero session id
2100  // 3) if another error occurred and the validity of the message expired
2101  //--------------------------------------------------------------------------
2102  if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2103  status.code == errOperationInterrupted || time(0) >= pExpiration )
2104  {
2105  log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2106  pUrl.GetHostId().c_str(),
2107  pRequest->GetObfuscatedDescription().c_str() );
2108  pStatus = status;
2109  HandleRspOrQueue();
2110  return;
2111  }
2112 
2113  //--------------------------------------------------------------------------
2114  // At this point we're left with connection errors, we recover them
2115  // at a load balancer if we have one and if not on the current server
2116  // until we get a response, an unrecoverable error or a timeout
2117  //--------------------------------------------------------------------------
2118  if( pLoadBalancer.url.IsValid() &&
2119  pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2120  {
2121  UpdateTriedCGI( kXR_ServerError );
2122  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2123  return;
2124  }
2125  else
2126  {
2127  if( !status.IsFatal() && IsRetriable() )
2128  {
2129  log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2130  pUrl.GetHostId().c_str(),
2131  pRequest->GetObfuscatedDescription().c_str() );
2132 
2133  UpdateTriedCGI( kXR_ServerError );
2134  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2135  return;
2136  }
2137  pStatus = status;
2138  HandleRspOrQueue();
2139  return;
2140  }
2141  }
2142 
2143  //----------------------------------------------------------------------------
2144  // Retry the message at another server
2145  //----------------------------------------------------------------------------
2146  Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2147  {
2148  // prepare to possibly be requeued in the out-queue for a different channel,
2149  // so reset sendingstate.
2150  pSendingState = 0;
2151 
2152  pResponse.reset();
2153  Log *log = DefaultEnv::GetLog();
2154 
2155  //--------------------------------------------------------------------------
2156  // Set up a redirect entry
2157  //--------------------------------------------------------------------------
2158  if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2159  pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2160 
2161  if( pUrl.GetLocation() != url.GetLocation() )
2162  {
2163  pHosts->push_back( url );
2164 
2165  //------------------------------------------------------------------------
2166  // Assign a new stream id to the message
2167  //------------------------------------------------------------------------
2168 
2169  // first release the old stream id
2170  // (though it could be a redirect from a local
2171  // metalink file, in this case there's no SID)
2172  ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2173  if( pSidMgr )
2174  {
2175  pSidMgr->ReleaseSID( req->streamid );
2176  pSidMgr.reset();
2177  }
2178 
2179  // then get the new SIDManager
2180  // (again this could be a redirect to a local
2181  // file and in this case there is no SID)
2182  if( !url.IsLocalFile() )
2183  {
2184  pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2185  Status st = pSidMgr->AllocateSID( req->streamid );
2186  if( !st.IsOK() )
2187  {
2188  log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2189  pUrl.GetHostId().c_str(),
2190  pRequest->GetObfuscatedDescription().c_str() );
2191  return st;
2192  }
2193  }
2194 
2195  pUrl = url;
2196  }
2197 
2198  if( pUrl.IsMetalink() && pFollowMetalink )
2199  {
2200  log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2201  pUrl.GetHostId().c_str(), (void*)this,
2202  pRequest->GetObfuscatedDescription().c_str() );
2203 
2204  return pPostMaster->Redirect( pUrl, pRequest, this );
2205  }
2206  else if( pUrl.IsLocalFile() )
2207  {
2208  HandleLocalRedirect( &pUrl );
2209  return Status();
2210  }
2211  else
2212  {
2213  log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2214  pUrl.GetHostId().c_str(), (void*)this,
2215  pRequest->GetObfuscatedDescription().c_str() );
2216  return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2217  }
2218  }
2219 
2220  //----------------------------------------------------------------------------
2221  // Update the "tried=" part of the CGI of the current message
2222  //----------------------------------------------------------------------------
2223  void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2224  {
2225  URL::ParamsMap cgi;
2226  std::string tried;
2227 
2228  //--------------------------------------------------------------------------
2229  // In case a data server responded with a kXR_redirect and we fail at the
2230  // node where we were redirected to, the original data server should be
2231  // included in the tried CGI opaque info (instead of the current one).
2232  //--------------------------------------------------------------------------
2233  if( pEffectiveDataServerUrl )
2234  {
2235  tried = pEffectiveDataServerUrl->GetHostName();
2236  delete pEffectiveDataServerUrl;
2237  pEffectiveDataServerUrl = 0;
2238  }
2239  //--------------------------------------------------------------------------
2240  // Otherwise use the current URL.
2241  //--------------------------------------------------------------------------
2242  else
2243  tried = pUrl.GetHostName();
2244 
2245  // Report the reason for the failure to the next location
2246  //
2247  if (errNo)
2248  { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2249  else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2250  else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2251  else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2252  }
2253 
2254  //--------------------------------------------------------------------------
2255  // If our current load balancer is a metamanager and we failed either
2256  // at a diskserver or at an unidentified node we also exclude the last
2257  // known manager
2258  //--------------------------------------------------------------------------
2259  if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2260  {
2261  HostList::reverse_iterator it;
2262  for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2263  {
2264  if( it->loadBalancer )
2265  break;
2266 
2267  tried += "," + it->url.GetHostName();
2268 
2269  if( it->flags & kXR_isManager )
2270  break;
2271  }
2272  }
2273 
2274  cgi["tried"] = tried;
2276  MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2278  }
2279 
2280  //----------------------------------------------------------------------------
2281  // Switch on the refresh flag for some requests
2282  //----------------------------------------------------------------------------
2283  void XRootDMsgHandler::SwitchOnRefreshFlag()
2284  {
2286  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2287  switch( req->header.requestid )
2288  {
2289  case kXR_locate:
2290  {
2291  req->locate.options |= kXR_refresh;
2292  break;
2293  }
2294 
2295  case kXR_open:
2296  {
2297  req->locate.options |= kXR_refresh;
2298  break;
2299  }
2300  }
2301  XRootDTransport::SetDescription( pRequest );
2303  }
2304 
2305  //------------------------------------------------------------------------
2306  // If the current thread is a worker thread from our thread-pool
2307  // handle the response, otherwise submit a new task to the thread-pool
2308  //------------------------------------------------------------------------
2309  void XRootDMsgHandler::HandleRspOrQueue()
2310  {
2311  //--------------------------------------------------------------------------
2312  // Is it a final response?
2313  //--------------------------------------------------------------------------
2314  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2315  if( finalrsp )
2316  {
2317  // Do not do final processing of the response if we haven't had
2318  // confirmation the original request was sent (via OnStatusReady).
2319  // The final processing will be triggered when we get the confirm.
2320  const int sst = pSendingState.fetch_or( kFinalResp );
2321  if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
2322  return;
2323  }
2324 
2325  JobManager *jobMgr = pPostMaster->GetJobManager();
2326  if( jobMgr->IsWorker() )
2327  HandleResponse();
2328  else
2329  {
2330  Log *log = DefaultEnv::GetLog();
2331  log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2332  pUrl.GetHostId().c_str(), (void*)this,
2333  pRequest->GetObfuscatedDescription().c_str() );
2334  jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2335  }
2336  }
2337 
2338  //------------------------------------------------------------------------
2339  // Notify the FileStateHandler to retry Open() with new URL
2340  //------------------------------------------------------------------------
2341  void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2342  {
2343  Log *log = DefaultEnv::GetLog();
2344  log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2345  pUrl.GetHostId().c_str(), (void*)this,
2346  pRequest->GetObfuscatedDescription().c_str() );
2347 
2348  if( !pLFileHandler )
2349  {
2350  HandleError( XRootDStatus( stFatal, errNotSupported ) );
2351  return;
2352  }
2353 
2354  AnyObject *resp = 0;
2355  pLFileHandler->SetHostList( *pHosts );
2356  XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2357  if( !st.IsOK() )
2358  {
2359  HandleError( st );
2360  return;
2361  }
2362 
2363  pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2364  resp,
2365  pHosts.release() );
2366  delete this;
2367 
2368  return;
2369  }
2370 
2371  //------------------------------------------------------------------------
2372  // Check if it is OK to retry this request
2373  //------------------------------------------------------------------------
2374  bool XRootDMsgHandler::IsRetriable()
2375  {
2376  std::string value;
2377  DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2378  if( value == "true" ) return true;
2379 
2380  // check if it is a mutable open (open + truncate or open + create)
2381  ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2382  if( req->header.requestid == htons( kXR_open ) )
2383  {
2384  bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2385  ( req->open.options & htons( kXR_new ) );
2386 
2387  if( _mutable )
2388  {
2389  Log *log = DefaultEnv::GetLog();
2390  log->Debug( XRootDMsg,
2391  "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2392  pUrl.GetHostId().c_str(),
2393  pRequest->GetObfuscatedDescription().c_str() );
2394  // disallow retry if it is a mutable open
2395  return false;
2396  }
2397  }
2398 
2399  return true;
2400  }
2401 
2402  //------------------------------------------------------------------------
2403  // Check if for given request and Metalink redirector it is OK to omit
2404  // the kXR_wait and proceed straight to the next entry in the Metalink file
2405  //------------------------------------------------------------------------
2406  bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2407  {
2408  // we can omit kXR_wait only if we have a Metalink redirector
2409  if( !url.IsMetalink() )
2410  return false;
2411 
2412  // we can omit kXR_wait only for requests that can be redirected
2413  // (kXR_read is the only stateful request that can be redirected)
2414  ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2415  if( pStateful && req->header.requestid != kXR_read )
2416  return false;
2417 
2418  // we can only omit kXR_wait if the Metalink redirect has more
2419  // replicas
2420  RedirectorRegistry &registry = RedirectorRegistry::Instance();
2421  VirtualRedirector *redirector = registry.Get( url );
2422 
2423  // we need more than one server as the current one is not reflected
2424  // in tried CGI
2425  if( redirector->Count( request ) > 1 )
2426  return true;
2427 
2428  return false;
2429  }
2430 
2431  //------------------------------------------------------------------------
2432  // Checks if the given error returned by server is retriable.
2433  //------------------------------------------------------------------------
2434  bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2435  {
2436  // we can only retry error response if we have a valid load-balancer and
2437  // it is not our current URL
2438  if( !( pLoadBalancer.url.IsValid() &&
2439  pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2440  return false;
2441 
2442  // following errors are retriable at any load-balancer
2443  if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2444  status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2445  status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2446  return true;
2447 
2448  // check if the load-balancer is a meta-manager, if yes there are
2449  // more errors that can be recovered
2450  if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2451 
2452  // those errors are retriable for meta-managers
2453  if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2454  return true;
2455 
2456  // in case of not-authorized error there is an imposed upper limit
2457  // on how many times we can retry this error
2458  if( status.errNo == kXR_NotAuthorized )
2459  {
2460  int limit = DefaultNotAuthorizedRetryLimit;
2461  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2462  bool ret = pNotAuthorizedCounter < limit;
2463  ++pNotAuthorizedCounter;
2464  if( !ret )
2465  {
2466  Log *log = DefaultEnv::GetLog();
2467  log->Error( XRootDMsg,
2468  "[%s] Reached limit of NotAuthorized retries!",
2469  pUrl.GetHostId().c_str() );
2470  }
2471  return ret;
2472  }
2473 
2474  // check if the load-balancer is a virtual (metalink) redirector,
2475  // if yes there are even more errors that can be recovered
2476  if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2477 
2478  // those errors are retriable for virtual (metalink) redirectors
2479  if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2480  return true;
2481 
2482  // otherwise it is a non-retriable error
2483  return false;
2484  }
2485 
2486  //------------------------------------------------------------------------
2487  // Dump the redirect-trace-back into the log file
2488  //------------------------------------------------------------------------
2489  void XRootDMsgHandler::DumpRedirectTraceBack()
2490  {
2491  if( pRedirectTraceBack.empty() ) return;
2492 
2493  std::stringstream sstrm;
2494 
2495  sstrm << "Redirect trace-back:\n";
2496 
2497  int counter = 0;
2498 
2499  auto itr = pRedirectTraceBack.begin();
2500  sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2501 
2502  auto prev = itr;
2503  ++itr;
2504  ++counter;
2505 
2506  for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2507  sstrm << '\t' << counter << ". "
2508  << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2509 
2510  int authlimit = DefaultNotAuthorizedRetryLimit;
2511  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2512 
2513  bool warn = !pStatus.IsOK() &&
2514  ( pStatus.code == errNotFound ||
2515  pStatus.code == errRedirectLimit ||
2516  ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2517 
2518  Log *log = DefaultEnv::GetLog();
2519  if( warn )
2520  log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2521  else
2522  log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2523  }
2524 
2525  // Read data from buffer
2526  //------------------------------------------------------------------------
2527  template<typename T>
2528  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2529  {
2530  if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2531 
2532  memcpy(&result, buffer, sizeof(T));
2533 
2534  buffer += sizeof( T );
2535  buflen -= sizeof( T );
2536 
2537  return Status();
2538  }
2539 
2540  //------------------------------------------------------------------------
2541  // Read a string from buffer
2542  //------------------------------------------------------------------------
2543  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2544  {
2545  Status status;
2546  char c = 0;
2547 
2548  while( true )
2549  {
2550  if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2551  return status;
2552 
2553  if( c == 0 ) break;
2554  result += c;
2555  }
2556 
2557  return status;
2558  }
2559 
2560  //------------------------------------------------------------------------
2561  // Read a string from buffer
2562  //------------------------------------------------------------------------
2563  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2564  size_t size, std::string &result )
2565  {
2566  Status status;
2567 
2568  if( size > buflen ) return Status( stError, errDataError );
2569 
2570  result.append( buffer, size );
2571  buffer += size;
2572  buflen -= size;
2573 
2574  return status;
2575  }
2576 
2577 }
@ kXR_NotAuthorized
Definition: XProtocol.hh:1000
@ kXR_NotFound
Definition: XProtocol.hh:1001
@ kXR_FileLocked
Definition: XProtocol.hh:993
@ kXR_noReplicas
Definition: XProtocol.hh:1019
@ kXR_Unsupported
Definition: XProtocol.hh:1003
@ kXR_ServerError
Definition: XProtocol.hh:1002
@ kXR_Overloaded
Definition: XProtocol.hh:1014
@ kXR_ArgTooLong
Definition: XProtocol.hh:992
@ kXR_noserver
Definition: XProtocol.hh:1004
@ kXR_IOError
Definition: XProtocol.hh:997
@ kXR_FSError
Definition: XProtocol.hh:995
@ kXR_NoMemory
Definition: XProtocol.hh:998
#define kXR_isManager
Definition: XProtocol.hh:1156
union ServerResponse::@0 body
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
struct ClientFattrRequest fattr
Definition: XProtocol.hh:854
#define kXR_collapseRedir
Definition: XProtocol.hh:1167
ServerResponseStatus status
Definition: XProtocol.hh:1310
#define kXR_attrMeta
Definition: XProtocol.hh:1159
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 options
Definition: XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition: XProtocol.hh:852
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientOpenRequest open
Definition: XProtocol.hh:860
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
struct ClientRequestHdr header
Definition: XProtocol.hh:846
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
#define kXR_isServer
Definition: XProtocol.hh:1157
#define kXR_attrVirtRdr
Definition: XProtocol.hh:1162
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1261
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
@ kXR_vfs
Definition: XProtocol.hh:763
struct ClientStatRequest stat
Definition: XProtocol.hh:873
kXR_char options
Definition: XProtocol.hh:769
#define kXR_ecRedir
Definition: XProtocol.hh:1168
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
long long kXR_int64
Definition: XPtypes.hh:98
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
Definition: XrdClSocket.cc:461
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition: XrdClURL.cc:62
void SetPassword(const std::string &password)
Set the password.
Definition: XrdClURL.hh:161
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:344
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:126
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
Handle/Process/Forward XRootD messages.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
none object for initializing empty Optional
XrdSysError Log
Definition: XrdConfig.cc:113
@ kXR_PartialResult
Definition: XProtocol.hh:1251
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version