XRootD
XrdSys::IOEvents::Poller Class Referenceabstract

#include <XrdSysIOEvents.hh>

+ Inheritance diagram for XrdSys::IOEvents::Poller:
+ Collaboration diagram for XrdSys::IOEvents::Poller:

Classes

struct  PipeData
 

Public Types

enum  CreateOpts { optTOM }
 

Public Member Functions

 Poller (int cFD, int rFD)
 
virtual ~Poller ()
 Destructor. Stop() is effecively called when this object is deleted. More...
 
void Stop ()
 

Static Public Member Functions

static PollerCreate (int &eNum, const char **eTxt=0, int crOpts=0)
 

Protected Member Functions

virtual void Begin (XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
 
void CbkTMO ()
 
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 
 CPP_ATOMIC_TYPE (bool) wakePend
 
virtual void Exclude (Channel *cP, bool &isLocked, bool dover=1)=0
 
int GetFault (Channel *cP)
 
int GetPollEnt (Channel *cP)
 
int GetRequest ()
 
virtual bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
 
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
 
void LockChannel (Channel *cP)
 
virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
 
int Poll2Enum (short events)
 
int SendCmd (PipeData &cmd)
 
void SetPollEnt (Channel *cP, int ptEnt)
 
virtual void Shutdown ()=0
 
bool TmoAdd (Channel *cP, int tmoSet)
 
void TmoDel (Channel *cP)
 
int TmoGet ()
 
void UnLockChannel (Channel *cP)
 

Protected Attributes

ChannelattBase
 
bool chDead
 
int cmdFD
 
int pipeBlen
 
char * pipeBuff
 
struct pollfd pipePoll
 
pthread_t pollTid
 
PipeData reqBuff
 
int reqFD
 
ChanneltmoBase
 
unsigned char tmoMask
 

Static Protected Attributes

static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
 
static pid_t parentPID = getpid()
 

Friends

class BootStrap
 
class Channel
 

Detailed Description

Define a poller object interface. A poller fields and dispatches event callbacks. An actual instance of a poller object is obtained by using the Create() method. You cannot simply create an instance of this object using new or in-place declaration since it is abstract. Any number of these objects may created. Each creation spawns a polling thread.

Definition at line 371 of file XrdSysIOEvents.hh.

Member Enumeration Documentation

◆ CreateOpts

Create a specialized instance of a poller object, initialize it, and start the polling process. You must call Create() to obtain a specialized poller.

Parameters
eNumPlace where errno is placed upon failure.
eTxtPlace where a pointer to the description of the failing operation is to be set. If null, no description is returned.
crOptsPoller options (see static const optxxx): optTOM - Timeout resumption after a timeout event must be manually reenabled. By default, event timeouts are automatically renabled after successful callbacks.
Returns
!0 Poller successfully created and started. eNum contains zero. eTxt if not null contains a null string. The returned value is a pointer to the Poller object. 0 Poller could not be created. eNum contains the associated errno value. eTxt if not null contains the failing operation.
Enumerator
optTOM 

Definition at line 398 of file XrdSysIOEvents.hh.

Constructor & Destructor Documentation

◆ Poller()

XrdSys::IOEvents::Poller::Poller ( int  cFD,
int  rFD 
)

Constructor

Parameters
cFDThe file descriptor to send commands to the poll thread.
rFDThe file descriptor to recv commands in the poll thread.

Definition at line 571 of file XrdSysIOEvents.cc.

572 {
573 
574 // Now initialize local class members
575 //
576  attBase = 0;
577  tmoBase = 0;
578  cmdFD = cFD;
579  reqFD = rFD;
580  wakePend = false;
581  pipeBuff = 0;
582  pipeBlen = 0;
583  pipePoll.fd = rFD;
584  pipePoll.events = POLLIN | POLLRDNORM;
585  tmoMask = 255;
586 }

◆ ~Poller()

virtual XrdSys::IOEvents::Poller::~Poller ( )
inlinevirtual

Destructor. Stop() is effecively called when this object is deleted.

Definition at line 430 of file XrdSysIOEvents.hh.

430 {}

Member Function Documentation

◆ Begin()

virtual void XrdSys::IOEvents::Poller::Begin ( XrdSysSemaphore syncp,
int &  rc,
const char **  eTxt 
)
protectedpure virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implemented in XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, and XrdSys::IOEvents::PollE.

Referenced by XrdSys::IOEvents::BootStrap::Start().

+ Here is the caller graph for this function:

◆ CbkTMO()

void XrdSys::IOEvents::Poller::CbkTMO ( )
protected

Definition at line 614 of file XrdSysIOEvents.cc.

615 {
616  Channel *cP;
617 
618 // Process each element in the timeout queue, calling the callback function
619 // if the timeout has passed. As this method can be called with a lock on the
620 // channel mutex, we need to drop it prior to calling the callback.
621 //
622  toMutex.Lock();
623  while((cP = tmoBase) && cP->deadLine <= time(0))
624  {int dlType = cP->dlType;
625  toMutex.UnLock();
626  CbkXeq(cP, dlType, 0, 0);
627  toMutex.Lock();
628  }
629  toMutex.UnLock();
630 }
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)

