XRootD
XrdThrottleManager.cc

#include "XrdThrottleManager.hh"

#include "XrdOuc/XrdOucEnv.hh"
#include "XrdSec/XrdSecEntity.hh"
#include "XrdSys/XrdSysAtomics.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdXrootd/XrdXrootdGStream.hh"

#define XRD_TRACE m_trace->
#include "XrdThrottle/XrdThrottleTrace.hh"

#include <algorithm>
#include <array>
#include <cmath>
#include <random>
#include <sstream>

#if defined(__linux__)

#include <sched.h>

// Map the calling thread's current CPU onto one of the timer lists so that
// concurrent timer updates mostly touch different locks.
unsigned XrdThrottleManager::GetTimerListHash() {
   int cpu = sched_getcpu();
   if (cpu < 0) {
      return 0;
   }
   return cpu % m_timer_list_size;
}

#else

unsigned XrdThrottleManager::GetTimerListHash() {
   return 0;
}

#endif

const char *
XrdThrottleManager::TraceID = "ThrottleManager";

XrdThrottleManager::XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP) :
   m_trace(tP),
   m_log(lP),
   m_interval_length_seconds(1.0),
   m_bytes_per_second(-1),
   m_ops_per_second(-1),
   m_concurrency_limit(-1),
   m_last_round_allocation(100*1024),
   m_loadshed_host(""),
   m_loadshed_port(0),
   m_loadshed_frequency(0)
{
}

void
XrdThrottleManager::FromConfig(XrdThrottle::Configuration &config)
{
   auto max_open = config.GetMaxOpen();
   if (max_open != -1) SetMaxOpen(max_open);
   auto max_conn = config.GetMaxConn();
   if (max_conn != -1) SetMaxConns(max_conn);
   auto max_wait = config.GetMaxWait();
   if (max_wait != -1) SetMaxWait(max_wait);

   SetThrottles(config.GetThrottleDataRate(),
                config.GetThrottleIOPSRate(),
                config.GetThrottleConcurrency(),
                static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);

   m_trace->What = config.GetTraceLevels();

   auto loadshed_host = config.GetLoadshedHost();
   auto loadshed_port = config.GetLoadshedPort();
   auto loadshed_freq = config.GetLoadshedFreq();
   if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
   {
      // Loadshed target specified, so set it.
      SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
   }
}
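
/*
 * Illustrative sketch (not part of this file): the same limits can be set
 * programmatically instead of via FromConfig. The numeric values below are
 * invented for the example.
 *
 *    XrdThrottleManager mgr(&log, &trace);
 *    mgr.SetThrottles(100e6, 1000, 50, 1.0); // bytes/s, IOPS, concurrency, recompute interval (s)
 *    mgr.SetMaxOpen(1000);                   // per-entity open-file cap
 *    mgr.Init();                             // starts the recompute thread
 */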

void
XrdThrottleManager::Init()
{
   TRACE(DEBUG, "Initializing the throttle manager.");
   // Initialize all our shares to zero.
   m_primary_bytes_shares.resize(m_max_users);
   m_secondary_bytes_shares.resize(m_max_users);
   m_primary_ops_shares.resize(m_max_users);
   m_secondary_ops_shares.resize(m_max_users);
   for (auto &waiter : m_waiter_info) {
      waiter.m_manager = this;
   }

   // Allocate each user 100KB and 10 ops to bootstrap.
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_secondary_bytes_shares[i] = 0;
      m_primary_ops_shares[i] = 10;
      m_secondary_ops_shares[i] = 0;
   }

   int rc;
   pthread_t tid;
   if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
      m_log->Emsg("ThrottleManager", rc, "create throttle thread");
}

std::tuple<std::string, uint16_t>
XrdThrottleManager::GetUserInfo(const XrdSecEntity *client)
{
   // The client may be null; if so, return "nobody".
   if (!client) {
      return std::make_tuple("nobody", GetUid("nobody"));
   }

   // Try various potential "names" associated with the request, from the most
   // specific to the most generic.
   std::string user;

   if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
      if (client->vorg) user = std::string(client->vorg) + ":" + user;
   } else if (client->eaAPI) {
      std::string request_name;
      if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
   }
   if (user.empty()) {user = client->name ? client->name : "nobody";}
   uint16_t uid = GetUid(user.c_str());
   return std::make_tuple(user, uid);
}
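
/*
 * Illustrative mapping (values invented for the example):
 *
 *    token.subject = "alice", vorg = "atlas"   ->  ("atlas:alice", GetUid("atlas:alice"))
 *    no token, request.name = "svc-transfer"   ->  ("svc-transfer", ...)
 *    neither, client->name = "bob"             ->  ("bob", ...)
 *    null client                               ->  ("nobody", ...)
 */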

/*
 * Take as many shares as possible to fulfill the request; update
 * the request with the current remaining value, or zero if satisfied.
 */
inline void
XrdThrottleManager::GetShares(int &shares, int &request)
{
   int remaining;
   AtomicFSub(remaining, shares, request);
   if (remaining > 0)
   {
      request -= (remaining < request) ? remaining : request;
   }
}
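
/*
 * Worked example (illustrative only): AtomicFSub stores the prior value of
 * `shares` in `remaining` and subtracts `request` from `shares`.
 *
 *    int shares = 200, request = 300;
 *    GetShares(shares, request);  // shares -> -100; remaining was 200
 *                                 // request -> 300 - 200 = 100 still owed
 *
 * The caller then falls through to the secondary shares and StealShares.
 */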

/*
 * Iterate through all of the secondary shares, attempting
 * to steal enough to fulfill the request.
 */
void
XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
{
   if (!reqsize && !reqops) return;
   TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
   TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");

   // Walk the ring of user slots starting just past `uid`, stopping when we
   // wrap back around to `uid` itself.
   for (int i=uid+1; i % m_max_users != uid; i++)
   {
      if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
      if (reqops) GetShares(m_secondary_ops_shares[i % m_max_users], reqops);
   }

   TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
   TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
}
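
/*
 * Example of the ring traversal above (illustrative): with m_max_users == 4
 * and uid == 2, the loop visits i = 3, 4, 5, i.e. slots 3, 0, 1, touching
 * every other user's secondary shares exactly once before wrapping.
 */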

/*
 * Increment the number of files held open by a given entity. Returns false
 * if the user is at the maximum; in this case, the internal counter is not
 * incremented.
 */
bool
XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
{
   if (m_max_open == 0 && m_max_conns == 0) return true;

   const std::lock_guard<std::mutex> lock(m_file_mutex);
   auto iter = m_file_counters.find(entity);
   unsigned long cur_open_files = 0, cur_open_conns = 0;
   if (m_max_open) {
      if (iter == m_file_counters.end()) {
         // Keep `iter` valid for the potential rollback below.
         iter = m_file_counters.emplace(entity, 1).first;
         TRACE(FILES, "User " << entity << " has opened their first file");
         cur_open_files = 1;
      } else if (iter->second < m_max_open) {
         iter->second++;
         cur_open_files = iter->second;
      } else {
         std::stringstream ss;
         ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
         TRACE(FILES, ss.str());
         error_message = ss.str();
         return false;
      }
   }

   if (m_max_conns) {
      auto pid = XrdSysThread::Num();
      auto conn_iter = m_active_conns.find(entity);
      auto conn_count_iter = m_conn_counters.find(entity);
      if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
          (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
      {
         // Note: we are rolling back the increment in open files.
         if (m_max_open) iter->second--;
         std::stringstream ss;
         ss << "User " << entity << " has hit the limit of " << m_max_conns <<
               " open connections";
         TRACE(CONNS, ss.str());
         error_message = ss.str();
         return false;
      }
      if (conn_iter == m_active_conns.end()) {
         std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
            new std::unordered_map<pid_t, unsigned long>());
         (*conn_map)[pid] = 1;
         m_active_conns[entity] = std::move(conn_map);
         if (conn_count_iter == m_conn_counters.end()) {
            m_conn_counters[entity] = 1;
            cur_open_conns = 1;
         } else {
            m_conn_counters[entity]++;
            cur_open_conns = m_conn_counters[entity];
         }
      } else {
         auto pid_iter = conn_iter->second->find(pid);
         if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
            (*(conn_iter->second))[pid] = 1;
            conn_count_iter->second++;
            cur_open_conns = conn_count_iter->second;
         } else {
            (*(conn_iter->second))[pid]++;
            cur_open_conns = conn_count_iter->second;
         }
      }
      TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
   }
   if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
   return true;
}
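
/*
 * Usage sketch (illustrative): callers pair OpenFile/CloseFile around the
 * lifetime of a file handle.
 *
 *    std::string err;
 *    auto [user, uid] = mgr.GetUserInfo(client);
 *    if (!mgr.OpenFile(user, err))
 *       return SendError(err);   // hypothetical error path
 *    ...
 *    mgr.CloseFile(user);
 */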


/*
 * Decrement the number of files held open by a given entity.
 *
 * Returns false if the value would have fallen below zero or
 * if the entity isn't tracked.
 */
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
   if (m_max_open == 0 && m_max_conns == 0) return true;

   bool result = true;
   const std::lock_guard<std::mutex> lock(m_file_mutex);
   if (m_max_open) {
      auto iter = m_file_counters.find(entity);
      if (iter == m_file_counters.end()) {
         TRACE(FILES, "WARNING: User " << entity << " closed a file but the throttle plugin never saw an open file");
         result = false;
      } else if (iter->second == 0) {
         TRACE(FILES, "WARNING: User " << entity << " closed a file but the throttle plugin thinks all files were already closed");
         result = false;
      } else {
         iter->second--;
      }
      if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
                        " remain open");
   }

   if (m_max_conns) {
      auto pid = XrdSysThread::Num();
      auto conn_iter = m_active_conns.find(entity);
      auto conn_count_iter = m_conn_counters.find(entity);
      if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
               " tracking");
         return false;
      }
      auto pid_iter = conn_iter->second->find(pid);
      if (pid_iter == conn_iter->second->end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
               " tracking");
         return false;
      }
      if (pid_iter->second == 0) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection the throttle"
               " plugin thinks was idle");
      } else {
         pid_iter->second--;
      }
      if (conn_count_iter == m_conn_counters.end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
               " observed an open file");
      } else if (pid_iter->second == 0) {
         if (conn_count_iter->second == 0) {
            TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the"
                  " throttle plugin already thought all connections were idle");
         } else {
            conn_count_iter->second--;
            TRACE(CONNS, "User " << entity << " had a connection on thread " << pid << " go idle; "
                  << conn_count_iter->second << " active connections remain");
         }
      }
   }

   return result;
}


/*
 * Apply the throttle. If there are no limits set, this returns immediately.
 * Otherwise, it applies the limits as best as possible, stalling the thread
 * if necessary.
 */
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
   if (m_bytes_per_second < 0)
      reqsize = 0;
   if (m_ops_per_second < 0)
      reqops = 0;
   while (reqsize || reqops)
   {
      // Subtract the request from the user's shares.
      AtomicBeg(m_compute_var);
      GetShares(m_primary_bytes_shares[uid], reqsize);
      if (reqsize)
      {
         TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
         GetShares(m_secondary_bytes_shares[uid], reqsize);
         TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
      }
      else
      {
         TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
      }
      GetShares(m_primary_ops_shares[uid], reqops);
      if (reqops)
      {
         GetShares(m_secondary_ops_shares[uid], reqops);
      }
      StealShares(uid, reqsize, reqops);
      AtomicEnd(m_compute_var);

      if (reqsize || reqops)
      {
         if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
         if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
         m_compute_var.Wait();
         m_loadshed_limit_hit++;
      }
   }
}
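
/*
 * Usage sketch (illustrative): a read path would charge the transfer size
 * and one I/O op against the requesting user before issuing the read.
 *
 *    auto [user, uid] = mgr.GetUserInfo(client);
 *    mgr.Apply(rdlen, 1, uid);   // may block until shares are available
 *    DoRead(...);                // hypothetical read
 */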

void
XrdThrottleManager::UserIOAccounting()
{
   std::chrono::steady_clock::duration::rep total_active_time = 0;
   for (size_t idx = 0; idx < m_timer_list.size(); idx++) {
      auto &timerList = m_timer_list[idx];
      std::unique_lock<std::mutex> lock(timerList.m_mutex);
      auto timer = timerList.m_first;
      while (timer) {
         auto next = timer->m_next;
         auto uid = timer->m_owner;
         auto &waiter = m_waiter_info[uid];
         auto recent_duration = timer->Reset();
         waiter.m_io_time += recent_duration.count();
         total_active_time += recent_duration.count();
         timer = next;
      }
   }
   m_io_active_time += total_active_time;
}

void
XrdThrottleManager::ComputeWaiterOrder()
{
   // Update the IO time for long-running I/O operations. This prevents,
   // for example, a 2-minute I/O operation from causing a spike in
   // concurrency because it's otherwise only reported at the end.
   UserIOAccounting();

   auto now = std::chrono::steady_clock::now();
   auto elapsed = now - m_last_waiter_recompute_time;
   m_last_waiter_recompute_time = now;
   std::chrono::duration<double> elapsed_secs = elapsed;
   // Alpha is the decay factor for the exponential moving average. One window is 10 seconds,
   // so every 10 seconds we decay the prior average by 1/e (that is, the weight is 64% of the
   // total). This means the contribution of I/O load from a minute ago is 0.2% of the total.
   //
   // The moving average will be used to determine how close the user is to their "fair share"
   // of the concurrency limit among the users that are waiting.
   auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);

   std::vector<double> share;
   share.resize(m_max_users);
   size_t users_with_waiters = 0;
   // For each user, compute their current concurrency and determine how many
   // waiting users there are in total.
   for (int i = 0; i < m_max_users; i++)
   {
      auto &waiter = m_waiter_info[i];
      auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
      std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
      std::chrono::duration<double> io_duration_secs = io_duration;
      auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
      float new_concurrency = waiter.m_concurrency;

      new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
      waiter.m_concurrency = new_concurrency;
      if (new_concurrency > 0) {
         TRACE(DEBUG, "User " << i << " has concurrency of " << new_concurrency);
      }
      unsigned waiting;
      {
         std::lock_guard<std::mutex> lock(waiter.m_mutex);
         waiting = waiter.m_waiting;
      }
      if (waiting > 0)
      {
         share[i] = new_concurrency;
         TRACE(DEBUG, "User " << i << " has concurrency of " << share[i] << " and is waiting for " << waiting);
         // Handle the division-by-zero case: if we have no history of usage whatsoever,
         // pretend there is at least some minimal load.
         if (share[i] == 0) {
            share[i] = 0.1;
         }
         users_with_waiters++;
      }
      else
      {
         share[i] = 0;
      }
   }
   auto fair_share = static_cast<double>(m_concurrency_limit) / static_cast<double>(users_with_waiters);
   std::vector<uint16_t> waiter_order;
   waiter_order.resize(m_max_users);

   // Calculate the share for each user. We assume the user should get a share inversely
   // proportional to how far above or below the fair share they are. So, a user with
   // concurrency of 20 when the fair share is 10 will get 0.5 shares; a user with
   // concurrency of 5 when the fair share is 10 will get 2.0 shares.
   double shares_sum = 0;
   for (int idx = 0; idx < m_max_users; idx++)
   {
      if (share[idx]) {
         shares_sum += fair_share / share[idx];
      }
   }

   // We must quantize the overall shares into an array of 1024 elements. We do this by
   // scaling up (or down) based on the total number of shares computed above. Note this
   // quantization can lead to an over-provisioned user being assigned zero shares; thus,
   // we scale based on (1024 - #users) so we can give one extra share to each user.
   auto scale_factor = (static_cast<double>(m_max_users) - static_cast<double>(users_with_waiters)) / shares_sum;
   size_t offset = 0;
   for (int uid = 0; uid < m_max_users; uid++) {
      if (share[uid] > 0) {
         auto shares = static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
         TRACE(DEBUG, "User " << uid << " has " << shares << " shares");
         for (unsigned idx = 0; idx < shares; idx++)
         {
            waiter_order[offset % m_max_users] = uid;
            offset++;
         }
      }
   }
   if (offset < static_cast<size_t>(m_max_users)) {
      for (size_t idx = offset; idx < static_cast<size_t>(m_max_users); idx++) {
         // The -1 wraps to 0xFFFF in the uint16_t array; NotifyOne reads these
         // entries back as a negative int16_t and skips them.
         waiter_order[idx] = -1;
      }
   }
   // Shuffle the order to randomize the wakeup order.
   std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());

   // Copy the order to the inactive array. We do not shuffle in-place because RAtomics are
   // not move-constructible, which is a requirement for std::shuffle.
   auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
   std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());

   // Set the array we just modified to be the active one. Since this is a relaxed write, it could take
   // some time for other CPUs to see the change; that's OK as this is all stochastic anyway.
   m_wake_order_active = (m_wake_order_active + 1) % 2;

   m_waiter_offset = 0;

   // If we find ourselves below the concurrency limit because we woke up too few operations in the last
   // interval, try waking up enough operations to fill the gap. If we race with new incoming operations,
   // the threads will just go back to sleep.
   if (users_with_waiters) {
      m_waiting_users = users_with_waiters;
      auto io_active = m_io_active.load(std::memory_order_acquire);
      for (size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
         NotifyOne();
      }
   }
}
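
/*
 * Worked example of the quantization above (numbers invented): with
 * m_concurrency_limit = 100 and two waiting users whose EMA concurrency is
 * 20 and 5, fair_share = 50, so the raw shares are 50/20 = 2.5 and
 * 50/5 = 10, giving shares_sum = 12.5. With m_max_users = 1024,
 * scale_factor = (1024 - 2) / 12.5 ~= 81.8; user A fills 81.8 * 2.5 + 1 ~= 205
 * slots of the wake-order array and user B 81.8 * 10 + 1 ~= 818 slots, so B
 * is woken roughly four times as often as A.
 */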

void *
XrdThrottleManager::RecomputeBootstrap(void *instance)
{
   XrdThrottleManager *manager = static_cast<XrdThrottleManager*>(instance);
   manager->Recompute();
   return NULL;
}

void
XrdThrottleManager::Recompute()
{
   while (1)
   {
      // The connection counter can accumulate a number of known-idle connections.
      // We only need to keep long-term memory of the active ones. Take this chance
      // to garbage collect old connection counters.
      if (m_max_open || m_max_conns) {
         const std::lock_guard<std::mutex> lock(m_file_mutex);
         for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
         {
            auto &conn_count = *iter;
            if (!conn_count.second) {
               iter = m_active_conns.erase(iter);
               continue;
            }
            for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
               if (iter2->second == 0) {
                  iter2 = conn_count.second->erase(iter2);
               } else {
                  iter2++;
               }
            }
            if (!conn_count.second->size()) {
               iter = m_active_conns.erase(iter);
            } else {
               iter++;
            }
         }
         for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
            if (!iter->second) {
               iter = m_conn_counters.erase(iter);
            } else {
               iter++;
            }
         }
         for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
            if (!iter->second) {
               iter = m_file_counters.erase(iter);
            } else {
               iter++;
            }
         }
      }

      TRACE(DEBUG, "Recomputing fairshares for throttle.");
      RecomputeInternal();
      ComputeWaiterOrder();
      TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
      XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
   }
}

/*
 * The heart of the manager approach.
 *
 * This routine periodically recomputes the shares of each current user.
 * Each user has a "primary" and a "secondary" share. At the end of each
 * time interval, the remaining primary share is moved to secondary.
 * A user can utilize both shares; if both are gone, they must block until
 * the next recompute interval.
 *
 * The secondary share can be "stolen" by any other user; so, if a user
 * is idle or under-utilizing, their share can be used by someone else.
 * However, they can never be completely starved, as no one can steal
 * primary share.
 *
 * In this way, we violate the throttle for an interval, but never starve.
 */
void
XrdThrottleManager::RecomputeInternal()
{
   // Compute the total shares for this interval.
   float intervals_per_second = 1.0/m_interval_length_seconds;
   float total_bytes_shares = m_bytes_per_second / intervals_per_second;
   float total_ops_shares = m_ops_per_second / intervals_per_second;

   // Compute the number of active users; a user is active if they used
   // any primary share during the last interval.
   AtomicBeg(m_compute_var);
   float active_users = 0;
   long bytes_used = 0;
   for (int i=0; i<m_max_users; i++)
   {
      int primary = AtomicFAZ(m_primary_bytes_shares[i]);
      if (primary != m_last_round_allocation)
      {
         active_users++;
         if (primary >= 0)
            m_secondary_bytes_shares[i] = primary;
         // Remember the unused byte shares before reusing `primary` for the
         // ops counter, so the accounting below reflects bytes, not ops.
         int bytes_remaining = primary;
         primary = AtomicFAZ(m_primary_ops_shares[i]);
         if (primary >= 0)
            m_secondary_ops_shares[i] = primary;
         bytes_used += (bytes_remaining < 0) ? m_last_round_allocation : (m_last_round_allocation - bytes_remaining);
      }
   }

   if (active_users == 0)
   {
      active_users++;
   }

   // Note we allocate the same number of shares to *all* users, not
   // just the active ones. If a new user becomes active in the next
   // interval, we'll go over our bandwidth budget just a bit.
   m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
   int ops_shares = static_cast<int>(total_ops_shares / active_users);
   TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << "; last round used " << bytes_used << ".");
   TRACE(IOPS, "Round ops allocation " << ops_shares);
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_primary_ops_shares[i] = ops_shares;
   }

   AtomicEnd(m_compute_var);

   // Reset the loadshed limit counter.
   int limit_hit = m_loadshed_limit_hit.exchange(0);
   TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");

   // Update the IO counters.
   m_compute_var.Lock();
   m_stable_io_active = m_io_active.load(std::memory_order_acquire);
   auto io_active = m_stable_io_active;
   m_stable_io_total = m_io_total;
   auto io_total = m_stable_io_total;
   auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
   m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
   m_compute_var.UnLock();

   auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
   TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO active time is " << io_wait_ms << "ms.");
   if (m_gstream)
   {
      char buf[128];
      auto len = snprintf(buf, 128,
                          R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
                          static_cast<double>(io_wait_ms) / 1000.0, io_active, static_cast<long long unsigned>(io_total));
      auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
      if (!suc)
      {
         TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
      }
   }
   m_compute_var.Broadcast();
}
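
/*
 * Worked example (numbers invented): with a 100 MB/s byte throttle, a 1 s
 * recompute interval, and 4 active users, each user slot is allocated 25 MB
 * of primary share per round. A user who reads only 10 MB has the remaining
 * 15 MB moved to their secondary share at the next recompute, where busier
 * users may steal it; their primary allocation is replenished in full, so
 * they are never starved.
 */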

/*
 * Do a simple hash across the username.
 */
uint16_t
XrdThrottleManager::GetUid(const std::string &username)
{
   std::hash<std::string> hash_fn;
   auto hash = hash_fn(username);
   auto uid = static_cast<uint16_t>(hash % m_max_users);
   TRACE(DEBUG, "Mapping user " << username << " to UID " << uid);
   return uid;
}
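
/*
 * Note (illustrative): the mapping is stable within a process, e.g.
 * GetUid("atlas:alice") always yields the same slot, but distinct names may
 * collide modulo m_max_users; colliding users simply share one fairshare slot.
 */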

/*
 * Notify a single waiter thread that it can proceed.
 */
void
XrdThrottleManager::NotifyOne()
{
   auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;

   for (size_t idx = 0; idx < static_cast<size_t>(m_max_users); ++idx)
   {
      auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
      int16_t uid = wake_order[offset % m_max_users];
      if (uid < 0)
      {
         continue;
      }
      auto &waiter_info = m_waiter_info[uid];
      std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
      if (waiter_info.m_waiting) {
         waiter_info.NotifyOne(std::move(lock));
         return;
      }
   }
}

/*
 * Create an IO timer object; increment the number of outstanding IOs.
 */
XrdThrottleTimer
XrdThrottleManager::StartIOTimer(uint16_t uid, bool &ok)
{
   int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
   m_io_total++;

   while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
   {
      // If the user has essentially no concurrency, then we let them
      // temporarily exceed the limit. This prevents potential waits for
      // every single read from an infrequent user.
      if (m_waiter_info[uid].m_concurrency < 1)
      {
         break;
      }
      m_loadshed_limit_hit++;
      m_io_active.fetch_sub(1, std::memory_order_acq_rel);
      TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
      ok = m_waiter_info[uid].Wait();
      if (!ok) {
         TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
         return XrdThrottleTimer();
      }
      cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
   }

   ok = true;
   return XrdThrottleTimer(this, uid);
}
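
/*
 * Usage sketch (illustrative): the returned XrdThrottleTimer acts as the
 * per-operation accounting handle; its lifetime brackets the I/O.
 *
 *    bool ok;
 *    auto timer = mgr.StartIOTimer(uid, ok);
 *    if (!ok) return -EMFILE;   // hypothetical overload error
 *    DoRead(...);               // hypothetical I/O
 *    // When the timer ends (or is explicitly stopped), the elapsed time is
 *    // recorded via StopIOTimer, which may wake another waiter.
 */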

/*
 * Finish recording an IO timer.
 */
void
XrdThrottleManager::StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
{
   m_io_active_time += event_duration.count();
   auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
   m_waiter_info[uid].m_io_time += event_duration.count();
   if (old_active == static_cast<unsigned>(m_concurrency_limit))
   {
      // If we are below the concurrency limit threshold and have another waiter
      // for our user, then execute it immediately. Otherwise, we will give
      // someone else a chance to run (as we have gotten more than our share recently).
      unsigned waiting_users = m_waiting_users;
      if (waiting_users == 0) waiting_users = 1;
      if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
      {
         std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
         if (m_waiter_info[uid].m_waiting > 0)
         {
            m_waiter_info[uid].NotifyOne(std::move(lock));
            return;
         }
      }
      NotifyOne();
   }
}

/*
 * Check the counters to see if we have hit any throttle limits in the
 * current time period. If so, shed the client randomly.
 *
 * If the client has already been load-shed once and reconnected to this
 * server, then do not load-shed it again.
 */
bool
XrdThrottleManager::CheckLoadShed(const std::string &opaque)
{
   if (m_loadshed_port == 0)
   {
      return false;
   }
   if (m_loadshed_limit_hit == 0)
   {
      return false;
   }
   if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
   {
      return false;
   }
   if (opaque.empty())
   {
      return false;
   }
   return true;
}

void
XrdThrottleManager::PrepLoadShed(const char *opaque, std::string &lsOpaque)
{
   if (m_loadshed_port == 0)
   {
      return;
   }
   if (opaque && opaque[0])
   {
      XrdOucEnv env(opaque);
      // Do not load-shed the client if it has already been done once.
      if (env.Get("throttle.shed") != 0)
      {
         return;
      }
      lsOpaque = opaque;
      lsOpaque += "&throttle.shed=1";
   }
   else
   {
      lsOpaque = "throttle.shed=1";
   }
}

void
XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
{
   host = m_loadshed_host;
   host += "?";
   host += opaque;
   port = m_loadshed_port;
}
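
/*
 * Flow sketch (illustrative): the three load-shed calls work together.
 *
 *    std::string lsOpaque;
 *    mgr.PrepLoadShed(clientOpaque, lsOpaque);   // tags the request with throttle.shed=1
 *    if (mgr.CheckLoadShed(lsOpaque)) {
 *       std::string host; unsigned port;
 *       mgr.PerformLoadShed(lsOpaque, host, port);
 *       // redirect the client to host:port (hypothetical redirect path)
 *    }
 */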

bool
XrdThrottleManager::Waiter::Wait()
{
   auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_waiting++;
      m_cv.wait_until(lock, timeout,
         [&] { return m_manager->m_io_active.load(std::memory_order_acquire) < static_cast<unsigned>(m_manager->m_concurrency_limit) ||
                      std::chrono::steady_clock::now() >= timeout; });
      m_waiting--;
   }
   if (std::chrono::steady_clock::now() > timeout) {
      return false;
   }
   return true;
}