Wake up requests when rate limits update

This commit is contained in:
Mingwei Samuel 2021-07-04 10:01:24 -07:00
parent c10275831f
commit 75ab41aca2
3 changed files with 30 additions and 13 deletions

View file

@ -36,7 +36,7 @@ serde_json = "1.0"
serde_repr = "0.1" serde_repr = "0.1"
strum = "0.20" strum = "0.20"
strum_macros = "0.20" strum_macros = "0.20"
tokio = { version = "1", default-features = false, features = [ "time", "parking_lot" ] } tokio = { version = "1", default-features = false, features = [ "time", "macros", "parking_lot" ] }
[dev-dependencies] [dev-dependencies]
colored = "2" colored = "2"

View file

@ -5,6 +5,7 @@ use log;
use parking_lot::{ RwLock, RwLockUpgradableReadGuard }; use parking_lot::{ RwLock, RwLockUpgradableReadGuard };
use reqwest::{ StatusCode, Response }; use reqwest::{ StatusCode, Response };
use scan_fmt::scan_fmt; use scan_fmt::scan_fmt;
use tokio::sync::Notify;
use crate::RiotApiConfig; use crate::RiotApiConfig;
use super::{ TokenBucket, VectorTokenBucket }; use super::{ TokenBucket, VectorTokenBucket };
@ -18,6 +19,8 @@ pub struct RateLimit {
buckets: RwLock<Vec<VectorTokenBucket>>, buckets: RwLock<Vec<VectorTokenBucket>>,
// Set to when we can retry if a retry-after header is received. // Set to when we can retry if a retry-after header is received.
retry_after: RwLock<Option<Instant>>, retry_after: RwLock<Option<Instant>>,
// Notifies waiters when rate limits are updated.
update_notify: Notify,
} }
impl RateLimit { impl RateLimit {
@ -32,10 +35,23 @@ impl RateLimit {
// Rate limit before getting from response: 1/s. // Rate limit before getting from response: 1/s.
buckets: RwLock::new(vec![initial_bucket]), buckets: RwLock::new(vec![initial_bucket]),
retry_after: RwLock::new(None), retry_after: RwLock::new(None),
update_notify: Notify::new(),
} }
} }
pub fn get_both_or_delay(app_rate_limit: &Self, method_rate_limit: &Self) -> Option<Duration> { pub async fn acquire_both(app_rate_limit: &Self, method_rate_limit: &Self) {
while let Some(delay) = Self::acquire_both_or_duration(app_rate_limit, method_rate_limit) {
tokio::select! {
biased;
_ = tokio::time::sleep(delay) => { continue }
_ = app_rate_limit.update_notify.notified() => {}
_ = method_rate_limit.update_notify.notified() => {}
};
log::trace!("Task awoken due to rate limit update.");
}
}
fn acquire_both_or_duration(app_rate_limit: &Self, method_rate_limit: &Self) -> Option<Duration> {
// Check retry after. // Check retry after.
{ {
let retry_after_delay = app_rate_limit.get_retry_after_delay() let retry_after_delay = app_rate_limit.get_retry_after_delay()
@ -140,7 +156,7 @@ impl RateLimit {
// https://github.com/rust-lang/rust/issues/53667 // https://github.com/rust-lang/rust/issues/53667
if let Some(limit_header) = limit_header_opt { if let Some(limit_header) = limit_header_opt {
if let Some(count_header) = count_header_opt { if let Some(count_header) = count_header_opt {
{
let buckets = self.buckets.upgradable_read(); let buckets = self.buckets.upgradable_read();
if !buckets_require_updating(limit_header, &*buckets) { if !buckets_require_updating(limit_header, &*buckets) {
return; return;
@ -148,7 +164,10 @@ impl RateLimit {
// Buckets require updating. Upgrade to write lock. // Buckets require updating. Upgrade to write lock.
let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets); let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets);
*buckets = buckets_from_header(config, limit_header, count_header) *buckets = buckets_from_header(config, limit_header, count_header);
}
// Notify waiters that buckets have updated (after unlocking).
self.update_notify.notify_waiters();
}} }}
} }
} }

View file

@ -45,10 +45,8 @@ impl RegionalRequester {
let method_rate_limit: Arc<RateLimit> = self.method_rate_limits let method_rate_limit: Arc<RateLimit> = self.method_rate_limits
.get_or_insert_with(method_id, || RateLimit::new(RateLimitType::Method)); .get_or_insert_with(method_id, || RateLimit::new(RateLimitType::Method));
// Rate limiting. // Rate limit.
while let Some(delay) = RateLimit::get_both_or_delay(&self.app_rate_limit, &*method_rate_limit) { RateLimit::acquire_both(&self.app_rate_limit, &*method_rate_limit).await;
tokio::time::sleep(delay).await;
}
// Send request. // Send request.
let request_clone = request.try_clone().expect("Failed to clone request."); let request_clone = request.try_clone().expect("Failed to clone request.");