diff --git a/Cargo.toml b/Cargo.toml index c3cfa8b..b0c3877 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,12 @@ exclude = [ [dependencies] async-std = "0.99" +log = "0.4" parking_lot = { version = "0.9", features = [ "nightly" ] } reqwest = { version = "0.10.0-alpha.1", features = [ "gzip", "json" ] } +scan_fmt = "0.2" serde = "^1.0" tokio = "0.2.0-alpha.6" + +[dev-dependencies] +env_logger = "0.7" diff --git a/src/lib.rs b/src/lib.rs index 19c7bc9..041fd00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,8 @@ mod tests { #[test] fn it_works() { + env_logger::init(); + let api_key_raw = std::fs::read_to_string("apikey.txt").unwrap(); // TODO don't use unwrap. let api_key = api_key_raw.trim(); @@ -23,11 +25,13 @@ mod tests { let rt = Runtime::new().unwrap(); let riot_api = RiotApi::with_key(api_key); - // https://na1.api.riotgames.com/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw - let my_future = riot_api.get::("asdf", consts::Region::NA, - "/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw", - &[]); - let val = rt.block_on(my_future).unwrap(); - println!("VAL: {}", val.unwrap()); + for i in 0..2 { + // https://na1.api.riotgames.com/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw + let my_future = riot_api.get::("asdf", consts::Region::NA, + "/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw", + &[]); + let val = rt.block_on(my_future).unwrap(); + println!("VAL {}: {}", i, val.unwrap()); + } } } diff --git a/src/req/rate_limit.rs b/src/req/rate_limit.rs index d7516b3..272a7df 100644 --- a/src/req/rate_limit.rs +++ b/src/req/rate_limit.rs @@ -1,17 +1,12 @@ use std::cmp; -use std::time::{ - Duration, - Instant, -}; +use std::time::{ Duration, Instant }; -use parking_lot::{ - RwLock, -}; +use log::debug; +use parking_lot::{ RwLock, RwLockUpgradableReadGuard }; +use reqwest::{ StatusCode, Response }; +use scan_fmt::scan_fmt; -use super::{ - TokenBucket, - VectorTokenBucket, -}; +use super::{ TokenBucket, VectorTokenBucket }; use super::RateLimitType; pub struct RateLimit { @@ -21,7 +16,7 @@ pub struct RateLimit { // from API response. buckets: RwLock>, // Set to when we can retry if a retry-after header is received. - retry_after: Option, + retry_after: RwLock>, } impl RateLimit { @@ -36,16 +31,18 @@ impl RateLimit { rate_limit_type: rate_limit_type, // Rate limit before getting from response: 1/s. buckets: RwLock::new(vec![initial_bucket]), - retry_after: None, + retry_after: RwLock::new(None), } } pub fn get_both_or_delay(app_rate_limit: &Self, method_rate_limit: &Self) -> Option { // Check retry after. - let retry_after_delay = app_rate_limit.get_retry_after_delay() - .and_then(|a| method_rate_limit.get_retry_after_delay().map(|m| cmp::max(a, m))); - if retry_after_delay.is_some() { - return retry_after_delay + { + let retry_after_delay = app_rate_limit.get_retry_after_delay() + .and_then(|a| method_rate_limit.get_retry_after_delay().map(|m| cmp::max(a, m))); + if retry_after_delay.is_some() { + return retry_after_delay + } } // Check buckets. let app_buckets = app_rate_limit.buckets.read(); @@ -64,21 +61,104 @@ impl RateLimit { } pub fn get_retry_after_delay(&self) -> Option { - self.retry_after.and_then(|i| Instant::now().checked_duration_since(i)) + self.retry_after.read().and_then(|i| Instant::now().checked_duration_since(i)) } - pub fn on_response(&self, _response: &reqwest::Response) { - return; - // TODO!!!!!!!!!! + pub fn on_response(&self, response: &Response) { + self.on_response_retry_after(response); + self.on_response_rate_limits(response); } + + /// `on_response` helper for retry after check. + #[inline] + fn on_response_retry_after(&self, response: &Response) { + if let Some(retry_after) = || -> Option { + // Only care about 429s. + if StatusCode::TOO_MANY_REQUESTS != response.status() { + return None; + } + + { + // Only care if the header that indicates the relevant RateLimit is present. + let type_name_header = response.headers() + .get(RateLimit::HEADER_XRATELIMITTYPE)?.to_str().unwrap(); + // Only care if that header's value matches us. + if self.rate_limit_type.type_name() != type_name_header.to_lowercase() { + return None; + } + } + + // Get retry after header. Only care if it exists. + let retry_after_header = response.headers() + .get(RateLimit::HEADER_RETRYAFTER)?.to_str().ok()?; + let retry_after_secs: u64 = retry_after_header.parse().ok()?; + return Some(Instant::now() + Duration::from_secs(retry_after_secs)); + }() { + *self.retry_after.write() = Some(retry_after); + } + } + + #[inline] + fn on_response_rate_limits(&self, response: &Response) { + // Check if rate limits changed. + let headers = response.headers(); + let limit_header_opt = headers.get(self.rate_limit_type.limit_header()).map(|h| h.to_str().unwrap()); + let count_header_opt = headers.get(self.rate_limit_type.count_header()).map(|h| h.to_str().unwrap()); + + if let Some(limit_header) = limit_header_opt { + if let Some(count_header) = count_header_opt { + let buckets = self.buckets.upgradable_read(); + if !buckets_require_updating(limit_header, &*buckets) { + return; + } + + // Buckets require updating. Upgrade to write lock. + let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets); + *buckets = buckets_from_header(limit_header, count_header) + }} + } +} + +fn buckets_require_updating(limit_header: &str, buckets: &Vec) -> bool { + if buckets.len() != limit_header.split(",").count() { + return true; + } + for (limit_header_entry, bucket) in limit_header.split(",").zip(&*buckets) { + // limit_header_entry "100:60" means 100 req per 60 sec. + let bucket_entry = format!("{}:{}", bucket.get_total_limit(), bucket.get_bucket_duration().as_secs()); + if limit_header_entry != bucket_entry { + return true; + } + } + false +} + +fn buckets_from_header(limit_header: &str, count_header: &str) -> Vec { + // Limits: "20000:10,1200000:600" + // Counts: "7:10,58:600" + let size = limit_header.split(",").count(); + debug_assert!(size == count_header.split(",").count()); + let mut out = Vec::with_capacity(size); + + for (limit_entry, count_entry) in limit_header.split(",").zip(count_header.split(",")) { + let (limit, limit_secs) = scan_fmt!(limit_entry, "{d}:{d}", usize, u64).unwrap(); + let (count, count_secs) = scan_fmt!(count_entry, "{d}:{d}", usize, u64).unwrap(); + debug_assert!(limit_secs == count_secs); + + let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit); + bucket.get_tokens(count); + out.push(bucket); + } + debug!("Set buckets to {} limit, {} count.", limit_header, count_header); + out } #[cfg(test)] mod tests { - use super::*; + // use super::*; - fn send_sync() { - fn is_send_sync() {} - is_send_sync::(); - } + // fn send_sync() { + // fn is_send_sync() {} + // is_send_sync::(); + // } } diff --git a/src/req/rate_limit_type.rs b/src/req/rate_limit_type.rs index e05567a..826a80b 100644 --- a/src/req/rate_limit_type.rs +++ b/src/req/rate_limit_type.rs @@ -1,3 +1,4 @@ +#[derive(Copy, Clone)] pub enum RateLimitType { Application, Method, diff --git a/src/req/regional_requester.rs b/src/req/regional_requester.rs index b4f9df9..7acbead 100644 --- a/src/req/regional_requester.rs +++ b/src/req/regional_requester.rs @@ -1,12 +1,7 @@ -use std::future::Future; use std::sync::Arc; use async_std::task; -use reqwest::{ - Client, - StatusCode, -}; -use parking_lot::Mutex; +use reqwest::{ Client, StatusCode }; use crate::riot_api_config::RiotApiConfig; use crate::consts::Region; @@ -113,5 +108,5 @@ impl<'a> RegionalRequester<'a> { #[cfg(test)] mod tests { - use super::*; + // use super::*; } diff --git a/src/req/requester_manager.rs b/src/req/requester_manager.rs index 54e1fa3..eff32eb 100644 --- a/src/req/requester_manager.rs +++ b/src/req/requester_manager.rs @@ -1,6 +1,3 @@ -use std::collections::HashMap; -use std::sync::Arc; - use reqwest::Client; use crate::riot_api_config::RiotApiConfig; diff --git a/src/req/token_bucket.rs b/src/req/token_bucket.rs index 4d203cd..27db671 100644 --- a/src/req/token_bucket.rs +++ b/src/req/token_bucket.rs @@ -27,6 +27,7 @@ pub trait TokenBucket { fn get_total_limit(&self) -> usize; } +#[derive(Debug)] pub struct VectorTokenBucket { /// Duration of this TokenBucket. duration: Duration, @@ -95,8 +96,8 @@ impl TokenBucket for VectorTokenBucket { #[cfg(test)] mod tests { - use super::*; - + // use super::*; + // // #[test] // fn it_works() { // assert_eq!(2 + 2, 4); diff --git a/src/util/insert_only_chashmap.rs b/src/util/insert_only_chashmap.rs index 9a496aa..375939b 100644 --- a/src/util/insert_only_chashmap.rs +++ b/src/util/insert_only_chashmap.rs @@ -1,4 +1,4 @@ -use std::borrow::Borrow; +// use std::borrow::Borrow; use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc;