Updating bucket rate limiting system, adding bulk tests

This commit is contained in:
Mingwei Samuel 2019-11-02 20:55:07 -07:00
parent 1b7d78e5e3
commit a1b5406ac3
16 changed files with 349 additions and 136 deletions

View file

@ -29,3 +29,4 @@ env_logger = "0.7"
fake_clock = { git = "https://github.com/MingweiSamuel/fake_clock", branch = "master" } fake_clock = { git = "https://github.com/MingweiSamuel/fake_clock", branch = "master" }
lazy_static = "1.4" lazy_static = "1.4"
tokio = "0.2.0-alpha.6" tokio = "0.2.0-alpha.6"
futures-util-preview = "0.3.0-alpha.19"

150
src/config.rs Normal file
View file

@ -0,0 +1,150 @@
//! Configuration of RiotApi.
use std::time::Duration;
use reqwest::ClientBuilder;
/// Configuration for instantiating RiotApi.
///
///
#[derive(Debug)]
pub struct RiotApiConfig {
pub(crate) api_key: String,
pub(crate) retries: u8,
pub(crate) burst_pct: f32,
pub(crate) duration_overhead: Duration,
pub(crate) reqwest_client: ClientBuilder,
}
impl RiotApiConfig {
/// `0.99` - `burst_pct` used by `preconfig_burst` (and default `with_key`).
pub const PRECONFIG_BURST_BURST_PCT: f32 = 0.99;
/// `989` ms - `duration_overhead` used by `preconfig_burst` (and default `with_key`).
pub const PRECONFIG_BURST_DURATION_OVERHEAD_MILLIS: u64 = 989;
/// `0.47` - `burst_pct` used by `preconfig_throughput`.
pub const PRECONFIG_THROUGHPUT_BURST_PCT: f32 = 0.47;
// `10` ms - `duration_overhead` used by `preconfig_throughput`.
pub const PRECONFIG_THROUGHPUT_DURATION_OVERHEAD_MILLIS: u64 = 10;
/// Creates a new `RiotApiConfig` with the given `api_key` with the following
/// configuration:
///
/// * `retries = 3`.
/// * `purst_pct = 0.99` (`preconfig_burst`).
/// * `duration_overhead = 989 ms` (`preconfig_burst`).
///
/// `api_key` should be a Riot Games API key from
/// [https://developer.riotgames.com/](https://developer.riotgames.com/),
/// and should look like `"RGAPI-01234567-89ab-cdef-0123-456789abcdef"`.
pub fn with_key<T: Into<String>>(api_key: T) -> Self {
Self {
api_key: api_key.into(),
retries: 3,
burst_pct: Self::PRECONFIG_BURST_BURST_PCT,
duration_overhead: Duration::from_millis(Self::PRECONFIG_BURST_DURATION_OVERHEAD_MILLIS),
reqwest_client: ClientBuilder::new(),
}
}
/// Sets rate limiting settings to preconfigured values optimized for burst,
/// low latency:
///
/// * `burst_pct = 0.99` (`PRECONFIG_BURST_BURST_PCT`).
/// * `duration_overhead = 989 ms` (`PRECONFIG_BURST_DURATION_OVERHEAD_MILLIS`).
///
/// # Returns
/// `self`, for chaining.
pub fn preconfig_burst(mut self) -> Self {
self.burst_pct = Self::PRECONFIG_BURST_BURST_PCT;
self.duration_overhead = Duration::from_millis(Self::PRECONFIG_BURST_DURATION_OVERHEAD_MILLIS);
self
}
/// Sets the rate limiting settings to preconfigured values optimized for
/// high throughput:
///
/// * `burst_pct = 0.47` (`PRECONFIG_THROUGHPUT_BURST_PCT`).
/// * `duration_overhead = 10 ms` (`PRECONFIG_THROUGHPUT_DURATION_OVERHEAD_MILLIS`).
///
/// # Returns
/// `self`, for chaining.
pub fn preconfig_throughput(mut self) -> Self {
self.burst_pct = Self::PRECONFIG_THROUGHPUT_BURST_PCT;
self.duration_overhead = Duration::from_millis(Self::PRECONFIG_THROUGHPUT_DURATION_OVERHEAD_MILLIS);
self
}
/// Set number of times to retry requests. Naturally, only retryable requests
/// will be retried: responses with status codes 5xx or 429 (after waiting
/// for retry-after headers). A value of `0` means one request will be sent
/// and it will not be retried if it fails.
///
/// # Returns
/// `self`, for chaining.
pub fn set_retries(mut self, retries: u8) -> Self {
self.retries = retries;
self
}
/// Burst percentage controls how many burst requests are allowed and
/// therefore how requests are spread out. Higher equals more burst,
/// less spread. Lower equals less burst, more spread.
///
/// The value must be in the range (0, 1];
/// Between 0, exclusive, and 1, inclusive. However values should generally
/// be larger than 0.25.
///
/// Burst percentage behaves as follows:<br>
/// A burst percentage of x% means, for each token bucket, "x% of the
/// tokens can be used in x% of the bucket duration." So, for example, if x
/// is 90%, a bucket would allow 90% of the requests to be made without
/// any delay. Then, after waiting 90% of the bucket's duration, the
/// remaining 10% of requests could be made.
///
/// A burst percentage of 100% results in no request spreading, which would
/// allow for the largest bursts and lowest latency, but could result in
/// 429s as bucket boundaries occur.
///
/// A burst percentage of near 0% results in high spreading causing
/// temporally equidistant requests. This prevents 429s but has the highest
/// latency. Additionally, if the number of tokens is high, this may lower
/// the overall throughput due to the rate at which requests can be
/// scheduled.
///
/// Therefore, for interactive applications like summoner & match history
/// lookup, a higher percentage may be better. For data-collection apps
/// like champion winrate aggregation, a medium-low percentage may be
/// better.
///
/// # Panics
/// If `burst_pct` is not in range (0, 1].
///
/// # Returns
/// `self`, for chaining.
pub fn set_burst_pct(mut self, burst_pct: f32) -> Self {
// Use inverted check to handle NaN.
if 0.0 < burst_pct && burst_pct < 1.0 {
self.burst_pct = burst_pct;
return self;
}
panic!("burst_pct \"{}\" not in range (0, 1].", burst_pct);
}
/// Sets the additional bucket duration to consider when rate limiting.
/// Increasing this value will decrease the chances of 429s, but will lower
/// the overall throughput.
///
/// In a sense, the `duration_overhead` is how much to "widen" the temporal
/// width of buckets.
///
/// Given a particular Riot Game API rate limit bucket that allows N requests
/// per D duration, when counting requests this library will consider requests
/// sent in the past `D + duration_overhead` duration.
///
/// # Returns
/// `self`, for chaining.
pub fn set_duration_overhead(mut self, duration_overhead: Duration) -> Self {
self.duration_overhead = duration_overhead;
self
}
}