◆ CbkXeq()

bool XrdSys::IOEvents::Poller::CbkXeq ( Channel cP,
int  events,
int  eNum,
const char *  eTxt 
)
protected

Definition at line 636 of file XrdSysIOEvents.cc.

638 {
639  XrdSysMutexHelper cbkMHelp(cP->chMutex);
640  char oldEvents;
641  bool cbok, retval, isRead, isWrite, isLocked = true;
642 
643 // Perform any required tracing
644 //
645  if (TRACING)
646  {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
647  (cP->chPoller == &pollInit ? "init" :
648  (cP->chPoller == &pollWait ? "wait" : "err")));
649  DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
650  <<" chev=" <<static_cast<int>(cP->chEvents)
651  <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
652  <<" callback " <<(cP->chCB ? "present" : "missing")
653  <<" poller=" <<cbtype);
654  }
655 
656 // Remove this from the timeout queue if there and reset the deadlines based
657 // on the event we are reflecting. This separates read and write deadlines
658 //
659  if (cP->inTOQ)
660  {TmoDel(cP);
661  cP->dlType |= (events & CallBack::ValidEvents) << 4;
662  isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
663  if (isRead) cP->rdDL = maxTime;
664  isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
665  if (isWrite) cP->wrDL = maxTime;
666  } else {
667  cP->dlType &= CallBack::ValidEvents;
668  isRead = isWrite = false;
669  }
670 
671 // Verify that there is a callback here and the channel is ready. If not,
672 // disable this channel for the events being refelcted unless the event is a
673 // fatal error. In this case we need to abandon the channel since error events
674 // may continue to be generated as we can't always disable them.
675 //
676  if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
677  {if (eNum)
678  {cP->chPoller = &pollErr1; cP->chFault = eNum;
679  cP->inPSet = 0;
680  return false;
681  }
682  oldEvents = cP->chEvents;
683  cP->chEvents = 0;
684  retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
685  TRACE_MOD(CbkXeq,cP->chFD,0);
686  if (!isLocked) cP->chMutex.Lock();
687  cP->chEvents = oldEvents;
688  return true;
689  }
690 
691 // Resolve the problem where we get an error event but the channel wants them
692 // presented as a read or write event. If neither is possible then defer the
693 // error until the channel is enabled again.
694 //
695  if (eNum)
696  {if (cP->chEvents & Channel::errorEvents)
697  {cP->chPoller = &pollErr1; cP->chFault = eNum;
698  cP->chStat = Channel::isCBMode;
699  chDead = false;
700  cbkMHelp.UnLock();
701  cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
702  if (chDead) return true;
703  cbkMHelp.Lock(&(cP->chMutex));
704  cP->inPSet = 0;
705  return false;
706  }
707  if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
708  else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
709  else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
710  return false;
711  }
712  }
713 
714 // Indicate that we are in callback mode then drop the channel lock and effect
715 // the callback. This allows the callback to freely manage locks.
716 //
717  cP->chStat = Channel::isCBMode;
718  chDead = false;
719  // Detach() may be called after unlocking the channel and would zero the
720  // callback pointer and argument. So keep a copy.
721  CallBack *cb = cP->chCB;
722  void *cba = cP->chCBA;
723  cbkMHelp.UnLock();
724  IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
725  cbok = cb->Event(cP,cba, events);
726  IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
727 
728 // If channel destroyed by the callback, bail really fast. Otherwise, regain
729 // the channel lock.
730 //
731  if (chDead) return true;
732  cbkMHelp.Lock(&(cP->chMutex));
733 
734 // If the channel is being destroyed; then another thread must have done so.
735 // Tell it the callback has finished and just return.
736 //
737  if (cP->chStat != Channel::isCBMode)
738  {if (cP->chStat == Channel::isDead)
739  {XrdSysSemaphore *theSem = (XrdSysSemaphore *)cP->chCBA;
740  // channel will be destroyed shortly after post, unlock mutex before
741  cbkMHelp.UnLock();
742  theSem->Post();
743  }
744  return true;
745  }
746  cP->chStat = Channel::isClear;
747 
748 // Handle enable or disable here. If we keep the channel enabled then reset
749 // the timeout if it hasn't been handled via a call from the callback.
750 //
751  if (!cbok) Detach(cP,isLocked,false);
752  else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
753  TmoAdd(cP, 0);
754 
755 // All done. While the mutex should not have been unlocked, we relock it if
756 // it has to keep the mutex helper from croaking.
757 //
758  if (!isLocked) cP->chMutex.Lock();
759  return true;
760 }
#define IF_TRACE(x, fd, y)
#define TRACING
#define DO_TRACE(x, fd, y)
#define REVENTS(x)
#define BOOLNAME(x)
#define TRACE_MOD(x, fd, y)
#define WEVENTS(x)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ WriteTimeOut
Write timeout.
@ ValidEvents
Mask to test for valid events.
@ errorEvents
Error event non-r/w specific.
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void TmoDel(Channel *cP)
bool TmoAdd(Channel *cP, int tmoSet)

