From 9e66efb8adf1f60fbd0411086da7ee178ee9f7a2 Mon Sep 17 00:00:00 2001
From: Mingwei Samuel
Date: Sat, 26 Oct 2019 21:57:36 -0700
Subject: [PATCH] adding burst_pct to config

---
 Cargo.toml                    |  3 +-
 src/req/mod.rs                | 17 +++++---
 src/req/rate_limit.rs         | 15 +++----
 src/req/regional_requester.rs |  4 +-
 src/req/token_bucket.rs       | 50 ++++++++++++++++-------
 src/req/token_bucket.test.rs  | 18 +++++++++
 src/riot_api_config.rs        | 74 ++++++++++++++++++++++++++++++++++-
 7 files changed, 148 insertions(+), 33 deletions(-)
 create mode 100644 src/req/token_bucket.test.rs

diff --git a/Cargo.toml b/Cargo.toml
index 02d8707..3d31486 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,7 @@ repository = "https://github.com/MingweiSamuel/Riven"
 description = "Riot API Library (WIP)"
 license = "LGPL-3.0"
 edition = "2018"
-include = [ "src/**/*" ]
+include = [ "src/**" ]
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
@@ -26,5 +26,6 @@ url = "2.1"
 [dev-dependencies]
 colored = "1.8"
 env_logger = "0.7"
+fake_clock = { git = "https://github.com/MingweiSamuel/fake_clock", branch = "master" }
 lazy_static = "1.4"
 tokio = "0.2.0-alpha.6"
diff --git a/src/req/mod.rs b/src/req/mod.rs
index 9bd0166..7e410d7 100644
--- a/src/req/mod.rs
+++ b/src/req/mod.rs
@@ -1,11 +1,16 @@
 mod rate_limit;
-mod rate_limit_type;
-mod token_bucket;
-mod regional_requester;
-// mod requester_manager;
-
 pub use rate_limit::*;
+
+mod rate_limit_type;
 pub use rate_limit_type::*;
+
+use std::time::Instant; // Hack for token_bucket_test.rs.
+mod token_bucket;
 pub use token_bucket::*;
+
+mod regional_requester;
 pub use regional_requester::*;
-// pub use requester_manager::*;
+
+#[cfg(test)]
+#[path = "token_bucket.test.rs"]
+mod token_bucket_test;
diff --git a/src/req/rate_limit.rs b/src/req/rate_limit.rs
index febc72d..7f5ee43 100644
--- a/src/req/rate_limit.rs
+++ b/src/req/rate_limit.rs
@@ -6,6 +6,7 @@ use parking_lot::{ RwLock, RwLockUpgradableReadGuard };
 use reqwest::{ StatusCode, Response };
 use scan_fmt::scan_fmt;
 
+use crate::RiotApiConfig;
 use super::{ TokenBucket, VectorTokenBucket };
 use super::RateLimitType;
 
@@ -26,7 +27,7 @@ impl RateLimit {
     const HEADER_RETRYAFTER: &'static str = "Retry-After";
 
     pub fn new(rate_limit_type: RateLimitType) -> Self {
-        let initial_bucket = VectorTokenBucket::new(Duration::from_secs(1), 1);
+        let initial_bucket = VectorTokenBucket::new(Duration::from_secs(1), 1, 1.0);
         RateLimit {
             rate_limit_type: rate_limit_type,
             // Rate limit before getting from response: 1/s.
@@ -64,9 +65,9 @@ impl RateLimit {
         self.retry_after.read().and_then(|i| Instant::now().checked_duration_since(i))
     }
 
-    pub fn on_response(&self, response: &Response) {
+    pub fn on_response(&self, config: &RiotApiConfig, response: &Response) {
         self.on_response_retry_after(response);
-        self.on_response_rate_limits(response);
+        self.on_response_rate_limits(config, response);
     }
 
     /// `on_response` helper for retry after check.
@@ -105,7 +106,7 @@ impl RateLimit {
     }
 
     #[inline]
-    fn on_response_rate_limits(&self, response: &Response) {
+    fn on_response_rate_limits(&self, config: &RiotApiConfig, response: &Response) {
         // Check if rate limits changed.
         let headers = response.headers();
         let limit_header_opt = headers.get(self.rate_limit_type.limit_header())
@@ -124,7 +125,7 @@ impl RateLimit {
             // Buckets require updating. Upgrade to write lock.
             let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets);
-            *buckets = buckets_from_header(limit_header, count_header)
+            *buckets = buckets_from_header(config, limit_header, count_header)
         }}
     }
 }
@@ -143,7 +144,7 @@ fn buckets_require_updating(limit_header: &str, buckets: &Vec<VectorTokenBucket>) -> bool {
     false
 }
 
-fn buckets_from_header(limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
+fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
     // Limits: "20000:10,1200000:600"
     // Counts: "7:10,58:600"
     let size = limit_header.split(",").count();
@@ -157,7 +158,7 @@ fn buckets_from_header(limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
diff --git a/src/req/token_bucket.rs b/src/req/token_bucket.rs
--- a/src/req/token_bucket.rs
+++ b/src/req/token_bucket.rs
@@ -13,7 +22,8 @@ pub trait TokenBucket {
     /// # Parameters
     /// * `n` - Number of tokens to take.
     /// # Returns
-    /// True if the tokens were obtained without violating limits, false otherwise.
+    /// True if the tokens were obtained without violating limits, false
+    /// otherwise.
     fn get_tokens(&self, n: usize) -> bool;
 
     /// Get the duration of this bucket.
@@ -33,15 +43,35 @@ pub struct VectorTokenBucket {
     duration: Duration,
     // Total tokens available from this TokenBucket.
     total_limit: usize,
-    // Record of timestamps (synchronized).
+
+    /// TODO USE THESE !!!!!!!
+    /// Duration considered for burst factor.
+    burst_duration: Duration,
+    /// Limit allowed per burst_duration, for burst factor.
+    burst_limit: usize,
+
+    /// Record of timestamps (synchronized).
     timestamps: Mutex<VecDeque<Instant>>,
 }
 
 impl VectorTokenBucket {
-    pub fn new(duration: Duration, total_limit: usize) -> Self {
+    pub fn new(duration: Duration, total_limit: usize, burst_pct: f32) -> Self {
+        debug_assert!(0.0 < burst_pct && burst_pct <= 1.0,
+            "BAD burst_pct {}.", burst_pct);
+        // Float ops may lose precision, but nothing should be that precise.
+        // API always uses round numbers, burst_pct is a fraction of 256.
+
+        let burst_duration = Duration::new(
+            (duration.as_secs() as f32 * burst_pct) as u64,
+            (duration.subsec_nanos() as f32 * burst_pct) as u32);
+
         VectorTokenBucket {
             duration: duration,
             total_limit: total_limit,
+
+            burst_duration: burst_duration,
+            burst_limit: (total_limit as f32 * burst_pct) as usize,
+
             timestamps: Mutex::new(VecDeque::new()),
         }
     }
@@ -90,13 +120,3 @@ impl TokenBucket for VectorTokenBucket {
         self.total_limit
     }
 }
-
-#[cfg(test)]
-mod tests {
-    // use super::*;
-    //
-    // #[test]
-    // fn it_works() {
-    //     assert_eq!(2 + 2, 4);
-    // }
-}
diff --git a/src/req/token_bucket.test.rs b/src/req/token_bucket.test.rs
new file mode 100644
index 0000000..463d25c
--- /dev/null
+++ b/src/req/token_bucket.test.rs
@@ -0,0 +1,18 @@
+#![cfg(test)]
+
+use std::time::Duration;
+
+use fake_clock::FakeClock as Instant;
+
+mod token_bucket {
+    include!("token_bucket.rs");
+
+    mod tests {
+        use super::*;
+
+        #[test]
+        fn it_works() {
+            let _x = VectorTokenBucket::new(Duration::from_secs(1), 100, 1.0);
+        }
+    }
+}
diff --git a/src/riot_api_config.rs b/src/riot_api_config.rs
index 7d41afb..a678578 100644
--- a/src/riot_api_config.rs
+++ b/src/riot_api_config.rs
@@ -1,15 +1,85 @@
+pub mod riot_api_config {
+    pub const DEFAULT_BURST_PCT: f32 = 0.93;
+    pub const DEFAULT_BURST_FACTOR: u8 = ((BURST_FACTOR_DENOM * DEFAULT_BURST_PCT) as u16 - 1) as u8;
+    pub const BURST_FACTOR_DENOM: f32 = 256.0;
+}
+use riot_api_config::*;
+
 /// Configuration for instantiating RiotApi.
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq)]
 pub struct RiotApiConfig {
+    /// Riot Games API key from
+    /// [https://developer.riotgames.com/](https://developer.riotgames.com/).
+    /// Should be something like `"RGAPI-01234567-89ab-cdef-0123-456789abcdef"`.
     pub api_key: String,
+    /// Number of times to retry requests. Naturally, only retryable requests
+    /// will be retried: responses with status codes 5xx or 429 (after waiting
+    /// for retry-after headers). A value of `0` means one request will be sent
+    /// and it will not be retried if it fails.
     pub retries: u8,
+    /// Burst factor controls how requests are spread out. Higher means less
+    /// spread out, lower means more spread out.
+    ///
+    /// The value is converted into a "burst percentage":
+    /// `(burst_factor + 1) / 256`. How the burst percentage controls rate
+    /// limiting is detailed in the documentation of
+    /// [`set_burst_pct`](#method.set_burst_pct).
+    pub burst_factor: u8,
 }
 
 impl RiotApiConfig {
+    /// Creates a new `RiotApiConfig` with the given `api_key` and default
+    /// settings.
     pub fn with_key<T: Into<String>>(api_key: T) -> Self {
         Self {
             api_key: api_key.into(),
-            retries: 3 // TODO defaults.
+            retries: 3, // TODO defaults.
+            burst_factor: DEFAULT_BURST_FACTOR,
         }
     }
+
+    /// Sets the "burst percentage", `pct`. The value must be between 0,
+    /// exclusive, and 1, inclusive, otherwise this method will panic.
+    ///
+    /// "Burst percentage" behaves as follows:
+    /// A burst percentage of x% means, for each token bucket, "x% of the
+    /// tokens can be used in x% of the bucket duration." So, for example, if x
+    /// is 90%, a bucket would allow 90% of the requests to be made without
+    /// any delay. Then, after waiting 90% of the bucket's duration, the
+    /// remaining 10% of requests could be made.
+    ///
+    /// A burst percentage of 100% results in no request spreading, which would
+    /// allow for the largest bursts and lowest latency, but could result in
+    /// 429s as bucket boundaries occur.
+    ///
+    /// A burst percentage near 0% results in high spreading, causing
+    /// temporally equidistant requests. This prevents 429s but has the highest
+    /// latency. Additionally, if the number of tokens is high, this may lower
+    /// the overall throughput due to the rate at which requests can be
+    /// scheduled.
+    ///
+    /// Therefore, for interactive applications like summoner & match history
+    /// lookup, a higher percentage may be better. For data-collection apps
+    /// like champion winrate aggregation, a medium-low percentage may be
+    /// better.
+    ///
+    /// # Panics
+    /// Panics if `pct` is not in the range `(0, 1]`; 0 exclusive, 1 inclusive.
+    ///
+    /// # Returns
+    /// `&mut self` for chaining.
+    pub fn set_burst_pct<'a>(&'a mut self, pct: f32) -> &'a mut Self
+    {
+        assert!(0.0 < pct && pct <= 1.0,
+            "pct must be in range (0, 1], was {}.", pct);
+        let sf = (std::u8::MAX as f32 * pct).ceil();
+        self.burst_factor = sf as u8;
+        assert_eq!(sf, self.burst_factor as f32,
+            "!FAILED TO CONVERT FLOAT TO u8: {}, from pct {}.", sf, pct);
+        self
+    }
+
+    pub fn get_burst_pct(&self) -> f32 {
+        (self.burst_factor as f32 + 1.0) / BURST_FACTOR_DENOM
+    }
 }
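
The burst split computed in VectorTokenBucket::new is easier to follow with concrete numbers. The sketch below is not part of the patch: burst_split is a hypothetical free function that repeats the same truncating float arithmetic, applied to the app-limit example from the header comment ("20000:10", i.e. 20000 requests per 10 seconds) at a 0.93 burst percentage.

use std::time::Duration;

/// Scale both the bucket duration and the token limit by `burst_pct`,
/// truncating toward zero, the same way the patched `VectorTokenBucket::new`
/// derives `burst_duration` and `burst_limit`.
fn burst_split(duration: Duration, total_limit: usize, burst_pct: f32) -> (Duration, usize) {
    let burst_duration = Duration::new(
        (duration.as_secs() as f32 * burst_pct) as u64,
        (duration.subsec_nanos() as f32 * burst_pct) as u32);
    let burst_limit = (total_limit as f32 * burst_pct) as usize;
    (burst_duration, burst_limit)
}

fn main() {
    let (burst_duration, burst_limit) = burst_split(Duration::from_secs(10), 20_000, 0.93);
    // Prints "9s 18600": at most 18600 of the 20000 tokens may be spent in any
    // 9-second slice of the 10-second bucket; the rest must wait.
    println!("{:?} {}", burst_duration, burst_limit);
}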
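
The u8 encoding behind burst_factor can be checked with a small round trip. This is a sketch, not part of the patch: encode_burst_pct and decode_burst_factor are hypothetical free functions that copy the arithmetic from set_burst_pct (with the corrected `pct <= 1.0` bound) and get_burst_pct.

/// Mirror of `set_burst_pct`: encode a burst percentage into a u8 factor.
fn encode_burst_pct(pct: f32) -> u8 {
    assert!(0.0 < pct && pct <= 1.0, "pct must be in range (0, 1], was {}.", pct);
    (std::u8::MAX as f32 * pct).ceil() as u8
}

/// Mirror of `get_burst_pct`: decode a factor back to `(factor + 1) / 256`.
fn decode_burst_factor(burst_factor: u8) -> f32 {
    const BURST_FACTOR_DENOM: f32 = 256.0;
    (burst_factor as f32 + 1.0) / BURST_FACTOR_DENOM
}

fn main() {
    // encode(0.93) = ceil(255 * 0.93) = 238; decoding gives 239/256 ~= 0.934,
    // slightly above the requested percentage because of the ceil().
    let factor = encode_burst_pct(0.93);
    println!("factor = {}, round-trip pct = {}", factor, decode_burst_factor(factor));

    // encode(1.0) = 255, which decodes back to exactly 256/256 = 1.0.
    assert_eq!(1.0, decode_burst_factor(encode_burst_pct(1.0)));
}

In application code the same adjustment would go through the config itself, e.g. `let mut config = RiotApiConfig::with_key("RGAPI-...");` followed by `config.set_burst_pct(0.5);`, since set_burst_pct returns `&mut Self` for chaining.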