View file

@ -1,18 +1,19 @@
//! Module docs TODO. //! Module docs TODO.
#![feature(non_exhaustive)] #![feature(non_exhaustive)]
mod riot_api_error; mod config;
pub use riot_api_error::*; pub use config::RiotApiConfig;
pub mod consts; pub mod consts;
pub mod endpoints; pub mod endpoints;
pub mod riot_api_config; mod error;
pub use riot_api_config::RiotApiConfig; pub use error::*;
mod req;
mod riot_api; mod riot_api;
pub use riot_api::*; pub use riot_api::*;
mod req;
mod util; mod util;

View file

@ -1,3 +1,5 @@
//! Module containing rate limiting and requesting types.
mod rate_limit; mod rate_limit;
pub use rate_limit::*; pub use rate_limit::*;

View file

@ -1,7 +1,7 @@
use std::cmp; use std::cmp;
use std::time::{ Duration, Instant }; use std::time::{ Duration, Instant };
use log::*; use log;
use parking_lot::{ RwLock, RwLockUpgradableReadGuard }; use parking_lot::{ RwLock, RwLockUpgradableReadGuard };
use reqwest::{ StatusCode, Response }; use reqwest::{ StatusCode, Response };
use scan_fmt::scan_fmt; use scan_fmt::scan_fmt;
@ -27,7 +27,8 @@ impl RateLimit {
const HEADER_RETRYAFTER: &'static str = "Retry-After"; const HEADER_RETRYAFTER: &'static str = "Retry-After";
pub fn new(rate_limit_type: RateLimitType) -> Self { pub fn new(rate_limit_type: RateLimitType) -> Self {
let initial_bucket = VectorTokenBucket::new(Duration::from_secs(1), 1, 1.0); let initial_bucket = VectorTokenBucket::new(
Duration::from_secs(1), 1, Duration::new(0, 0), 1.0);
RateLimit { RateLimit {
rate_limit_type: rate_limit_type, rate_limit_type: rate_limit_type,
// Rate limit before getting from response: 1/s. // Rate limit before getting from response: 1/s.
@ -58,6 +59,8 @@ impl RateLimit {
for bucket in app_buckets.iter().chain(method_buckets.iter()) { for bucket in app_buckets.iter().chain(method_buckets.iter()) {
bucket.get_tokens(1); bucket.get_tokens(1);
} }
log::debug!("Tokens obtained, buckets: APP {:?} METHOD {:?}", app_buckets, method_buckets);
None None
} }
@ -94,6 +97,9 @@ impl RateLimit {
let retry_after_header = response.headers() let retry_after_header = response.headers()
.get(RateLimit::HEADER_RETRYAFTER)?.to_str() .get(RateLimit::HEADER_RETRYAFTER)?.to_str()
.expect("Failed to read retry-after header as string."); .expect("Failed to read retry-after header as string.");
log::debug!("Hit 429, retry-after {} secs.", retry_after_header);
// Header currently only returns ints, but float is more general. Can be zero. // Header currently only returns ints, but float is more general. Can be zero.
let retry_after_secs: f32 = retry_after_header.parse() let retry_after_secs: f32 = retry_after_header.parse()
.expect("Failed to parse retry-after header as f32."); .expect("Failed to parse retry-after header as f32.");
@ -158,11 +164,15 @@ fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header:
.unwrap_or_else(|_| panic!("Failed to parse count entry \"{}\".", count_entry)); .unwrap_or_else(|_| panic!("Failed to parse count entry \"{}\".", count_entry));
debug_assert!(limit_secs == count_secs); debug_assert!(limit_secs == count_secs);
let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit, config.get_burst_pct()); let limit_f32 = limit as f32;
let scaled_burst_pct = config.burst_pct * limit_f32 / (limit_f32 + 1.0);
let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit,
config.duration_overhead, scaled_burst_pct);
bucket.get_tokens(count); bucket.get_tokens(count);
out.push(bucket); out.push(bucket);
} }
debug!("Set buckets to {} limit, {} count.", limit_header, count_header); log::debug!("Set buckets to {} limit, {} count.", limit_header, count_header);
out out
} }