References BOOLNAME, DO_TRACE, XrdSys::IOEvents::Channel::errorEvents, XrdSys::IOEvents::CallBack::Event(), XrdSys::IOEvents::CallBack::Fatal(), IF_TRACE, XrdSysMutex::Lock(), XrdSysMutexHelper::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollInit, XrdSys::IOEvents::pollWait, XrdSysSemaphore::Post(), XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REVENTS, TRACE_MOD, TRACING, XrdSysMutexHelper::UnLock(), XrdSys::IOEvents::CallBack::ValidEvents, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

+ Here is the call graph for this function:

◆ CPP_ATOMIC_TYPE()

XrdSys::IOEvents::Poller::CPP_ATOMIC_TYPE ( bool  )
protected

◆ Create()

XrdSys::IOEvents::Poller * XrdSys::IOEvents::Poller::Create ( int &  eNum,
const char **  eTxt = 0,
int  crOpts = 0 
)
static

Definition at line 766 of file XrdSysIOEvents.cc.

769 {
770  int fildes[2];
771  struct pollArg pArg;
772  pthread_t tid;
773 
774 // Create a pipe used to break the poll wait loop
775 //
776  if (XrdSysFD_Pipe(fildes))
777  {eNum = errno;
778  if (eTxt) *eTxt = "creating poll pipe";
779  return 0;
780  }
781 
782 // Create an actual implementation of a poller
783 //
784  if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
785  {close(fildes[0]);
786  close(fildes[1]);
787  return 0;
788  }
789 
790 // Now start a thread to handle this poller object
791 //
793  (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
794  {if (eTxt) *eTxt = "creating poller thread"; return 0;}
795 
796 // Now wait for the thread to finish initializing before we allow use
797 // Note that the bootstrap takes ownership of the semaphore and will delete it
798 // once the thread positing the semaphore actually ends. This is to avoid
799 // semaphore bugs present in certain (e.g. Linux) kernels.
800 //
801  pArg.pollSync->Wait();
802 
803 // Check if all went well
804 //
805  if (pArg.retCode)
806  {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
807  eNum = pArg.retCode;
808  delete pArg.pollP;
809  return 0;
810  }
811 
812 // Set creation options in the new poller
813 //
814  if (crOpts & optTOM)
815  pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);
816 
817 // All done
818 //
819  eNum = 0;
820  if (eTxt) *eTxt = "";
821  return pArg.pollP;
822 }
#define close(a)
Definition: XrdPosix.hh:48
#define XRDSYSTHREAD_BIND
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void * Start(void *parg)

