From e1bf5312357f3f278978775fb486c5f9477c707c Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Fri, 23 Jul 2021 17:57:41 -0700 Subject: [PATCH] Add rate_usage_factor configurations --- src/config.rs | 117 +++++++++++++++++++++++++++++------ src/req/rate_limit.rs | 16 +++-- src/req/rate_limit_type.rs | 2 +- src/req/token_bucket.rs | 66 +++++++++++++------- src/req/token_bucket.test.rs | 78 +++++++++++++++++------ test-full.bash | 2 +- test.bash | 1 - 7 files changed, 214 insertions(+), 68 deletions(-) diff --git a/src/config.rs b/src/config.rs index 1d4e2d3..723fffa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,7 +11,9 @@ use reqwest::header::{ HeaderMap, HeaderValue }; pub struct RiotApiConfig { pub(crate) base_url: String, pub(crate) retries: u8, - pub(crate) burst_pct: f32, + pub(crate) app_rate_usage_factor: f32, + pub(crate) method_rate_usage_factor: f32, + pub(crate) burst_factor: f32, pub(crate) duration_overhead: Duration, pub(crate) client_builder: Option, } @@ -33,10 +35,15 @@ impl RiotApiConfig { /// Default number of retries. pub const DEFAULT_RETRIES: u8 = 3; + /// `1.0` + /// + /// Default rate limit usage factor. + pub const DEFAULT_RATE_USAGE_FACTOR: f32 = 1.0; + /// `0.99` /// - /// Default `burst_pct`, also used by `preconfig_burst`. - pub const PRECONFIG_BURST_BURST_PCT: f32 = 0.99; + /// Default `burst_factor`, also used by `preconfig_burst`. + pub const PRECONFIG_BURST_BURST_FACTOR: f32 = 0.99; /// `989` ms /// /// Default `duration_overhead`, also used by `preconfig_burst`. @@ -44,8 +51,8 @@ impl RiotApiConfig { /// `0.47` /// - /// `burst_pct` used by `preconfig_throughput`. - pub const PRECONFIG_THROUGHPUT_BURST_PCT: f32 = 0.47; + /// `burst_factor` used by `preconfig_throughput`. + pub const PRECONFIG_THROUGHPUT_BURST_FACTOR: f32 = 0.47; /// `10` ms. /// /// `duration_overhead` used by `preconfig_throughput`. @@ -55,7 +62,7 @@ impl RiotApiConfig { /// configuration: /// /// * `retries = 3` (`RiotApiConfig::DEFAULT_RETRIES`). 
- /// * `purst_pct = 0.99` (`preconfig_burst`). + /// * `burst_factor = 0.99` (`preconfig_burst`). /// * `duration_overhead = 989 ms` (`preconfig_burst`). /// /// `api_key` should be a Riot Games API key from @@ -71,7 +78,9 @@ impl RiotApiConfig { Self { base_url: Self::DEFAULT_BASE_URL.into(), retries: Self::DEFAULT_RETRIES, - burst_pct: Self::PRECONFIG_BURST_BURST_PCT, + app_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR, + method_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR, + burst_factor: Self::PRECONFIG_BURST_BURST_FACTOR, duration_overhead: Self::PRECONFIG_BURST_DURATION_OVERHEAD, client_builder: Some( ClientBuilder::new() @@ -86,13 +95,15 @@ impl RiotApiConfig { /// `RiotApiConfig::RIOT_KEY_HEADER`, otherwise authentication will fail. /// /// * `retries = 3` (`RiotApiConfig::DEFAULT_RETRIES`). - /// * `purst_pct = 0.99` (`preconfig_burst`). + /// * `burst_factor = 0.99` (`preconfig_burst`). /// * `duration_overhead = 989 ms` (`preconfig_burst`). pub fn with_client_builder(client_builder: ClientBuilder) -> Self { Self { base_url: Self::DEFAULT_BASE_URL.to_owned(), retries: Self::DEFAULT_RETRIES, - burst_pct: Self::PRECONFIG_BURST_BURST_PCT, + app_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR, + method_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR, + burst_factor: Self::PRECONFIG_BURST_BURST_FACTOR, duration_overhead: Self::PRECONFIG_BURST_DURATION_OVERHEAD, client_builder: Some(client_builder), } @@ -101,13 +112,13 @@ impl RiotApiConfig { /// Sets rate limiting settings to preconfigured values optimized for burst, /// low latency: /// - /// * `burst_pct = 0.99` (`PRECONFIG_BURST_BURST_PCT`). + /// * `burst_factor = 0.99` (`PRECONFIG_BURST_BURST_FACTOR`). /// * `duration_overhead = 989 ms` (`PRECONFIG_BURST_DURATION_OVERHEAD_MILLIS`). /// /// # Returns /// `self`, for chaining. 
pub fn preconfig_burst(mut self) -> Self { - self.burst_pct = Self::PRECONFIG_BURST_BURST_PCT; + self.burst_factor = Self::PRECONFIG_BURST_BURST_FACTOR; self.duration_overhead = Self::PRECONFIG_BURST_DURATION_OVERHEAD; self } @@ -115,13 +126,13 @@ impl RiotApiConfig { /// Sets the rate limiting settings to preconfigured values optimized for /// high throughput: /// - /// * `burst_pct = 0.47` (`PRECONFIG_THROUGHPUT_BURST_PCT`). + /// * `burst_factor = 0.47` (`PRECONFIG_THROUGHPUT_BURST_FACTOR`). /// * `duration_overhead = 10 ms` (`PRECONFIG_THROUGHPUT_DURATION_OVERHEAD_MILLIS`). /// /// # Returns /// `self`, for chaining. pub fn preconfig_throughput(mut self) -> Self { - self.burst_pct = Self::PRECONFIG_THROUGHPUT_BURST_PCT; + self.burst_factor = Self::PRECONFIG_THROUGHPUT_BURST_FACTOR; self.duration_overhead = Self::PRECONFIG_THROUGHPUT_DURATION_OVERHEAD; self } @@ -149,6 +160,76 @@ impl RiotApiConfig { self } + /// The rate limit usage percentage controls how much of the API key's rate + /// limit will be used. The default value of `1.0` means the entirety of + /// the rate limit may be used if it is needed. This applies to both the + /// API key's rate limit (per route) _and_ to endpoint method rate limits. + /// + /// Setting a value lower than `1.0` can be useful if you are running + /// multiple API instances on the same API key. + /// + /// For example, four instances, possibly running on different machines, + /// could each have a value of `0.25` to share an API key's rate limit + /// evenly. + /// + /// Note that if you have multiple instances hitting _different_ methods, + /// you should use [set_app_rate_usage_factor()] and [set_method_rate_usage_factor()] + /// separately, as this sets both. + /// + /// This also can be used to reduce the chance of hitting 429s, although + /// 429s should be rare even with this set to `1.0`. + /// + /// # Panics + /// If `rate_usage_factor` is not in range (0, 1]. + /// + /// # Returns + /// `self`, for chaining. 
+ pub fn set_rate_usage_factor(mut self, rate_usage_factor: f32) -> Self { + // Use inverted check to handle NaN. + if 0.0 < rate_usage_factor && rate_usage_factor <= 1.0 { + self.app_rate_usage_factor = rate_usage_factor; + self.method_rate_usage_factor = rate_usage_factor; + return self; + } + panic!("rate_usage_factor \"{}\" not in range (0, 1].", rate_usage_factor); + } + + /// See [set_rate_usage_factor]. Setting this is useful if you have multiple + /// instances sharing the app rate limit, but are hitting distinct methods + /// and therefore do not need their method usage decreased. + /// + /// # Panics + /// If `app_rate_usage_factor` is not in range (0, 1\]. + /// + /// # Returns + /// `self`, for chaining. + pub fn set_app_rate_usage_factor(mut self, app_rate_usage_factor: f32) -> Self { + // Use inverted check to handle NaN. + if 0.0 < app_rate_usage_factor && app_rate_usage_factor <= 1.0 { + self.app_rate_usage_factor = app_rate_usage_factor; + return self; + } + panic!("app_rate_usage_factor \"{}\" not in range (0, 1].", app_rate_usage_factor); + } + + /// See [set_rate_usage_factor] and [set_app_rate_usage_factor]. + /// This method is mainly provided for completeness, though it may be + /// useful in advanced use cases. + /// + /// # Panics + /// If `method_rate_usage_factor` is not in range (0, 1\]. + /// + /// # Returns + /// `self`, for chaining. + pub fn set_method_rate_usage_factor(mut self, method_rate_usage_factor: f32) -> Self { + // Use inverted check to handle NaN. + if 0.0 < method_rate_usage_factor && method_rate_usage_factor <= 1.0 { + self.method_rate_usage_factor = method_rate_usage_factor; + return self; + } + panic!("method_rate_usage_factor \"{}\" not in range (0, 1].", method_rate_usage_factor); + } + /// Burst percentage controls how many burst requests are allowed and /// therefore how requests are spread out. Higher equals more burst, /// less spread. Lower equals less burst, more spread. 
@@ -180,17 +261,17 @@ impl RiotApiConfig { /// better. /// /// # Panics - /// If `burst_pct` is not in range (0, 1]. + /// If `burst_factor` is not in range (0, 1\]. /// /// # Returns /// `self`, for chaining. - pub fn set_burst_pct(mut self, burst_pct: f32) -> Self { + pub fn set_burst_factor(mut self, burst_factor: f32) -> Self { // Use inverted check to handle NaN. - if 0.0 < burst_pct && burst_pct < 1.0 { - self.burst_pct = burst_pct; + if 0.0 < burst_factor && burst_factor <= 1.0 { + self.burst_factor = burst_factor; return self; } - panic!("burst_pct \"{}\" not in range (0, 1].", burst_pct); + panic!("burst_factor \"{}\" not in range (0, 1].", burst_factor); } /// Sets the additional bucket duration to consider when rate limiting. diff --git a/src/req/rate_limit.rs b/src/req/rate_limit.rs index 7b98bb4..70d233c 100644 --- a/src/req/rate_limit.rs +++ b/src/req/rate_limit.rs @@ -33,7 +33,7 @@ impl RateLimit { pub fn new(rate_limit_type: RateLimitType) -> Self { let initial_bucket = VectorTokenBucket::new( - Duration::from_secs(1), 1, Duration::new(0, 0), 1.0); + Duration::from_secs(1), 1, Duration::new(0, 0), 1.0, 1.0); RateLimit { rate_limit_type: rate_limit_type, // Rate limit before getting from response: 1/s. @@ -168,7 +168,7 @@ impl RateLimit { // Buckets require updating. Upgrade to write lock. let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets); - *buckets = buckets_from_header(config, limit_header, count_header); + *buckets = buckets_from_header(config, limit_header, count_header, self.rate_limit_type); } // Notify waiters that buckets have updated (after unlocking). 
self.update_notify.notify_waiters(); @@ -190,7 +190,7 @@ fn buckets_require_updating(limit_header: &str, buckets: &Vec false } -fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: &str) -> Vec { +fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: &str, rate_limit_type: RateLimitType) -> Vec { // Limits: "20000:10,1200000:600" // Counts: "7:10,58:600" let size = limit_header.split(",").count(); @@ -204,11 +204,17 @@ fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: .unwrap_or_else(|_| panic!("Failed to parse count entry \"{}\".", count_entry)); debug_assert!(limit_secs == count_secs); + let rate_usage_factor = if RateLimitType::Application == rate_limit_type { + config.app_rate_usage_factor + } else { + config.method_rate_usage_factor + }; + let limit_f32 = limit as f32; - let scaled_burst_pct = config.burst_pct * limit_f32 / (limit_f32 + 1.0); + let scaled_burst_factor = config.burst_factor * limit_f32 / (limit_f32 + 1.0); let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit, - config.duration_overhead, scaled_burst_pct); + config.duration_overhead, scaled_burst_factor, rate_usage_factor); bucket.get_tokens(count); out.push(bucket); } diff --git a/src/req/rate_limit_type.rs b/src/req/rate_limit_type.rs index 7c33a8f..a3a662d 100644 --- a/src/req/rate_limit_type.rs +++ b/src/req/rate_limit_type.rs @@ -1,4 +1,4 @@ -#[derive(Copy, Clone)] +#[derive(Copy, Clone, PartialEq, Eq)] pub enum RateLimitType { Application, Method, diff --git a/src/req/token_bucket.rs b/src/req/token_bucket.rs index c124dc4..953c290 100644 --- a/src/req/token_bucket.rs +++ b/src/req/token_bucket.rs @@ -39,10 +39,16 @@ pub trait TokenBucket { } pub struct VectorTokenBucket { + /// The total limit supplied to the constructor, unadjusted by the [rate_usage_factor]. + _given_total_limit: usize, + /// Additional factor to reduce rate limit usage, in range (0, 1\]. 
+ _rate_usage_factor: f32, + /// Duration of this TokenBucket. duration: Duration, // Total tokens available from this TokenBucket. total_limit: usize, + /// Extra duration to be considered on top of `duration`, to account for /// varying network latency. duration_overhead: Duration, @@ -58,30 +64,38 @@ pub struct VectorTokenBucket { } impl VectorTokenBucket { - pub fn new(duration: Duration, total_limit: usize, - duration_overhead: Duration, burst_pct: f32) -> Self + pub fn new(duration: Duration, given_total_limit: usize, + duration_overhead: Duration, burst_factor: f32, + rate_usage_factor: f32) -> Self { - debug_assert!(0.0 < burst_pct && burst_pct <= 1.0, - "BAD burst_pct {}.", burst_pct); + debug_assert!(0.0 < rate_usage_factor && rate_usage_factor <= 1.0, + "BAD rate_usage_factor {}.", rate_usage_factor); + debug_assert!(0.0 < burst_factor && burst_factor <= 1.0, + "BAD burst_factor {}.", burst_factor); // Float ops may lose precision, but nothing should be that precise. - // API always uses round numbers, burst_pct is frac of 256. + // API always uses round numbers, burst_factor is frac of 256. + + // Adjust everything by rate_usage_factor. + let total_limit = std::cmp::max(1, + (given_total_limit as f32 * rate_usage_factor).floor() as usize); // Effective duration. 
let d_eff = duration + duration_overhead; - let burst_duration = Duration::new( - (d_eff.as_secs() as f32 * burst_pct).ceil() as u64, - (d_eff.subsec_nanos() as f32 * burst_pct).ceil() as u32); + let burst_duration = d_eff.mul_f32(burst_factor); let burst_limit = std::cmp::max(1, - (total_limit as f32 * burst_pct).floor() as usize); + (total_limit as f32 * burst_factor).floor() as usize); debug_assert!(burst_limit <= total_limit); VectorTokenBucket { - duration: duration, - total_limit: total_limit, - duration_overhead: duration_overhead, + _given_total_limit: given_total_limit, + _rate_usage_factor: rate_usage_factor, - burst_duration: burst_duration, - burst_limit: burst_limit, + duration, + total_limit, + + duration_overhead, + burst_duration, + burst_limit, timestamps: Mutex::new(VecDeque::with_capacity(total_limit)), } @@ -90,8 +104,7 @@ impl VectorTokenBucket { fn update_get_timestamps(&self) -> MutexGuard> { let mut timestamps = self.timestamps.lock(); let cutoff = Instant::now() - self.duration - self.duration_overhead; - // We only need to trim the end of the queue to not leak memory. - // We could do it lazily somehow if we wanted to be really fancy. + // Pop off timestamps that are beyond the bucket duration. while timestamps.back().map_or(false, |ts| *ts < cutoff) { timestamps.pop_back(); } @@ -104,11 +117,6 @@ impl TokenBucket for VectorTokenBucket { fn get_delay(&self) -> Option { let timestamps = self.update_get_timestamps(); - // The "?" means: - // `if timestamps.len() < self.total_limit { return None }` - // Timestamp that needs to be popped before - // we can enter another timestamp. - // Full rate limit. if let Some(ts) = timestamps.get(self.total_limit - 1) { // Return amount of time needed for timestamp `ts` to go away. @@ -137,7 +145,21 @@ impl TokenBucket for VectorTokenBucket { for _ in 0..n { timestamps.push_front(now); } - timestamps.len() <= self.total_limit + + // Check total limit. 
+ if self.total_limit < timestamps.len() { + return false; + } + + // Check burst limit. + if let Some(burst_time) = timestamps.get(self.burst_limit) { + let duration_since = now.duration_since(*burst_time); // `now` before `burst_time` will panic. + if duration_since < self.burst_duration { + return false; + } + } + + return true; } fn get_bucket_duration(&self) -> Duration { diff --git a/src/req/token_bucket.test.rs b/src/req/token_bucket.test.rs index d3f878d..e8d83cf 100644 --- a/src/req/token_bucket.test.rs +++ b/src/req/token_bucket.test.rs @@ -17,7 +17,7 @@ mod token_bucket { #[test] fn test_basic() { Instant::set_time(50_000); - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95, 1.0); assert!(bucket.get_tokens(50), "Should have not violated limit."); assert_eq!(None, bucket.get_delay(), "Can get stuff."); assert!(!bucket.get_tokens(51), "Should have violated limit."); @@ -25,16 +25,20 @@ mod token_bucket { #[test] fn test_internal_constructor() { - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0, 1.0); assert_eq!(100, bucket.burst_limit); - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1e-6); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1e-6, 1.0); + assert_eq!(1, bucket.burst_limit); + + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0, 1e-6); + assert_eq!(1, bucket.total_limit); assert_eq!(1, bucket.burst_limit); } #[test] fn test_saturated_100_burst() { - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.00); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.00, 1.0); Instant::set_time(50_000); assert!(bucket.get_tokens(100), "All tokens should be immediately available."); @@ 
-47,31 +51,40 @@ mod token_bucket { #[test] fn test_saturated_95_burst() { - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95, 1.0); Instant::set_time(50_000); assert!(bucket.get_tokens(95), "95 tokens should be immediately available."); assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); - Instant::advance_time(475); // Total 951. + Instant::advance_time(475); assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + Instant::advance_time(476); // Total 951. Extra buffer for Duration(0). - Instant::advance_time(476); // Extra buffer for Duration(0). assert!(bucket.get_tokens(5), "Last 5 tokens should be available."); assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); - Instant::advance_time(51); - assert!(bucket.get_tokens(95), "95 tokens should be available."); + Instant::advance_time(51); // Total 1002. + assert!(bucket.get_tokens(90), "90 tokens should be available."); assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); Instant::advance_time(951); - assert!(bucket.get_tokens(5), "Last 5 tokens should be available."); + assert!(bucket.get_tokens(10), "Last 10 tokens should be available."); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + } + + #[test] + fn test_violated_50_burst() { + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50, 1.0); + + Instant::set_time(50_000); + assert!(!bucket.get_tokens(90), "Burst should be violated."); assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); } #[test] fn test_saturated_50_burst() { - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.5); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50, 1.0); Instant::set_time(50_000); assert!(bucket.get_tokens(50), "Half the tokens should be immediately available."); @@ -90,21 +103,46 @@ 
mod token_bucket { assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); } + #[test] + fn test_saturated_90_burst_rate_usage_factor_50() { + let rate_usage_factor = 0.5; + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.90, rate_usage_factor); + + Instant::set_time(50_000); + assert!(bucket.get_tokens(45), "45 tokens should be immediately available."); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(475); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + Instant::advance_time(476); // Total 951. Extra buffer for Duration(0). + + assert!(bucket.get_tokens(5), "Last 5 tokens should be available."); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(51); // Total 1002. + assert!(bucket.get_tokens(40), "40 tokens should be available."); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(951); + assert!(bucket.get_tokens(10), "Last 10 tokens should be available."); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + } + #[test] fn test_many() { Instant::set_time(50_000); - let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95); - assert!(bucket.get_tokens(50), "Should have not violated limit."); - assert_eq!(None, bucket.get_delay(), "Should not be blocked."); - for _ in 0..20_000 { + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.5, 1.0); + assert!(bucket.get_tokens(50), "Should have not violated limit. i=-1."); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay. i=-1."); + for i in 0..20_000 { Instant::advance_time(501); - assert!(bucket.get_tokens(50), "Should have not violated limit."); - assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + assert!(bucket.get_tokens(50), "Should have not violated limit. 
i={}.", i); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay. i={}.", i); Instant::advance_time(501); - assert!(bucket.get_tokens(50), "Should have not violated limit."); - assert_ne!(None, bucket.get_delay(), "Bucket should have delay."); + assert!(bucket.get_tokens(50), "Should have not violated limit. i={}.", i); + assert_ne!(None, bucket.get_delay(), "Bucket should have delay. i={}.", i); } - assert!(bucket.timestamps.lock().len() < 110, "Check memory leak."); + assert!(bucket.timestamps.lock().len() < 110, "Should not memory leak."); } } } diff --git a/test-full.bash b/test-full.bash index ed8cbb1..ff51948 100644 --- a/test-full.bash +++ b/test-full.bash @@ -1,5 +1,5 @@ #!/bin/bash -set -ex +set -e # Ensure stable builds. cargo +stable test --no-run diff --git a/test.bash b/test.bash index 5f6f44f..05ebdb7 100755 --- a/test.bash +++ b/test.bash @@ -1,3 +1,2 @@ #!/bin/bash -set -ex RGAPI_KEY="$(cat apikey.txt)" RUST_BACKTRACE=1 RUST_LOG=riven=trace cargo +nightly test --features nightly -- --nocapture