forked from mirror/Riven
1
0
Fork 0

adding burst_pct to config

users/mingwei/surf
Mingwei Samuel 2019-10-26 21:57:36 -07:00
parent 506aed8b30
commit 9e66efb8ad
7 changed files with 148 additions and 33 deletions

View File

@ -6,7 +6,7 @@ repository = "https://github.com/MingweiSamuel/Riven"
description = "Riot API Library (WIP)"
license = "LGPL-3.0"
edition = "2018"
include = [ "src/**/*" ]
include = [ "src/**" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -26,5 +26,6 @@ url = "2.1"
[dev-dependencies]
colored = "1.8"
env_logger = "0.7"
fake_clock = { git = "https://github.com/MingweiSamuel/fake_clock", branch = "master" }
lazy_static = "1.4"
tokio = "0.2.0-alpha.6"

View File

@ -1,11 +1,16 @@
mod rate_limit;
mod rate_limit_type;
mod token_bucket;
mod regional_requester;
// mod requester_manager;
pub use rate_limit::*;
mod rate_limit_type;
pub use rate_limit_type::*;
use std::time::Instant; // Hack for token_bucket_test.rs.
mod token_bucket;
pub use token_bucket::*;
mod regional_requester;
pub use regional_requester::*;
// pub use requester_manager::*;
#[cfg(test)]
#[path = "token_bucket.test.rs"]
mod token_bucket_test;

View File

@ -6,6 +6,7 @@ use parking_lot::{ RwLock, RwLockUpgradableReadGuard };
use reqwest::{ StatusCode, Response };
use scan_fmt::scan_fmt;
use crate::RiotApiConfig;
use super::{ TokenBucket, VectorTokenBucket };
use super::RateLimitType;
@ -26,7 +27,7 @@ impl RateLimit {
const HEADER_RETRYAFTER: &'static str = "Retry-After";
pub fn new(rate_limit_type: RateLimitType) -> Self {
let initial_bucket = VectorTokenBucket::new(Duration::from_secs(1), 1);
let initial_bucket = VectorTokenBucket::new(Duration::from_secs(1), 1, 1.0);
RateLimit {
rate_limit_type: rate_limit_type,
// Rate limit before getting from response: 1/s.
@ -64,9 +65,9 @@ impl RateLimit {
self.retry_after.read().and_then(|i| Instant::now().checked_duration_since(i))
}
pub fn on_response(&self, response: &Response) {
pub fn on_response(&self, config: &RiotApiConfig, response: &Response) {
self.on_response_retry_after(response);
self.on_response_rate_limits(response);
self.on_response_rate_limits(config, response);
}
/// `on_response` helper for retry after check.
@ -105,7 +106,7 @@ impl RateLimit {
}
#[inline]
fn on_response_rate_limits(&self, response: &Response) {
fn on_response_rate_limits(&self, config: &RiotApiConfig, response: &Response) {
// Check if rate limits changed.
let headers = response.headers();
let limit_header_opt = headers.get(self.rate_limit_type.limit_header())
@ -124,7 +125,7 @@ impl RateLimit {
// Buckets require updating. Upgrade to write lock.
let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets);
*buckets = buckets_from_header(limit_header, count_header)
*buckets = buckets_from_header(config, limit_header, count_header)
}}
}
}
@ -143,7 +144,7 @@ fn buckets_require_updating(limit_header: &str, buckets: &Vec<VectorTokenBucket>
false
}
fn buckets_from_header(limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
// Limits: "20000:10,1200000:600"
// Counts: "7:10,58:600"
let size = limit_header.split(",").count();
@ -157,7 +158,7 @@ fn buckets_from_header(limit_header: &str, count_header: &str) -> Vec<VectorToke
.unwrap_or_else(|_| panic!("Failed to parse count entry \"{}\".", count_entry));
debug_assert!(limit_secs == count_secs);
let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit);
let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit, config.get_burst_pct());
bucket.get_tokens(count);
out.push(bucket);
}

View File

@ -68,8 +68,8 @@ impl RegionalRequester {
.map_err(|e| RiotApiError::new(e, retries, None))?;
// Maybe update rate limits (based on response headers).
self.app_rate_limit.on_response(&response);
method_rate_limit.on_response(&response);
self.app_rate_limit.on_response(&config, &response);
method_rate_limit.on_response(&config, &response);
// Handle response.
let status = response.status();

View File

@ -1,10 +1,19 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use std::time::Duration;
use parking_lot::{Mutex, MutexGuard};
use super::Instant; // Hack for token_bucket_test.rs.
/// A `TokenBucket` keeps track of number of requests allowed per duration of
/// time.
///
/// Respone headers contain descriptions of rate limits such as
/// `"X-App-Rate-Limit": "20:1,100:120"`. Each `TokenBucket` corresponds to a
/// single `"100:120"` (100 requests per 120 seconds).
pub trait TokenBucket {
/// Get the duration til the next available token, or 0 duration if a token is available.
/// Get the duration til the next available token, or 0 duration if a token
/// is available.
/// # Returns
/// Duration or 0 duration.
fn get_delay(&self) -> Option<Duration>;
@ -13,7 +22,8 @@ pub trait TokenBucket {
/// # Parameters
/// * `n` - Number of tokens to take.
/// # Returns
/// True if the tokens were obtained without violating limits, false otherwise.
/// True if the tokens were obtained without violating limits, false
/// otherwise.
fn get_tokens(&self, n: usize) -> bool;
/// Get the duration of this bucket.
@ -33,15 +43,35 @@ pub struct VectorTokenBucket {
duration: Duration,
// Total tokens available from this TokenBucket.
total_limit: usize,
// Record of timestamps (synchronized).
/// TODO USE THESE !!!!!!!
/// Duration considered for burst factor.
burst_duration: Duration,
/// Limit allowed per burst_duration, for burst factor.
burst_limit: usize,
/// Record of timestamps (synchronized).
timestamps: Mutex<VecDeque<Instant>>,
}
impl VectorTokenBucket {
pub fn new(duration: Duration, total_limit: usize) -> Self {
pub fn new(duration: Duration, total_limit: usize, burst_pct: f32) -> Self {
debug_assert!(0.0 < burst_pct && burst_pct <= 1.0,
"BAD burst_pct {}.", burst_pct);
// Float ops may lose precision, but nothing should be that precise.
// API always uses round numbers, burst_pct is frac of 256.
let burst_duration = Duration::new(
(duration.as_secs() as f32 * burst_pct) as u64,
(duration.subsec_nanos() as f32 * burst_pct) as u32);
VectorTokenBucket {
duration: duration,
total_limit: total_limit,
burst_duration: burst_duration,
burst_limit: (total_limit as f32 * burst_pct) as usize,
timestamps: Mutex::new(VecDeque::new()),
}
}
@ -90,13 +120,3 @@ impl TokenBucket for VectorTokenBucket {
self.total_limit
}
}
#[cfg(test)]
mod tests {
// use super::*;
//
// #[test]
// fn it_works() {
// assert_eq!(2 + 2, 4);
// }
}

View File

@ -0,0 +1,18 @@
#![cfg(test)]
use std::time::Duration;
use fake_clock::FakeClock as Instant;
mod token_bucket {
include!("token_bucket.rs");
mod tests {
use super::*;
#[test]
fn it_works() {
let _x = VectorTokenBucket::new(Duration::from_secs(1), 100);
}
}
}

View File

@ -1,15 +1,85 @@
pub mod riot_api_config {
pub const DEFAULT_BURST_PCT: f32 = 0.93;
pub const DEFAULT_BURST_FACTOR: u8 = ((BURST_FACTOR_DENOM * DEFAULT_BURST_PCT) as u16 - 1) as u8;
pub const BURST_FACTOR_DENOM: f32 = 256.0;
}
use riot_api_config::*;
/// Configuration for instantiating RiotApi.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct RiotApiConfig {
/// Riot Games API key from
/// [https://developer.riotgames.com/](https://developer.riotgames.com/).
/// Should be something like `"RGAPI-01234567-89ab-cdef-0123-456789abcdef"`.
pub api_key: String,
/// Number of times to retry requests. Naturally, only retryable requests
/// will be retried: responses with status codes 5xx or 429 (after waiting
/// for retry-after headers). A value of `0` means one request will be sent
/// and it will not be retried if it fails.
pub retries: u8,
/// Burst factor controls how requests are spread out. Higher means less
/// spread out, lower means more spread out.
///
/// The value is converted into a "bust percentage":
/// `(burst_factor + 1) / 256`. How burst percentage controlls rate limiting
/// is detailed in the documentation of
/// [`set_burst_pct`](#method.set_burst_pct).
pub burst_factor: u8,
}
impl RiotApiConfig {
/// Creates a new `RiotApiConfig` with the given `api_key` and default
/// settings.
pub fn with_key<T: Into<String>>(api_key: T) -> Self {
Self {
api_key: api_key.into(),
retries: 3 // TODO defaults.
retries: 3, // TODO defaults.
burst_factor: DEFAULT_BURST_FACTOR,
}
}
/// Sets the "burst percentage", `pct`. The value must be between 0,
/// exclusive, and 1, inclusive, otherwise this method will panic.
///
/// "Burst percentage" behaves as follows:
/// A burst percentage of x% means, for each token bucket, "x% of the
/// tokens can be used in x% of the bucket duration." So, for example, if x
/// is 90%, a bucket would allow 90% of the requests to be made without
/// any delay. Then, after waiting 90% of the bucket's duration, the
/// remaining 10% of requests could be made.
///
/// A burst percentage of 100% results in no request spreading, which would
/// allow for the largest bursts and lowest latency, but could result in
/// 429s as bucket boundaries occur.
///
/// A burst percentage of near 0% results in high spreading causing
/// temporally equidistant requests. This prevents 429s but has the highest
/// latency. Additionally, if the number of tokens is high, this may lower
/// the overall throughput due to the rate at which requests can be
/// scheduled.
///
/// Therefore, for interactive applications like summoner & match history
/// lookup, a higher percentage may be better. For data-collection apps
/// like champion winrate aggregation, a medium-low percentage may be
/// better.
///
/// # Panics
/// Panics if `pct` is not in the range `(0, 1]`; 0 exclusive, 1 inclusive.
///
/// # Returns
/// `&mut self` for chaining.
pub fn set_burst_pct<'a>(&'a mut self, pct: f32) -> &'a mut Self
{
assert!(0.0 < pct && pct <= 1.1,
"pct must be in range (0, 1], was {}.", pct);
let sf = (std::u8::MAX as f32 * pct).ceil();
self.burst_factor = sf as u8;
assert_eq!(sf, self.burst_factor as f32,
"!FAILED TO CONVERT FLOAT TO u8: {}, from pct {}.", sf, pct);
self
}
pub fn get_burst_pct(&self) -> f32 {
(self.burst_factor as f32 + 1.0) / BURST_FACTOR_DENOM
}
}