References close, XrdSys::IOEvents::pollArg::pollP, XrdSys::IOEvents::pollArg::pollSync, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::pollArg::retCode, XrdSys::IOEvents::pollArg::retMsg, XrdSysThread::Run(), XrdSys::IOEvents::BootStrap::Start(), tmoMask, XrdSysSemaphore::Wait(), XrdSys::IOEvents::CallBack::WriteTimeOut, and XRDSYSTHREAD_BIND.

+ Here is the call graph for this function:

◆ Exclude()

virtual void XrdSys::IOEvents::Poller::Exclude ( Channel cP,
bool &  isLocked,
bool  dover = 1 
)
protectedpure virtual

Remove a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

◆ GetFault()

int XrdSys::IOEvents::Poller::GetFault ( Channel cP)
inlineprotected

Definition at line 437 of file XrdSysIOEvents.hh.

437 {return cP->chFault;}

Referenced by XrdSys::IOEvents::PollerErr1::Include(), and XrdSys::IOEvents::PollerErr1::Modify().

+ Here is the caller graph for this function:

◆ GetPollEnt()

int XrdSys::IOEvents::Poller::GetPollEnt ( Channel cP)
inlineprotected

Definition at line 438 of file XrdSysIOEvents.hh.

438 {return cP->pollEnt;}

◆ GetRequest()

int XrdSys::IOEvents::Poller::GetRequest ( )
protected

Definition at line 874 of file XrdSysIOEvents.cc.

