responsive rate limit working

This commit is contained in:
Mingwei Samuel 2019-10-18 13:18:26 -07:00
parent 22da3abc0d
commit 907b19594b
8 changed files with 128 additions and 45 deletions

View file

@ -13,7 +13,12 @@ exclude = [
[dependencies]
async-std = "0.99"
log = "0.4"
parking_lot = { version = "0.9", features = [ "nightly" ] }
reqwest = { version = "0.10.0-alpha.1", features = [ "gzip", "json" ] }
scan_fmt = "0.2"
serde = "^1.0"
tokio = "0.2.0-alpha.6"
[dev-dependencies]
env_logger = "0.7"

View file

@ -16,6 +16,8 @@ mod tests {
#[test]
fn it_works() {
env_logger::init();
let api_key_raw = std::fs::read_to_string("apikey.txt").unwrap(); // TODO don't use unwrap.
let api_key = api_key_raw.trim();
@ -23,11 +25,13 @@ mod tests {
let rt = Runtime::new().unwrap();
let riot_api = RiotApi::with_key(api_key);
// https://na1.api.riotgames.com/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw
let my_future = riot_api.get::<u32>("asdf", consts::Region::NA,
"/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw",
&[]);
let val = rt.block_on(my_future).unwrap();
println!("VAL: {}", val.unwrap());
for i in 0..2 {
// https://na1.api.riotgames.com/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw
let my_future = riot_api.get::<u32>("asdf", consts::Region::NA,
"/lol/champion-mastery/v4/scores/by-summoner/SBM8Ubipo4ge2yj7bhEzL7yvV0C9Oc1XA2l6v5okGMA_nCw",
&[]);
let val = rt.block_on(my_future).unwrap();
println!("VAL {}: {}", i, val.unwrap());
}
}
}

View file

@ -1,17 +1,12 @@
use std::cmp;
use std::time::{
Duration,
Instant,
};
use std::time::{ Duration, Instant };
use parking_lot::{
RwLock,
};
use log::debug;
use parking_lot::{ RwLock, RwLockUpgradableReadGuard };
use reqwest::{ StatusCode, Response };
use scan_fmt::scan_fmt;
use super::{
TokenBucket,
VectorTokenBucket,
};
use super::{ TokenBucket, VectorTokenBucket };
use super::RateLimitType;
pub struct RateLimit {
@ -21,7 +16,7 @@ pub struct RateLimit {
// from API response.
buckets: RwLock<Vec<VectorTokenBucket>>,
// Set to when we can retry if a retry-after header is received.
retry_after: Option<Instant>,
retry_after: RwLock<Option<Instant>>,
}
impl RateLimit {
@ -36,16 +31,18 @@ impl RateLimit {
rate_limit_type: rate_limit_type,
// Rate limit before getting from response: 1/s.
buckets: RwLock::new(vec![initial_bucket]),
retry_after: None,
retry_after: RwLock::new(None),
}
}
pub fn get_both_or_delay(app_rate_limit: &Self, method_rate_limit: &Self) -> Option<Duration> {
// Check retry after.
let retry_after_delay = app_rate_limit.get_retry_after_delay()
.and_then(|a| method_rate_limit.get_retry_after_delay().map(|m| cmp::max(a, m)));
if retry_after_delay.is_some() {
return retry_after_delay
{
let retry_after_delay = app_rate_limit.get_retry_after_delay()
.and_then(|a| method_rate_limit.get_retry_after_delay().map(|m| cmp::max(a, m)));
if retry_after_delay.is_some() {
return retry_after_delay
}
}
// Check buckets.
let app_buckets = app_rate_limit.buckets.read();
@ -64,21 +61,104 @@ impl RateLimit {
}
pub fn get_retry_after_delay(&self) -> Option<Duration> {
self.retry_after.and_then(|i| Instant::now().checked_duration_since(i))
self.retry_after.read().and_then(|i| Instant::now().checked_duration_since(i))
}
pub fn on_response(&self, _response: &reqwest::Response) {
return;
// TODO!!!!!!!!!!
pub fn on_response(&self, response: &Response) {
self.on_response_retry_after(response);
self.on_response_rate_limits(response);
}
/// `on_response` helper for retry after check.
#[inline]
fn on_response_retry_after(&self, response: &Response) {
if let Some(retry_after) = || -> Option<Instant> {
// Only care about 429s.
if StatusCode::TOO_MANY_REQUESTS != response.status() {
return None;
}
{
// Only care if the header that indicates the relevant RateLimit is present.
let type_name_header = response.headers()
.get(RateLimit::HEADER_XRATELIMITTYPE)?.to_str().unwrap();
// Only care if that header's value matches us.
if self.rate_limit_type.type_name() != type_name_header.to_lowercase() {
return None;
}
}
// Get retry after header. Only care if it exists.
let retry_after_header = response.headers()
.get(RateLimit::HEADER_RETRYAFTER)?.to_str().ok()?;
let retry_after_secs: u64 = retry_after_header.parse().ok()?;
return Some(Instant::now() + Duration::from_secs(retry_after_secs));
}() {
*self.retry_after.write() = Some(retry_after);
}
}
#[inline]
fn on_response_rate_limits(&self, response: &Response) {
// Check if rate limits changed.
let headers = response.headers();
let limit_header_opt = headers.get(self.rate_limit_type.limit_header()).map(|h| h.to_str().unwrap());
let count_header_opt = headers.get(self.rate_limit_type.count_header()).map(|h| h.to_str().unwrap());
if let Some(limit_header) = limit_header_opt {
if let Some(count_header) = count_header_opt {
let buckets = self.buckets.upgradable_read();
if !buckets_require_updating(limit_header, &*buckets) {
return;
}
// Buckets require updating. Upgrade to write lock.
let mut buckets = RwLockUpgradableReadGuard::upgrade(buckets);
*buckets = buckets_from_header(limit_header, count_header)
}}
}
}
fn buckets_require_updating(limit_header: &str, buckets: &Vec<VectorTokenBucket>) -> bool {
if buckets.len() != limit_header.split(",").count() {
return true;
}
for (limit_header_entry, bucket) in limit_header.split(",").zip(&*buckets) {
// limit_header_entry "100:60" means 100 req per 60 sec.
let bucket_entry = format!("{}:{}", bucket.get_total_limit(), bucket.get_bucket_duration().as_secs());
if limit_header_entry != bucket_entry {
return true;
}
}
false
}
fn buckets_from_header(limit_header: &str, count_header: &str) -> Vec<VectorTokenBucket> {
// Limits: "20000:10,1200000:600"
// Counts: "7:10,58:600"
let size = limit_header.split(",").count();
debug_assert!(size == count_header.split(",").count());
let mut out = Vec::with_capacity(size);
for (limit_entry, count_entry) in limit_header.split(",").zip(count_header.split(",")) {
let (limit, limit_secs) = scan_fmt!(limit_entry, "{d}:{d}", usize, u64).unwrap();
let (count, count_secs) = scan_fmt!(count_entry, "{d}:{d}", usize, u64).unwrap();
debug_assert!(limit_secs == count_secs);
let bucket = VectorTokenBucket::new(Duration::from_secs(limit_secs), limit);
bucket.get_tokens(count);
out.push(bucket);
}
debug!("Set buckets to {} limit, {} count.", limit_header, count_header);
out
}
#[cfg(test)]
mod tests {
use super::*;
// use super::*;
fn send_sync() {
fn is_send_sync<T: Send + Sync>() {}
is_send_sync::<RateLimit>();
}
// fn send_sync() {
// fn is_send_sync<T: Send + Sync>() {}
// is_send_sync::<RateLimit>();
// }
}

View file

@ -1,3 +1,4 @@
#[derive(Copy, Clone)]
pub enum RateLimitType {
Application,
Method,

View file

@ -1,12 +1,7 @@
use std::future::Future;
use std::sync::Arc;
use async_std::task;
use reqwest::{
Client,
StatusCode,
};
use parking_lot::Mutex;
use reqwest::{ Client, StatusCode };
use crate::riot_api_config::RiotApiConfig;
use crate::consts::Region;
@ -113,5 +108,5 @@ impl<'a> RegionalRequester<'a> {
#[cfg(test)]
mod tests {
use super::*;
// use super::*;
}

View file

@ -1,6 +1,3 @@
use std::collections::HashMap;
use std::sync::Arc;
use reqwest::Client;
use crate::riot_api_config::RiotApiConfig;

View file

@ -27,6 +27,7 @@ pub trait TokenBucket {
fn get_total_limit(&self) -> usize;
}
#[derive(Debug)]
pub struct VectorTokenBucket {
/// Duration of this TokenBucket.
duration: Duration,
@ -95,8 +96,8 @@ impl TokenBucket for VectorTokenBucket {
#[cfg(test)]
mod tests {
use super::*;
// use super::*;
//
// #[test]
// fn it_works() {
// assert_eq!(2 + 2, 4);

View file

@ -1,4 +1,4 @@
use std::borrow::Borrow;
// use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;