View file

@ -7,7 +7,7 @@ use tokio::timer::delay_for;
use crate::Result; use crate::Result;
use crate::RiotApiError; use crate::RiotApiError;
use crate::riot_api_config::RiotApiConfig; use crate::RiotApiConfig;
use crate::consts::Region; use crate::consts::Region;
use crate::util::InsertOnlyCHashMap; use crate::util::InsertOnlyCHashMap;
@ -73,15 +73,16 @@ impl RegionalRequester {
// Handle response. // Handle response.
let status = response.status(); let status = response.status();
log::trace!("Response {} (retried {} times).", status, retries);
// Special "none success" cases, return None. // Special "none success" cases, return None.
if Self::is_none_status_code(&status) { if Self::is_none_status_code(&status) {
log::trace!("Response {} (retried {} times), None result.", status, retries);
break Ok(None); break Ok(None);
} }
// Handle normal success / failure cases. // Handle normal success / failure cases.
match response.error_for_status_ref() { match response.error_for_status_ref() {
// Success. // Success.
Ok(_) => { Ok(_) => {
log::trace!("Response {} (retried {} times), parsed result.", status, retries);
let value = response.json::<T>().await; let value = response.json::<T>().await;
break value.map(|v| Some(v)) break value.map(|v| Some(v))
.map_err(|e| RiotApiError::new(e, retries, None)); .map_err(|e| RiotApiError::new(e, retries, None));
@ -94,8 +95,10 @@ impl RegionalRequester {
(StatusCode::TOO_MANY_REQUESTS != status (StatusCode::TOO_MANY_REQUESTS != status
&& !status.is_server_error()) && !status.is_server_error())
{ {
log::debug!("Response {} (retried {} times), returning error.", status, retries);
break Err(RiotApiError::new(err, retries, Some(response))); break Err(RiotApiError::new(err, retries, Some(response)));
} }
log::debug!("Response {} (retried {} times), retrying.", status, retries);
}, },
}; };

View file

@ -1,3 +1,4 @@
use std::fmt;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::time::Duration; use std::time::Duration;
@ -37,39 +38,47 @@ pub trait TokenBucket {
fn get_total_limit(&self) -> usize; fn get_total_limit(&self) -> usize;
} }
#[derive(Debug)]
pub struct VectorTokenBucket { pub struct VectorTokenBucket {
/// Duration of this TokenBucket. /// Duration of this TokenBucket.
duration: Duration, duration: Duration,
// Total tokens available from this TokenBucket. // Total tokens available from this TokenBucket.
total_limit: usize, total_limit: usize,
/// Extra duration to be considered on top of `duration`, to account for
/// varying network latency.
duration_overhead: Duration,
/// Duration considered for burst factor. /// Duration considered for burst factor.
burst_duration: Duration, burst_duration: Duration,
/// Limit allowed per burst_duration, for burst factor. /// Limit allowed per burst_duration, for burst factor.
burst_limit: usize, burst_limit: usize,
/// Record of timestamps (synchronized). /// Record of timestamps (synchronized).
timestamps: Mutex<VecDeque<Instant>>, timestamps: Mutex<VecDeque<Instant>>,
} }
impl VectorTokenBucket { impl VectorTokenBucket {
pub fn new(duration: Duration, total_limit: usize, burst_pct: f32) -> Self { pub fn new(duration: Duration, total_limit: usize,
duration_overhead: Duration, burst_pct: f32) -> Self
{
debug_assert!(0.0 < burst_pct && burst_pct <= 1.0, debug_assert!(0.0 < burst_pct && burst_pct <= 1.0,
"BAD burst_pct {}.", burst_pct); "BAD burst_pct {}.", burst_pct);
// Float ops may lose precision, but nothing should be that precise. // Float ops may lose precision, but nothing should be that precise.
// API always uses round numbers, burst_pct is frac of 256. // API always uses round numbers, burst_pct is frac of 256.
// Effective duration.
let d_eff = duration + duration_overhead;
let burst_duration = Duration::new( let burst_duration = Duration::new(
(duration.as_secs() as f32 * burst_pct).ceil() as u64, (d_eff.as_secs() as f32 * burst_pct).ceil() as u64,
(duration.subsec_nanos() as f32 * burst_pct).ceil() as u32); (d_eff.subsec_nanos() as f32 * burst_pct).ceil() as u32);
let burst_limit = std::cmp::max(1,
let burst_limit = (total_limit as f32 * burst_pct).ceil() as usize; (total_limit as f32 * burst_pct).floor() as usize);
debug_assert!(burst_limit > 0); debug_assert!(burst_limit <= total_limit);
VectorTokenBucket { VectorTokenBucket {
duration: duration, duration: duration,
total_limit: total_limit, total_limit: total_limit,
duration_overhead: duration_overhead,
burst_duration: burst_duration, burst_duration: burst_duration,
burst_limit: burst_limit, burst_limit: burst_limit,
@ -80,10 +89,10 @@ impl VectorTokenBucket {
fn update_get_timestamps(&self) -> MutexGuard<VecDeque<Instant>> { fn update_get_timestamps(&self) -> MutexGuard<VecDeque<Instant>> {
let mut timestamps = self.timestamps.lock(); let mut timestamps = self.timestamps.lock();
let cutoff = Instant::now() - self.duration; let cutoff = Instant::now() - self.duration - self.duration_overhead;
// We only need to trim the end of the queue to not leak memory. // We only need to trim the end of the queue to not leak memory.
// We could do it lazily somehow if we wanted to be really fancy. // We could do it lazily somehow if we wanted to be really fancy.
while timestamps.back().map_or(false, |ts| ts < &cutoff) { while timestamps.back().map_or(false, |ts| *ts < cutoff) {
timestamps.pop_back(); timestamps.pop_back();
} }
return timestamps; return timestamps;
@ -104,7 +113,8 @@ impl TokenBucket for VectorTokenBucket {
if let Some(ts) = timestamps.get(self.total_limit - 1) { if let Some(ts) = timestamps.get(self.total_limit - 1) {
// Return amount of time needed for timestamp `ts` to go away. // Return amount of time needed for timestamp `ts` to go away.
Instant::now().checked_duration_since(*ts) Instant::now().checked_duration_since(*ts)
.and_then(|passed_dur| self.duration.checked_sub(passed_dur)) .and_then(|passed_dur| (self.duration + self.duration_overhead)
.checked_sub(passed_dur))
} }
// Otherwise burst rate limit. // Otherwise burst rate limit.
else if let Some(ts) = timestamps.get(self.burst_limit - 1) { else if let Some(ts) = timestamps.get(self.burst_limit - 1) {
@ -138,3 +148,9 @@ impl TokenBucket for VectorTokenBucket {
self.total_limit self.total_limit
} }
} }
impl fmt::Debug for VectorTokenBucket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "({}/{}:{})", self.timestamps.lock().len(), self.total_limit, self.duration.as_secs())
}
}

View file

@ -8,11 +8,16 @@ mod token_bucket {
mod tests { mod tests {
use super::*; use super::*;
use lazy_static::lazy_static;
lazy_static! {
pub static ref D00: Duration = Duration::new(0, 0);
}
#[test] #[test]
fn test_basic() { fn test_basic() {
Instant::set_time(50_000); Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.95); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95);
assert!(bucket.get_tokens(50), "Should have not violated limit."); assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_eq!(None, bucket.get_delay(), "Can get stuff."); assert_eq!(None, bucket.get_delay(), "Can get stuff.");
assert!(!bucket.get_tokens(51), "Should have violated limit."); assert!(!bucket.get_tokens(51), "Should have violated limit.");
@ -20,16 +25,16 @@ mod token_bucket {
#[test] #[test]
fn test_internal_constructor() { fn test_internal_constructor() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1.0); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.0);
assert_eq!(100, bucket.burst_limit); assert_eq!(100, bucket.burst_limit);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1e-6); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1e-6);
assert_eq!(1, bucket.burst_limit); assert_eq!(1, bucket.burst_limit);
} }
#[test] #[test]
fn test_saturated_100_burst() { fn test_saturated_100_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1.00); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 1.00);
Instant::set_time(50_000); Instant::set_time(50_000);
assert!(bucket.get_tokens(100), "All tokens should be immediately available."); assert!(bucket.get_tokens(100), "All tokens should be immediately available.");
@ -42,7 +47,7 @@ mod token_bucket {
#[test] #[test]
fn test_saturated_95_burst() { fn test_saturated_95_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.50); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.50);
Instant::set_time(50_000); Instant::set_time(50_000);
assert!(bucket.get_tokens(95), "95 tokens should be immediately available."); assert!(bucket.get_tokens(95), "95 tokens should be immediately available.");
@ -66,7 +71,7 @@ mod token_bucket {
#[test] #[test]
fn test_saturated_50_burst() { fn test_saturated_50_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.5); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.5);
Instant::set_time(50_000); Instant::set_time(50_000);
assert!(bucket.get_tokens(50), "Half the tokens should be immediately available."); assert!(bucket.get_tokens(50), "Half the tokens should be immediately available.");
@ -88,7 +93,7 @@ mod token_bucket {
#[test] #[test]
fn test_many() { fn test_many() {
Instant::set_time(50_000); Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.95); let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, *D00, 0.95);
assert!(bucket.get_tokens(50), "Should have not violated limit."); assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_eq!(None, bucket.get_delay(), "Should not be blocked."); assert_eq!(None, bucket.get_delay(), "Should not be blocked.");
for _ in 0..20_000 { for _ in 0..20_000 {

View file

@ -10,6 +10,16 @@ use crate::req::RegionalRequester;
use crate::util::InsertOnlyCHashMap; use crate::util::InsertOnlyCHashMap;
/// For retrieving data from the Riot Games API. /// For retrieving data from the Riot Games API.
///
/// # Rate Limiting
///
/// The Riot Game API does _dynamic_ rate limiting, meaning that rate limits are
/// specified in response headers and (hypothetically) could change at any time.
/// Riven keeps track of changing rate limits seamlessly, preventing you from
/// getting blacklisted.
///
/// Riven's rate limiting is highly efficient, meaning that it can reach the limits
/// of your rate limit without going over.
pub struct RiotApi { pub struct RiotApi {
/// Configuration settings. /// Configuration settings.
config: RiotApiConfig, config: RiotApiConfig,
@ -40,7 +50,10 @@ impl RiotApi {
{ {
// TODO: max concurrent requests? Or can configure client? // TODO: max concurrent requests? Or can configure client?
self.regional_requesters self.regional_requesters
.get_or_insert_with(region, || RegionalRequester::new()) .get_or_insert_with(region, || {
log::debug!("Creating requester for region {}.", region.platform);
RegionalRequester::new()
})
.get(&self.config, &self.client, method_id, region, path, query) .get(&self.config, &self.client, method_id, region, path, query)
} }
} }

View file

@ -1,84 +0,0 @@
//! Configuration of RiotApi.
pub const DEFAULT_BURST_PCT: f32 = 0.93;
pub const DEFAULT_BURST_FACTOR: u8 = ((BURST_FACTOR_DENOM * DEFAULT_BURST_PCT) as u16 - 1) as u8;
pub const BURST_FACTOR_DENOM: f32 = 256.0;
/// Configuration for instantiating RiotApi.
#[derive(Debug, PartialEq, Eq)]
pub struct RiotApiConfig {
/// Riot Games API key from
/// [https://developer.riotgames.com/](https://developer.riotgames.com/).
/// Should be something like `"RGAPI-01234567-89ab-cdef-0123-456789abcdef"`.
pub api_key: String,
/// Number of times to retry requests. Naturally, only retryable requests
/// will be retried: responses with status codes 5xx or 429 (after waiting
/// for retry-after headers). A value of `0` means one request will be sent
/// and it will not be retried if it fails.
pub retries: u8,
/// Burst factor controls how requests are spread out. Higher means less
/// spread out, lower means more spread out.
///
/// The value is converted into a "bust percentage":
/// `(burst_factor + 1) / 256`. How burst percentage controlls rate limiting
/// is detailed in the documentation of
/// [`set_burst_pct`](#method.set_burst_pct).
pub burst_factor: u8,
}
impl RiotApiConfig {
/// Creates a new `RiotApiConfig` with the given `api_key` and default
/// settings.
pub fn with_key<T: Into<String>>(api_key: T) -> Self {
Self {
api_key: api_key.into(),
retries: 3, // TODO defaults.
burst_factor: DEFAULT_BURST_FACTOR,
}
}
/// Sets the "burst percentage", `pct`. The value must be between 0,
/// exclusive, and 1, inclusive, otherwise this method will panic.
///
/// "Burst percentage" behaves as follows:
/// A burst percentage of x% means, for each token bucket, "x% of the
/// tokens can be used in x% of the bucket duration." So, for example, if x
/// is 90%, a bucket would allow 90% of the requests to be made without
/// any delay. Then, after waiting 90% of the bucket's duration, the
/// remaining 10% of requests could be made.
///
/// A burst percentage of 100% results in no request spreading, which would
/// allow for the largest bursts and lowest latency, but could result in
/// 429s as bucket boundaries occur.
///
/// A burst percentage of near 0% results in high spreading causing
/// temporally equidistant requests. This prevents 429s but has the highest
/// latency. Additionally, if the number of tokens is high, this may lower
/// the overall throughput due to the rate at which requests can be
/// scheduled.
///
/// Therefore, for interactive applications like summoner & match history
/// lookup, a higher percentage may be better. For data-collection apps
/// like champion winrate aggregation, a medium-low percentage may be
/// better.
///
/// # Panics
/// Panics if `pct` is not in the range `(0, 1]`; 0 exclusive, 1 inclusive.
///
/// # Returns
/// `&mut self` for chaining.
pub fn set_burst_pct<'a>(&'a mut self, pct: f32) -> &'a mut Self
{
assert!(0.0 < pct && pct <= 1.1,
"pct must be in range (0, 1], was {}.", pct);
let sf = (std::u8::MAX as f32 * pct).ceil();
self.burst_factor = sf as u8;
assert_eq!(sf, self.burst_factor as f32,
"!FAILED TO CONVERT FLOAT TO u8: {}, from pct {}.", sf, pct);
self
}
pub fn get_burst_pct(&self) -> f32 {
(self.burst_factor as f32 + 1.0) / BURST_FACTOR_DENOM
}
}

View file

@ -4,8 +4,7 @@
macro_rules! async_tests { macro_rules! async_tests {
( $runner:ident { $( $name:ident : async $eval:block, )* } ) => { ( $runner:ident { $( $name:ident : async $eval:block, )* } ) => {
fn $runner(_: &[()]) { fn $runner(_: &[()]) {
const TUPLE_OK: (u32, u32) = (1, 0); env_logger::init();
const TUPLE_ERR: (u32, u32) = (0, 1);
std::process::exit({ std::process::exit({
let mut rt = tokio::runtime::current_thread::Runtime::new() let mut rt = tokio::runtime::current_thread::Runtime::new()
@ -19,26 +18,31 @@ macro_rules! async_tests {
let mut errs: u32 = 0; let mut errs: u32 = 0;
$( $(
let $name = async { let $name = async {
let result: Result<(), String> = async { let result: std::result::Result<(), String> = async {
$eval $eval
}.await; }.await;
print!("test {} ... ", stringify!($name)); result
match &result {
Ok(_) => {
println!("{}", "ok".green());
TUPLE_OK
}
Err(msg) => {
println!("{}", "error".bright_red());
println!("{}", msg);
TUPLE_ERR
}
}
}; };
let $name = tokio::executor::Executor::spawn_with_handle(
&mut tokio::executor::DefaultExecutor::current(), $name)
.expect("Failed to spawn.");
)* )*
$( $(
let $name = $name.await; let $name = $name.await;
oks += $name.0; errs += $name.1; )*
$(
print!("test {} ... ", stringify!($name));
match $name {
Ok(_) => {
println!("{}", "ok".green());
oks += 1;
}
Err(msg) => {
println!("{}", "error".bright_red());
println!("{}", msg);
errs += 1;
}
}
)* )*
println!(); println!();
print!("test result: {}. ", if errs > 0 { "error".bright_red() } else { "ok".green() }); print!("test result: {}. ", if errs > 0 { "error".bright_red() } else { "ok".green() });

View file

@ -7,7 +7,6 @@ use testutils::*;
use colored::*; use colored::*;
use riven::RiotApi;
use riven::consts::*; use riven::consts::*;

60
tests/tests_kr.rs.ignored Normal file
View file

@ -0,0 +1,60 @@
#![feature(custom_test_frameworks)]
#![feature(async_closure)]
#![test_runner(my_runner)]
mod async_tests;
mod testutils;
use testutils::*;
use futures_util::future;
use colored::*;
use riven::consts::*;
use riven::endpoints::summoner_v4::Summoner;
const REGION: Region = Region::KR;
async_tests!{
my_runner {
league_summoner_bulk_test: async {
let leagues = (1..20)
.map(async move |i| {
let leaguelist = RIOT_API.league_v4().get_league_entries(REGION,
QueueType::RANKED_SOLO_5x5, Tier::GOLD, Division::III, Some(i));
let leaguelist = leaguelist.await
.map_err(|e| e.to_string())?
.ok_or("Failed to get challenger league".to_owned())?;
println!("League list {}: {} items.", i, leaguelist.len());
let summoners = leaguelist
.iter()
.map(async move |leagueentry| {
let summonerfuture = RIOT_API.summoner_v4().get_by_summoner_id(
REGION, &leagueentry.summoner_id);
summonerfuture.await
.map_err(|e| e.to_string())?
.ok_or(format!("Failed to get summoner_id {}.",
leagueentry.summoner_id))
});
future::join_all(summoners).await
.into_iter()
// I'm not sure where this result goes.
.collect::<Result<Vec<Summoner>, String>>()
});
let all_summoners = future::join_all(leagues).await
.into_iter()
.flat_map(|league| league)
.flat_map(|summoner| summoner);
for (i, summoner) in all_summoners.enumerate() {
println!("{}: {}", i + 1, summoner.name);
}
Ok(())
},
}
}

View file

@ -3,20 +3,36 @@
mod async_tests; mod async_tests;
mod testutils; mod testutils;
use testutils::*; use testutils::{ RIOT_API, future_start };
use colored::*; use colored::*;
use riven::consts::*; use riven::consts::*;
use riven::endpoints::summoner_v4::Summoner;
const REGION: Region = Region::TR;
async_tests!{ async_tests!{
my_runner { my_runner {
league_summoner_bulk_test: async { league_summoner_bulk_test: async {
let p = RIOT_API.league_v4().get_challenger_league(Region::TR, QueueType::RANKED_SOLO_5x5); let p = RIOT_API.league_v4().get_challenger_league(REGION, QueueType::RANKED_SOLO_5x5);
// let p = future_start(p);
let ll = p.await.map_err(|e| e.to_string())?.ok_or("Failed to get challenger league".to_owned())?; let ll = p.await.map_err(|e| e.to_string())?.ok_or("Failed to get challenger league".to_owned())?;
// println!("{:#?}", ll);
// TODO!!! println!("{} Challenger {} entries.", REGION.key, ll.entries.len());
let sl = ll.entries[..50].iter()
.map(|entry| RIOT_API.summoner_v4().get_by_summoner_id(REGION, &entry.summoner_id))
.map(future_start)
.collect::<Vec<_>>();
for (i, s) in sl.into_iter().enumerate() {
let summoner_opt: Option<Summoner> = s.await.map_err(|e| e.to_string())?;
let summoner = summoner_opt.ok_or("Failed to get summoner.".to_owned())?;
println!("{}: {}", i + 1, summoner.name);
}
Ok(()) Ok(())
}, },
} }

View file

@ -1,15 +1,32 @@
use riven::RiotApi; #![allow(dead_code)]
use std::future::Future;
use futures_util::future::RemoteHandle;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::executor::{ DefaultExecutor, Executor };
use riven::{ RiotApi, RiotApiConfig };
lazy_static! { lazy_static! {
pub static ref RIOT_API: RiotApi = { pub static ref RIOT_API: RiotApi = {
let api_key = std::env::var("RGAPI_KEY").ok() let api_key = std::env::var("RGAPI_KEY").ok()
.or_else(|| std::fs::read_to_string("apikey.txt").ok()) .or_else(|| std::fs::read_to_string("apikey.txt").ok())
.expect("Failed to find RGAPI_KEY env var or apikey.txt."); .expect("Failed to find RGAPI_KEY env var or apikey.txt.");
RiotApi::with_key(api_key.trim()) RiotApi::with_config(RiotApiConfig::with_key(api_key.trim())
.preconfig_burst())
}; };
} }
pub fn future_start<Fut>(future: Fut) -> RemoteHandle<<Fut as Future>::Output>
where
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send,
{
Executor::spawn_with_handle(&mut DefaultExecutor::current(), future)
.expect("Failed to spawn.")
}
pub mod ids { pub mod ids {
pub const SUMMONER_ID_LUGNUTSK: &'static str = "SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw"; pub const SUMMONER_ID_LUGNUTSK: &'static str = "SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw";
pub const SUMMONER_ID_MA5TERY: &'static str = "IbC4uyFEEW3ZkZw6FZF4bViw3P1EynclAcI6-p-vCpI99Ec"; pub const SUMMONER_ID_MA5TERY: &'static str = "IbC4uyFEEW3ZkZw6FZF4bViw3P1EynclAcI6-p-vCpI99Ec";