Add rate_usage_factor configurations

pull/34/head
Mingwei Samuel 2021-07-23 17:57:41 -07:00
parent 00e520b7af
commit e1bf531235
7 changed files with 214 additions and 68 deletions

View File

@ -11,7 +11,9 @@ use reqwest::header::{ HeaderMap, HeaderValue };
pub struct RiotApiConfig {
pub(crate) base_url: String,
pub(crate) retries: u8,
pub(crate) burst_pct: f32,
pub(crate) app_rate_usage_factor: f32,
pub(crate) method_rate_usage_factor: f32,
pub(crate) burst_factor: f32,
pub(crate) duration_overhead: Duration,
pub(crate) client_builder: Option<ClientBuilder>,
}
@ -33,10 +35,15 @@ impl RiotApiConfig {
/// Default number of retries.
pub const DEFAULT_RETRIES: u8 = 3;
/// `1.0`
///
/// Default rate limit usage factor.
pub const DEFAULT_RATE_USAGE_FACTOR: f32 = 1.0;
/// `0.99`
///
/// Default `burst_pct`, also used by `preconfig_burst`.
pub const PRECONFIG_BURST_BURST_PCT: f32 = 0.99;
/// Default `burst_factor`, also used by `preconfig_burst`.
pub const PRECONFIG_BURST_BURST_FACTOR: f32 = 0.99;
/// `989` ms
///
/// Default `duration_overhead`, also used by `preconfig_burst`.
@ -44,8 +51,8 @@ impl RiotApiConfig {
/// `0.47`
///
/// `burst_pct` used by `preconfig_throughput`.
pub const PRECONFIG_THROUGHPUT_BURST_PCT: f32 = 0.47;
/// `burst_factor` used by `preconfig_throughput`.
pub const PRECONFIG_THROUGHPUT_BURST_FACTOR: f32 = 0.47;
/// `10` ms.
///
/// `duration_overhead` used by `preconfig_throughput`.
@ -55,7 +62,7 @@ impl RiotApiConfig {
/// configuration:
///
/// * `retries = 3` (`RiotApiConfig::DEFAULT_RETRIES`).
/// * `purst_pct = 0.99` (`preconfig_burst`).
/// * `burst_factor = 0.99` (`preconfig_burst`).
/// * `duration_overhead = 989 ms` (`preconfig_burst`).
///
/// `api_key` should be a Riot Games API key from
@ -71,7 +78,9 @@ impl RiotApiConfig {
Self {
base_url: Self::DEFAULT_BASE_URL.into(),
retries: Self::DEFAULT_RETRIES,
burst_pct: Self::PRECONFIG_BURST_BURST_PCT,
app_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR,
method_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR,
burst_factor: Self::PRECONFIG_BURST_BURST_FACTOR,
duration_overhead: Self::PRECONFIG_BURST_DURATION_OVERHEAD,
client_builder: Some(
ClientBuilder::new()
@ -86,13 +95,15 @@ impl RiotApiConfig {
/// `RiotApiConfig::RIOT_KEY_HEADER`, otherwise authentication will fail.
///
/// * `retries = 3` (`RiotApiConfig::DEFAULT_RETRIES`).
/// * `purst_pct = 0.99` (`preconfig_burst`).
/// * `burst_factor = 0.99` (`preconfig_burst`).
/// * `duration_overhead = 989 ms` (`preconfig_burst`).
pub fn with_client_builder(client_builder: ClientBuilder) -> Self {
Self {
base_url: Self::DEFAULT_BASE_URL.to_owned(),
retries: Self::DEFAULT_RETRIES,
burst_pct: Self::PRECONFIG_BURST_BURST_PCT,
app_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR,
method_rate_usage_factor: Self::DEFAULT_RATE_USAGE_FACTOR,
burst_factor: Self::PRECONFIG_BURST_BURST_FACTOR,
duration_overhead: Self::PRECONFIG_BURST_DURATION_OVERHEAD,
client_builder: Some(client_builder),
}
@ -101,13 +112,13 @@ impl RiotApiConfig {
/// Sets rate limiting settings to preconfigured values optimized for burst,
/// low latency:
///
/// * `burst_pct = 0.99` (`PRECONFIG_BURST_BURST_PCT`).
/// * `burst_factor = 0.99` (`PRECONFIG_BURST_BURST_FACTOR`).
/// * `duration_overhead = 989 ms` (`PRECONFIG_BURST_DURATION_OVERHEAD_MILLIS`).
///
/// # Returns
/// `self`, for chaining.
pub fn preconfig_burst(mut self) -> Self {
self.burst_pct = Self::PRECONFIG_BURST_BURST_PCT;
self.burst_factor = Self::PRECONFIG_BURST_BURST_FACTOR;
self.duration_overhead = Self::PRECONFIG_BURST_DURATION_OVERHEAD;
self
}
@ -115,13 +126,13 @@ impl RiotApiConfig {
/// Sets the rate limiting settings to preconfigured values optimized for
/// high throughput:
///
/// * `burst_pct = 0.47` (`PRECONFIG_THROUGHPUT_BURST_PCT`).
/// * `burst_factor = 0.47` (`PRECONFIG_THROUGHPUT_BURST_FACTOR`).
/// * `duration_overhead = 10 ms` (`PRECONFIG_THROUGHPUT_DURATION_OVERHEAD_MILLIS`).
///
/// # Returns
/// `self`, for chaining.
pub fn preconfig_throughput(mut self) -> Self {
self.burst_pct = Self::PRECONFIG_THROUGHPUT_BURST_PCT;
self.burst_factor = Self::PRECONFIG_THROUGHPUT_BURST_FACTOR;
self.duration_overhead = Self::PRECONFIG_THROUGHPUT_DURATION_OVERHEAD;
self
}
@ -149,6 +160,76 @@ impl RiotApiConfig {
self
}
/// The rate limit usage percentage controls how much of the API key's rate
/// limit will be used. The default value of `1.0` means the entirety of
/// the rate limit may be used if it is needed. This applies to both the
/// API key's rate limit (per route) _and_ to endpoint method rate limits.
///
/// Setting a value lower than `1.0` can be useful if you are running
/// multiple API instances on the same API key.
///
/// For example, four instances, possibly running on different machines,
/// could each have a value of `0.25` to share an API key's rate limit
/// evenly.
///
/// Note that if you have multiple instances hitting _different_ methods,
/// you should use [set_app_rate_usage_factor()] and [set_method_rate_usage_factor()]
/// separately, as this sets both.
///
/// This also can be used to reduce the chance of hitting 429s, although
/// 429s should be rare even with this set to `1.0`.
///
/// # Panics
/// If `rate_usage_factor` is not in range (0, 1].
///
/// # Returns
/// `self`, for chaining.
pub fn set_rate_usage_factor(mut self, rate_usage_factor: f32) -> Self {
// Use inverted check to handle NaN.
if 0.0 < rate_usage_factor && rate_usage_factor <= 1.0 {
self.app_rate_usage_factor = rate_usage_factor;
self.method_rate_usage_factor = rate_usage_factor;
return self;
}
panic!("rate_usage_factor \"{}\" not in range (0, 1].", rate_usage_factor);
}
/// See [set_rate_usage_factor]. Setting this is useful if you have multiple
/// instances sharing the app rate limit, but are hitting distinct methods
/// and therefore do not need their method usage decreased.
///
/// # Panics
/// If `app_rate_usage_factor` is not in range (0, 1\].
///
/// # Returns
/// `self`, for chaining.
pub fn set_app_rate_usage_factor(mut self, app_rate_usage_factor: f32) -> Self {
// Use inverted check to handle NaN.
if 0.0 < app_rate_usage_factor && app_rate_usage_factor <= 1.0 {
self.app_rate_usage_factor = app_rate_usage_factor;
return self;
}
panic!("app_rate_usage_factor \"{}\" not in range (0, 1].", app_rate_usage_factor);
}
/// See [set_rate_usage_factor] and [set_app_rate_usage_factor].
/// This method is mainly provided for completeness, though it may be
/// useful in advanced use cases.
///
/// # Panics
/// If `method_rate_usage_factor` is not in range (0, 1\].
///
/// # Returns
/// `self`, for chaining.
pub fn set_method_rate_usage_factor(mut self, method_rate_usage_factor: f32) -> Self {
// Use inverted check to handle NaN.
if 0.0 < method_rate_usage_factor && method_rate_usage_factor <= 1.0 {
self.method_rate_usage_factor = method_rate_usage_factor;
return self;
}
panic!("method_rate_usage_factor \"{}\" not in range (0, 1].", method_rate_usage_factor);
}
/// Burst percentage controls how many burst requests are allowed and
/// therefore how requests are spread out. Higher equals more burst,
/// less spread. Lower equals less burst, more spread.
@ -180,17 +261,17 @@ impl RiotApiConfig {
/// better.
///
/// # Panics
/// If `burst_pct` is not in range (0, 1].
/// If `burst_factor` is not in range (0, 1\].
///
/// # Returns
/// `self`, for chaining.
pub fn set_burst_pct(mut self, burst_pct: f32) -> Self {
pub fn set_burst_factor(mut self, burst_factor: f32) -> Self {
// Use inverted check to handle NaN.
if 0.0 < burst_pct && burst_pct < 1.0 {
self.burst_pct = burst_pct;
if 0.0 < burst_factor && burst_factor <= 1.0 {
self.burst_factor = burst_factor;
return self;
}
panic!("burst_pct \"{}\" not in range (0, 1].", burst_pct);
panic!("burst_factor \"{}\" not in range (0, 1].", burst_factor);
}
/// Sets the additional bucket duration to consider when rate limiting.

View File

@ -33,7 +33,7 @@ impl RateLimit {
pub fn new(rate_limit_type: RateLimitType) -> Self {
let initial_bucket = VectorTokenBucket::new(
Duration::from_secs(1), 1, Duration::new(0, 0), 1.0);
Duration::from_secs(1), 1, Duration::new(0, 0), 1.0, 1.0);
RateLimit {
rate_limit_type: rate_limit_type,
// Rate limit before getting from response: 1/s.
@ -168,7 +168,7 @@ impl RateLimit {
// Buckets require updating. Upgrade to write lock.
let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets);
*buckets = buckets_from_header(config, limit_header, count_header);
*buckets = buckets_from_header(config, limit_header, count_header, self.rate_limit_type);
}
// Notify waiters that buckets have updated (after unlocking).
self.update_notify.notify_waiters();
@ -190,7 +190,7 @@ fn buckets_require_updating(limit_header: &str, buckets: &Vec<VectorTokenBucket>
false
}
fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: &str, rate_limit_type: RateLimitType) -> Vec<VectorTokenBucket> {
// Limits: "20000:10,1200000:600"
// Counts: "7:10,58:600"
let size = limit_header.split(",").count();
@ -204,11 +204,17 @@ fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header:
.unwrap_or_else(|_| panic!("Failed to parse count entry \"{}\".", count_entry));
debug_assert!(limit_secs == count_secs);
let rate_usage_factor = if RateLimitType::Application == rate_limit_type {
config.app_rate_usage_factor
} else {
config.method_rate_usage_factor
};
let limit_f32 = limit as f32;
let scaled_burst_pct = config.burst_pct * limit_f32 / (limit_f32 + 1.0);
let scaled_burst_factor = config.burst_factor * limit_f32 / (limit_f32 + 1.0);
let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit,
config.duration_overhead, scaled_burst_pct);
config.duration_overhead, scaled_burst_factor, rate_usage_factor);
bucket.get_tokens(count);
out.push(bucket);
}

View File

@ -1,4 +1,4 @@
#[derive(Copy, Clone)]
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RateLimitType {
Application,
Method,

View File

@ -39,10 +39,16 @@ pub trait TokenBucket {
}
pub struct VectorTokenBucket {
/// The total limit supplied to the constructor, unadjusted by the [rate_usage_factor].
_given_total_limit: usize,
/// Additional factor to reduce rate limit usage, in range (0, 1\].
_rate_usage_factor: f32,
/// Duration of this TokenBucket.
duration: Duration,
// Total tokens available from this TokenBucket.
total_limit: usize,
/// Extra duration to be considered on top of `duration`, to account for
/// varying network latency.
duration_overhead: Duration,
@ -58,30 +64,38 @@ pub struct VectorTokenBucket {
}
impl VectorTokenBucket {
pub fn new(duration: Duration, total_limit: usize,
duration_overhead: Duration, burst_pct: f32) -> Self
pub fn new(duration: Duration, given_total_limit: usize,
duration_overhead: Duration, burst_factor: f32,
rate_usage_factor: f32) -> Self
{
debug_assert!(0.0 < burst_pct && burst_pct <= 1.0,
"BAD burst_pct {}.", burst_pct);
debug_assert!(0.0 < rate_usage_factor && rate_usage_factor <= 1.0,
"BAD rate_usage_factor {}.", rate_usage_factor);
debug_assert!(0.0 < burst_factor && burst_factor <= 1.0,
"BAD burst_factor {}.", burst_factor);
// Float ops may lose precision, but nothing should be that precise.
// API always uses round numbers, burst_pct is frac of 256.
// API always uses round numbers, burst_factor is frac of 256.
// Adjust everything by rate_usage_factor.
let total_limit = std::cmp::max(1,
(given_total_limit as f32 * rate_usage_factor).floor() as usize);
// Effective duration.
let d_eff = duration + duration_overhead;
let burst_duration = Duration::new(
(d_eff.as_secs() as f32 * burst_pct).ceil() as u64,
(d_eff.subsec_nanos() as f32 * burst_pct).ceil() as u32);
let burst_duration = d_eff.mul_f32(burst_factor);
let burst_limit = std::cmp::max(1,
(total_limit as f32 * burst_pct).floor() as usize);
(total_limit as f32 * burst_factor).floor() as usize);
debug_assert!(burst_limit <= total_limit);
VectorTokenBucket {
duration: duration,
total_limit: total_limit,
duration_overhead: duration_overhead,
_given_total_limit: given_total_limit,
_rate_usage_factor: rate_usage_factor,
burst_duration: burst_duration,
burst_limit: burst_limit,
duration,
total_limit,
duration_overhead,
burst_duration,
burst_limit,
timestamps: Mutex::new(VecDeque::with_capacity(total_limit)),
}
@ -90,8 +104,7 @@ impl VectorTokenBucket {
fn update_get_timestamps(&self) -> MutexGuard<VecDeque<Instant>> {
let mut timestamps = self.timestamps.lock();
let cutoff = Instant::now() - self.duration - self.duration_overhead;
// We only need to trim the end of the queue to not leak memory.
// We could do it lazily somehow if we wanted to be really fancy.
// Pop off timestamps that are beyound the bucket duration.
while timestamps.back().map_or(false, |ts| *ts < cutoff) {
timestamps.pop_back();
}
@ -104,11 +117,6 @@ impl TokenBucket for VectorTokenBucket {
fn get_delay(&self) -> Option<Duration> {
let timestamps = self.update_get_timestamps();
// The "?" means:
// `if timestamps.len() < self.total_limit { return None }`
// Timestamp that needs to be popped before
// we can enter another timestamp.
// Full rate limit.
if let Some(ts) = timestamps.get(self.total_limit - 1) {
// Return amount of time needed for timestamp `ts` to go away.
@ -137,7 +145,21 @@ impl TokenBucket for VectorTokenBucket {
for _ in 0..n {
timestamps.push_front(now);
}
timestamps.len() <= self.total_limit
// Check total limit.
if self.total_limit < timestamps.len() {
return false;
}
// Check burst limit.
if let Some(burst_time) = timestamps.get(self.burst_limit) {
let duration_since = now.duration_since(*burst_time); // `now` before `burst_time` will panic.
if duration_since < self.burst_duration {
return false;
}
}
return true;
}
fn get_bucket_duration(&self) -> Duration {

View File

@ -17,7 +17,7 @@ mod token_bucket {
#[test]
fn test_basic() {
Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95, 1.0);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_eq!(None, bucket.get_delay(), "Can get stuff.");
assert!(!bucket.get_tokens(51), "Should have violated limit.");
@ -25,16 +25,20 @@ mod token_bucket {
#[test]
fn test_internal_constructor() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0, 1.0);
assert_eq!(100, bucket.burst_limit);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1e-6);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1e-6, 1.0);
assert_eq!(1, bucket.burst_limit);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0, 1e-6);
assert_eq!(1, bucket.total_limit);
assert_eq!(1, bucket.burst_limit);
}
#[test]
fn test_saturated_100_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.00);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.00, 1.0);
Instant::set_time(50_000);
assert!(bucket.get_tokens(100), "All tokens should be immediately available.");
@ -47,31 +51,40 @@ mod token_bucket {
#[test]
fn test_saturated_95_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95, 1.0);
Instant::set_time(50_000);
assert!(bucket.get_tokens(95), "95 tokens should be immediately available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(475); // Total 951.
Instant::advance_time(475);
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(476); // Total 951. Extra buffer for Duration(0).
Instant::advance_time(476); // Extra buffer for Duration(0).
assert!(bucket.get_tokens(5), "Last 5 tokens should be available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(51);
assert!(bucket.get_tokens(95), "95 tokens should be available.");
Instant::advance_time(51); // Total 1002.
assert!(bucket.get_tokens(90), "90 tokens should be available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(951);
assert!(bucket.get_tokens(5), "Last 5 tokens should be available.");
assert!(bucket.get_tokens(10), "Last 10 tokens should be available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
}
#[test]
fn test_violated_50_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50, 1.0);
Instant::set_time(50_000);
assert!(!bucket.get_tokens(90), "Burst should be violated.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
}
#[test]
fn test_saturated_50_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.5);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50, 1.0);
Instant::set_time(50_000);
assert!(bucket.get_tokens(50), "Half the tokens should be immediately available.");
@ -90,21 +103,46 @@ mod token_bucket {
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
}
#[test]
fn test_saturated_90_burst_rate_usage_factor_50() {
let rate_usage_factor = 0.5;
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.90, rate_usage_factor);
Instant::set_time(50_000);
assert!(bucket.get_tokens(45), "45 tokens should be immediately available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(475);
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(476); // Total 951. Extra buffer for Duration(0).
assert!(bucket.get_tokens(5), "Last 5 tokens should be available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(51); // Total 1002.
assert!(bucket.get_tokens(40), "45 tokens should be available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(951);
assert!(bucket.get_tokens(10), "Last 10 tokens should be available.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
}
#[test]
fn test_many() {
Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_eq!(None, bucket.get_delay(), "Should not be blocked.");
for _ in 0..20_000 {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.5, 1.0);
assert!(bucket.get_tokens(50), "Should have not violated limit. i=-1.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay. i=-1.");
for i in 0..20_000 {
Instant::advance_time(501);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
assert!(bucket.get_tokens(50), "Should have not violated limit. i={}.", i);
assert_ne!(None, bucket.get_delay(), "Bucket should have delay. i={}.", i);
Instant::advance_time(501);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_ne!(None, bucket.get_delay(), "Bucket should have delay.");
assert!(bucket.get_tokens(50), "Should have not violated limit. i={}.", i);
assert_ne!(None, bucket.get_delay(), "Bucket should have delay. i={}.", i);
}
assert!(bucket.timestamps.lock().len() < 110, "Check memory leak.");
assert!(bucket.timestamps.lock().len() < 110, "Should not memory leak.");
}
}
}

View File

@ -1,5 +1,5 @@
#!/bin/bash
set -ex
set -e
# Ensure stable builds.
cargo +stable test --no-run

View File

@ -1,3 +1,2 @@
#!/bin/bash
set -ex
RGAPI_KEY="$(cat apikey.txt)" RUST_BACKTRACE=1 RUST_LOG=riven=trace cargo +nightly test --features nightly -- --nocapture