875 {
876  ssize_t rlen;
877  int rc;
878 
879 // See if we are to resume a read or start a fresh one
880 //
881  if (!pipeBlen)
882  {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
883 
884 // Wait for the next request. Some OS's (like Linux) don't support non-blocking
885 // pipes. So, we must front the read with a poll.
886 //
887  do {rc = poll(&pipePoll, 1, 0);}
888  while(rc < 0 && (errno == EAGAIN || errno == EINTR));
889  if (rc < 1) return 0;
890 
891 // Now we can put up a read without a delay. Normally a full command will be
892 // present. Under some heavy conditions, this may not be the case.
893 //
894  do {rlen = read(reqFD, pipeBuff, pipeBlen);}
895  while(rlen < 0 && errno == EINTR);
896  if (rlen <= 0)
897  {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
898  return 0;
899  }
900 
901 // Check if all the data has arrived. If not all the data is present, defer
902 // this request until more data arrives.
903 //
904  if (!(pipeBlen -= rlen)) return 1;
905  pipeBuff += rlen;
906  return 0;
907 }
ssize_t read(int fildes, void *buf, size_t nbyte)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104

References read(), and XrdSysE2T().

+ Here is the call graph for this function:

◆ Include()

virtual bool XrdSys::IOEvents::Poller::Include ( Channel cP,
int &  eNum,
const char **  eTxt,
bool &  isLocked 
)
protectedpure virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

Referenced by Init().

+ Here is the caller graph for this function:

◆ Init()

bool XrdSys::IOEvents::Poller::Init ( Channel cP,
int &  eNum,
const char **  eTxt,
bool &  isLockd 
)
protected

Definition at line 913 of file XrdSysIOEvents.cc.

915 {
916 // The channel must be locked upon entry!
917 //
918  bool retval;
919 
920 
921 // If we are already in progress then simply update the shadow events and
922 // resuppress all current events.
923 //
924  if (cP->chPoller == &pollWait)
925  {cP->reMod = cP->chEvents;
926  cP->chEvents = 0;
927  IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
928  return true;
929  }
930 
931 // Trace this entry
932 //
933  IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
934 
935 // If no events are enabled at this point, just return
936 //
937  if (!(cP->chEvents)) return true;
938 
939 // Refuse to enable a channel without a callback function
940 //
941  if (!(cP->chCB))
942  {eNum = EDESTADDRREQ;
943  if (eTxt) *eTxt = "enabling without a callback";
944  return false;
945  }
946 
947 // So, now we can include the channel in the poll set. We will include it
948 // with no events enabled to prevent callbacks prior to completion here.
949 //
950  cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
951  retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
952  IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
953  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
954 
955 // Determine what future poller to use. If we can use the regular poller then
956 // set the correct event mask for the channel. Note that we could have lost
957 // control but the correct events will be reflected in the "reMod" member.
958 //
959  if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
960  else {cP->chPoller = cP->chPollXQ;
961  cP->inPSet = 1;
962  if (cP->reMod)
963  {cP->chEvents = cP->reMod;
964  retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
965  TRACE_MOD(Init,cP->chFD,int(cP->reMod));
966  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
967  } else {
968  TRACE_NOD(Init,cP->chFD,0);
969  }
970  }
971 
972 // All done
973 //
974  cP->reMod = 0;
975  return retval;
976 }
#define TRACE_LOK
#define TRACE_NOD(x, fd, y)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)

References BOOLNAME, IF_TRACE, Include(), XrdSysMutex::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollWait, TRACE_LOK, TRACE_MOD, and TRACE_NOD.

Referenced by XrdSys::IOEvents::PollerInit::Modify(), and XrdSys::IOEvents::PollerWait::Modify().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ LockChannel()

void XrdSys::IOEvents::Poller::LockChannel ( Channel cP)
inlineprotected

Definition at line 441 of file XrdSysIOEvents.hh.

441 {cP->chMutex.Lock();}

References XrdSysMutex::Lock().

+ Here is the call graph for this function:

◆ Modify()

virtual bool XrdSys::IOEvents::Poller::Modify ( Channel cP,
int &  eNum,
const char **  eTxt,
bool &  isLocked 
)
protectedpure virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

Referenced by CbkXeq(), and Init().

+ Here is the caller graph for this function:

◆ Poll2Enum()

int XrdSys::IOEvents::Poller::Poll2Enum ( short  events)
protected

Definition at line 982 of file XrdSysIOEvents.cc.

983 {
984  if (events & POLLERR) return EPIPE;
985 
986  if (events & POLLHUP) return ECONNRESET;
987 
988  if (events & POLLNVAL) return EBADF;
989 
990  return EOPNOTSUPP;
991 }

◆ SendCmd()

int XrdSys::IOEvents::Poller::SendCmd ( PipeData cmd)
protected

Definition at line 997 of file XrdSysIOEvents.cc.

998 {
999  int wlen;
1000 
1001 // Pipe writes are atomic so we don't need locks. Some commands require
1002 // confirmation. We handle that here based on the command. Note that pipes
1003 // gaurantee that all of the data will be written or we will block.
1004 //
1005  if (cmd.req >= PipeData::Post)
1006  {XrdSysSemaphore mySem(0);
1007  cmd.theSem = &mySem;
1008  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1009  while (wlen < 0 && errno == EINTR);
1010  if (wlen > 0) mySem.Wait();
1011  } else {
1012  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1013  while (wlen < 0 && errno == EINTR);
1014  }
1015 
1016 // All done
1017 //
1018  return (wlen >= 0 ? 0 : errno);
1019 }
ssize_t write(int fildes, const void *buf, size_t nbyte)

References XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::PipeData::theSem, XrdSysSemaphore::Wait(), and write().

+ Here is the call graph for this function:

◆ SetPollEnt()

void XrdSys::IOEvents::Poller::SetPollEnt ( Channel cP,
int  ptEnt 
)
protected

Definition at line 1025 of file XrdSysIOEvents.cc.

1026 {
1027  cP->pollEnt = pe;
1028 }

◆ Shutdown()

virtual void XrdSys::IOEvents::Poller::Shutdown ( )
protectedpure virtual

Shutdown the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and x-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

◆ Stop()

void XrdSys::IOEvents::Poller::Stop ( )

Stop a poller object. Active callbacks are completed. Pending callbacks are discarded. After which the poller event thread exits. Subsequently, each associated channel is disabled and removed from the poller object. If the channel is enabled for a StopEvent, the stop callback is invoked. However, any attempt to use the channel methods that require an active poller will return an error.

Since a stopped poller cannot be restarted; the only thing left is to delete it. This also applies to all the associated channels since they no longer have an active poller.

Definition at line 1034 of file XrdSysIOEvents.cc.

1035 {
1036  PipeData cmdbuff;
1037  CallBack *theCB;
1038  Channel *cP;
1039  void *cbArg;
1040  int doCB;
1041 
1042 // Initialize the pipdata structure
1043 //
1044  memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1045  cmdbuff.req = PipeData::Stop;
1046 
1047 // Lock all of this
1048 //
1049  adMutex.Lock();
1050 
1051 // If we are already shutdown then we are done
1052 //
1053  if (cmdFD == -1) {adMutex.UnLock(); return;}
1054 
1055 // First we must stop the poller thread in an orderly fashion.
1056 //
1057  adMutex.UnLock();
1058  SendCmd(cmdbuff);
1059  adMutex.Lock();
1060 
1061 // Close the pipe communication mechanism
1062 //
1063  close(cmdFD); cmdFD = -1;
1064  close(reqFD); reqFD = -1;
1065 
1066 // Run through cleaning up the channels. While there should not be any other
1067 // operations happening on this poller, we take the conservative approach.
1068 //
1069  while((cP = attBase))
1070  {REMOVE(attBase, attList, cP);
1071  adMutex.UnLock();
1072  cP->chMutex.Lock();
1073  doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1074  if (cP->inTOQ) TmoDel(cP);
1075  cP->Reset(&pollErr1, cP->chFD, EIDRM);
1076  cP->chPollXQ = &pollErr1;
1077  if (doCB)
1078  {cP->chStat = Channel::isClear;
1079  theCB = cP->chCB; cbArg = cP->chCBA;
1080  cP->chMutex.UnLock();
1081  theCB->Stop(cP, cbArg);
1082  } else cP->chMutex.UnLock();
1083  adMutex.Lock();
1084  }
1085 
1086 // Now invoke the poller specific shutdown
1087 //
1088  Shutdown();
1089  adMutex.UnLock();
1090 }
#define REMOVE(dlbase, dlvar, curitem)
@ stopEvent
Poller stop event.
int SendCmd(PipeData &cmd)
virtual void Shutdown()=0

References close, XrdSysMutex::Lock(), XrdSys::IOEvents::pollErr1, REMOVE, XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::CallBack::Stop(), XrdSys::IOEvents::Channel::stopEvent, and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::~PollE(), XrdSys::IOEvents::PollKQ::~PollKQ(), XrdSys::IOEvents::PollPoll::~PollPoll(), XrdSys::IOEvents::PollPort::~PollPort(), and XrdCl::PollerBuiltIn::Stop().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TmoAdd()

bool XrdSys::IOEvents::Poller::TmoAdd ( Channel cP,
int  tmoSet 
)
protected

Definition at line 1096 of file XrdSysIOEvents.cc.

1097 {
1098  XrdSysMutexHelper mHelper(toMutex);
1099  time_t tNow;
1100  Channel *ncP;
1101  bool setRTO, setWTO;
1102 
1103 // Do some tracing
1104 //
1105  IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1106  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1107 
1108 // Remove element from timeout queue if it is there
1109 //
1110  if (cP->inTOQ)
1111  {REMOVE(tmoBase, tmoList, cP);
1112  cP->inTOQ = 0;
1113  }
1114 
1115 // Determine which timeouts need to be reset
1116 //
1117  tmoSet|= cP->dlType >> 4;
1118  setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1120 
1121 // Reset the required deadlines
1122 //
1123  tNow = time(0);
1124  if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1125  cP->rdDL = cP->chRTO + tNow;
1126  if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1127  cP->wrDL = cP->chWTO + tNow;
1128 
1129 // Calculate the closest enabled deadline
1130 //
1131  if (cP->rdDL < cP->wrDL)
1132  {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1133  } else {
1134  cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1135  if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1136  }
1137  IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1138  <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1139 
1140 // If no timeout really applies, we are done
1141 //
1142  if (cP->deadLine == maxTime) return false;
1143 
1144 // Add the channel to the timeout queue in correct deadline position.
1145 //
1146  if ((ncP = tmoBase))
1147  {do {if (cP->deadLine < ncP->deadLine) break;
1148  ncP = ncP->tmoList.next;
1149  } while(ncP != tmoBase);
1150  INSERT(tmoList, ncP, cP);
1151  if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1152  } else tmoBase = cP;
1153  cP->inTOQ = 1;
1154 
1155 // Indicate to the caller whether or not a wakeup is required
1156 //
1157  return (tmoBase == cP);
1158 }
#define STATUSOF(x)
#define INSERT(dlvar, curitem, newitem)
@ dec
Definition: XrdSysTrace.hh:42
@ hex
Definition: XrdSysTrace.hh:42

References BOOLNAME, Xrd::dec, Xrd::hex, IF_TRACE, INSERT, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REMOVE, REVENTS, STATUSOF, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

◆ TmoDel()

void XrdSys::IOEvents::Poller::TmoDel ( Channel cP)
protected

Definition at line 1164 of file XrdSysIOEvents.cc.

1165 {
1166 
1167 // Do some tracing
1168 //
1169  IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1170  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1171 
1172 // Get the timeout queue lock and remove the channel from the queue
1173 //
1174  toMutex.Lock();
1175  REMOVE(tmoBase, tmoList, cP);
1176  cP->inTOQ = 0;
1177  toMutex.UnLock();
1178 }

References BOOLNAME, Xrd::dec, Xrd::hex, IF_TRACE, REMOVE, and STATUSOF.

◆ TmoGet()

int XrdSys::IOEvents::Poller::TmoGet ( )
protected

Definition at line 1184 of file XrdSysIOEvents.cc.

1185 {
1186  int wtval;
1187 
1188 // Lock the timeout queue
1189 //
1190  toMutex.Lock();
1191 
1192 // Calculate wait time. If the deadline passed, invoke the timeout callback.
1193 // we will need to drop the timeout lock as we don't have the channel lock.
1194 //
1195  do {if (!tmoBase) {wtval = -1; break;}
1196  wtval = (tmoBase->deadLine - time(0)) * 1000;
1197  if (wtval > 0) break;
1198  toMutex.UnLock();
1199  CbkTMO();
1200  toMutex.Lock();
1201  } while(1);
1202 
1203 // Return the value
1204 //
1205  CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1206  toMutex.UnLock();
1207  return wtval;
1208 }
#define CPP_ATOMIC_STORE(x, val, order)

References CPP_ATOMIC_STORE.

Referenced by XrdSys::IOEvents::PollPort::BegTO().

+ Here is the caller graph for this function:

◆ UnLockChannel()

void XrdSys::IOEvents::Poller::UnLockChannel ( Channel cP)
inlineprotected

Definition at line 448 of file XrdSysIOEvents.hh.

448 {cP->chMutex.UnLock();}

References XrdSysMutex::UnLock().

+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ BootStrap

friend class BootStrap
friend

Definition at line 373 of file XrdSysIOEvents.hh.

◆ Channel

friend class Channel
friend

Definition at line 374 of file XrdSysIOEvents.hh.

Member Data Documentation

◆ attBase

Channel* XrdSys::IOEvents::Poller::attBase
protected

Definition at line 488 of file XrdSysIOEvents.hh.

◆ chDead

bool XrdSys::IOEvents::Poller::chDead
protected

Definition at line 511 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::Channel::Delete().

◆ cmdFD

int XrdSys::IOEvents::Poller::cmdFD
protected

Definition at line 494 of file XrdSysIOEvents.hh.

◆ maxTime

time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
staticprotected

Definition at line 513 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::Channel::Enable().

◆ parentPID

pid_t XrdSys::IOEvents::Poller::parentPID = getpid()
staticprotected

Definition at line 515 of file XrdSysIOEvents.hh.

◆ pipeBlen

int XrdSys::IOEvents::Poller::pipeBlen
protected

Definition at line 508 of file XrdSysIOEvents.hh.

◆ pipeBuff

char* XrdSys::IOEvents::Poller::pipeBuff
protected

Definition at line 507 of file XrdSysIOEvents.hh.

◆ pipePoll

struct pollfd XrdSys::IOEvents::Poller::pipePoll
protected

Definition at line 491 of file XrdSysIOEvents.hh.

◆ pollTid

pthread_t XrdSys::IOEvents::Poller::pollTid
protected

◆ reqBuff

PipeData XrdSys::IOEvents::Poller::reqBuff
protected

Definition at line 506 of file XrdSysIOEvents.hh.

◆ reqFD

int XrdSys::IOEvents::Poller::reqFD
protected

Definition at line 495 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::PollKQ::PollKQ().

◆ tmoBase

Channel* XrdSys::IOEvents::Poller::tmoBase
protected

Definition at line 489 of file XrdSysIOEvents.hh.

◆ tmoMask

unsigned char XrdSys::IOEvents::Poller::tmoMask
protected

Definition at line 509 of file XrdSysIOEvents.hh.

Referenced by Create().


The documentation for this class was generated from the following files: