From 75ab41aca2dd1f968c4004bbe5f3684c88b1e494 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Sun, 4 Jul 2021 10:01:24 -0700 Subject: [PATCH] Wake up requests when rate limits update --- Cargo.toml | 2 +- src/req/rate_limit.rs | 35 +++++++++++++++++++++++++++-------- src/req/regional_requester.rs | 6 ++---- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cb4958c..e5b40b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ serde_json = "1.0" serde_repr = "0.1" strum = "0.20" strum_macros = "0.20" -tokio = { version = "1", default-features = false, features = [ "time", "parking_lot" ] } +tokio = { version = "1", default-features = false, features = [ "time", "macros", "parking_lot" ] } [dev-dependencies] colored = "2" diff --git a/src/req/rate_limit.rs b/src/req/rate_limit.rs index 08a079a..ef8ab4e 100644 --- a/src/req/rate_limit.rs +++ b/src/req/rate_limit.rs @@ -5,6 +5,7 @@ use log; use parking_lot::{ RwLock, RwLockUpgradableReadGuard }; use reqwest::{ StatusCode, Response }; use scan_fmt::scan_fmt; +use tokio::sync::Notify; use crate::RiotApiConfig; use super::{ TokenBucket, VectorTokenBucket }; @@ -18,6 +19,8 @@ pub struct RateLimit { buckets: RwLock>, // Set to when we can retry if a retry-after header is received. retry_after: RwLock>, + // Notifies waiters when rate limits are updated. + update_notify: Notify, } impl RateLimit { @@ -32,10 +35,23 @@ impl RateLimit { // Rate limit before getting from response: 1/s. buckets: RwLock::new(vec![initial_bucket]), retry_after: RwLock::new(None), + update_notify: Notify::new(), } } - pub fn get_both_or_delay(app_rate_limit: &Self, method_rate_limit: &Self) -> Option { + pub async fn acquire_both(app_rate_limit: &Self, method_rate_limit: &Self) { + while let Some(delay) = Self::acquire_both_or_duration(app_rate_limit, method_rate_limit) { + tokio::select! { + biased; + _ = tokio::time::sleep(delay) => { continue } + _ = app_rate_limit.update_notify.notified() => {} + _ = method_rate_limit.update_notify.notified() => {} + }; + log::trace!("Task awoken due to rate limit update."); + } + } + + fn acquire_both_or_duration(app_rate_limit: &Self, method_rate_limit: &Self) -> Option { // Check retry after. { let retry_after_delay = app_rate_limit.get_retry_after_delay() @@ -140,15 +156,18 @@ impl RateLimit { // https://github.com/rust-lang/rust/issues/53667 if let Some(limit_header) = limit_header_opt { if let Some(count_header) = count_header_opt { + { + let buckets = self.buckets.upgradable_read(); + if !buckets_require_updating(limit_header, &*buckets) { + return; + } - let buckets = self.buckets.upgradable_read(); - if !buckets_require_updating(limit_header, &*buckets) { - return; + // Buckets require updating. Upgrade to write lock. + let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets); + *buckets = buckets_from_header(config, limit_header, count_header); } - - // Buckets require updating. Upgrade to write lock. - let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets); - *buckets = buckets_from_header(config, limit_header, count_header) + // Notify waiters that buckets have updated (after unlocking). + self.update_notify.notify_waiters(); }} } } diff --git a/src/req/regional_requester.rs b/src/req/regional_requester.rs index 3b8ad4b..af3d513 100644 --- a/src/req/regional_requester.rs +++ b/src/req/regional_requester.rs @@ -45,10 +45,8 @@ impl RegionalRequester { let method_rate_limit: Arc = self.method_rate_limits .get_or_insert_with(method_id, || RateLimit::new(RateLimitType::Method)); - // Rate limiting. - while let Some(delay) = RateLimit::get_both_or_delay(&self.app_rate_limit, &*method_rate_limit) { - tokio::time::sleep(delay).await; - } + // Rate limit. + RateLimit::acquire_both(&self.app_rate_limit, &*method_rate_limit).await; // Send request. let request_clone = request.try_clone().expect("Failed to clone request.");