adding token bucket burst, tests

pull/5/head
Mingwei Samuel 2019-10-27 00:42:28 -07:00
parent 9e66efb8ad
commit 3505dc10a9
2 changed files with 117 additions and 13 deletions

View File

@ -12,7 +12,7 @@ use super::Instant; // Hack for token_bucket_test.rs.
/// `"X-App-Rate-Limit": "20:1,100:120"`. Each `TokenBucket` corresponds to a /// `"X-App-Rate-Limit": "20:1,100:120"`. Each `TokenBucket` corresponds to a
/// single `"100:120"` (100 requests per 120 seconds). /// single `"100:120"` (100 requests per 120 seconds).
pub trait TokenBucket { pub trait TokenBucket {
/// Get the duration til the next available token, or 0 duration if a token /// Get the duration til the next available token, or None if a token
/// is available. /// is available.
/// # Returns /// # Returns
/// Duration or 0 duration. /// Duration or 0 duration.
@ -44,7 +44,6 @@ pub struct VectorTokenBucket {
// Total tokens available from this TokenBucket. // Total tokens available from this TokenBucket.
total_limit: usize, total_limit: usize,
/// TODO USE THESE !!!!!!!
/// Duration considered for burst factor. /// Duration considered for burst factor.
burst_duration: Duration, burst_duration: Duration,
/// Limit allowed per burst_duration, for burst factor. /// Limit allowed per burst_duration, for burst factor.
@ -62,23 +61,28 @@ impl VectorTokenBucket {
// API always uses round numbers, burst_pct is frac of 256. // API always uses round numbers, burst_pct is frac of 256.
let burst_duration = Duration::new( let burst_duration = Duration::new(
(duration.as_secs() as f32 * burst_pct) as u64, (duration.as_secs() as f32 * burst_pct).ceil() as u64,
(duration.subsec_nanos() as f32 * burst_pct) as u32); (duration.subsec_nanos() as f32 * burst_pct).ceil() as u32);
let burst_limit = (total_limit as f32 * burst_pct).ceil() as usize;
debug_assert!(burst_limit > 0);
VectorTokenBucket { VectorTokenBucket {
duration: duration, duration: duration,
total_limit: total_limit, total_limit: total_limit,
burst_duration: burst_duration, burst_duration: burst_duration,
burst_limit: (total_limit as f32 * burst_pct) as usize, burst_limit: burst_limit,
timestamps: Mutex::new(VecDeque::new()), timestamps: Mutex::new(VecDeque::with_capacity(total_limit)),
} }
} }
fn update_get_timestamps(&self) -> MutexGuard<VecDeque<Instant>> { fn update_get_timestamps(&self) -> MutexGuard<VecDeque<Instant>> {
let mut timestamps = self.timestamps.lock(); let mut timestamps = self.timestamps.lock();
let cutoff = Instant::now() - self.duration; let cutoff = Instant::now() - self.duration;
// We only need to trim the end of the queue to not leak memory.
// We could do it lazily somehow if we wanted to be really fancy.
while timestamps.back().map_or(false, |ts| ts < &cutoff) { while timestamps.back().map_or(false, |ts| ts < &cutoff) {
timestamps.pop_back(); timestamps.pop_back();
} }
@ -95,10 +99,24 @@ impl TokenBucket for VectorTokenBucket {
// `if timestamps.len() < self.total_limit { return None }` // `if timestamps.len() < self.total_limit { return None }`
// Timestamp that needs to be popped before // Timestamp that needs to be popped before
// we can enter another timestamp. // we can enter another timestamp.
let ts = *timestamps.get(self.total_limit - 1)?;
Instant::now().checked_duration_since(ts) // Full rate limit.
if let Some(ts) = timestamps.get(self.total_limit - 1) {
// Return amount of time needed for timestamp `ts` to go away.
Instant::now().checked_duration_since(*ts)
.and_then(|passed_dur| self.duration.checked_sub(passed_dur)) .and_then(|passed_dur| self.duration.checked_sub(passed_dur))
} }
// Otherwise burst rate limit.
else if let Some(ts) = timestamps.get(self.burst_limit - 1) {
// Return amount of time needed for timestamp `ts` to go away.
Instant::now().checked_duration_since(*ts)
.and_then(|passed_dur| self.burst_duration.checked_sub(passed_dur))
}
// No delay needed.
else {
None
}
}
fn get_tokens(&self, n: usize) -> bool { fn get_tokens(&self, n: usize) -> bool {
let mut timestamps = self.update_get_timestamps(); let mut timestamps = self.update_get_timestamps();

View File

@ -1,9 +1,8 @@
#![cfg(test)] #![cfg(test)]
use std::time::Duration;
use fake_clock::FakeClock as Instant; use fake_clock::FakeClock as Instant;
/// This is a hack to test token bucket, substituting FakeClock for Instant.
mod token_bucket { mod token_bucket {
include!("token_bucket.rs"); include!("token_bucket.rs");
@ -11,8 +10,95 @@ mod token_bucket {
use super::*; use super::*;
#[test] #[test]
fn it_works() { fn test_basic() {
let _x = VectorTokenBucket::new(Duration::from_secs(1), 100); Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.95);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_eq!(None, bucket.get_delay(), "Can get stuff.");
assert!(!bucket.get_tokens(51), "Should have violated limit.");
}
#[test]
fn test_internal_constructor() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1.0);
assert_eq!(100, bucket.burst_limit);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1e-6);
assert_eq!(1, bucket.burst_limit);
}
#[test]
fn test_saturated_100_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1.00);
Instant::set_time(50_000);
assert!(bucket.get_tokens(100), "All tokens should be immediately available.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(1001); // Extra buffer for Duration(0).
assert!(bucket.get_tokens(100), "All tokens should be available after a bucket duration.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
}
#[test]
fn test_saturated_95_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.50);
Instant::set_time(50_000);
assert!(bucket.get_tokens(95), "95 tokens should be immediately available.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(475); // Total 951.
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(476); // Extra buffer for Duration(0).
assert!(bucket.get_tokens(5), "Last 5 tokens should be available.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(51);
assert!(bucket.get_tokens(95), "95 tokens should be available.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(951);
assert!(bucket.get_tokens(5));
assert!(None != bucket.get_delay());
}
#[test]
fn test_saturated_50_burst() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.5);
Instant::set_time(50_000);
assert!(bucket.get_tokens(50), "Half the tokens should be immediately available.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(501); // Extra buffer for Duration(0).
assert!(bucket.get_tokens(50), "Half the tokens should be available after a half bucket duration.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(501);
assert!(bucket.get_tokens(50), "Half the tokens should be available after a full bucket duration.");
assert!(None != bucket.get_delay(), "Bucket should have delay.");
Instant::advance_time(501);
assert!(bucket.get_tokens(50));
assert!(None != bucket.get_delay());
}
#[test]
fn test_many() {
Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.95);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert_eq!(None, bucket.get_delay(), "Should not be blocked.");
for _ in 0..20_000 {
Instant::advance_time(501);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert!(None != bucket.get_delay(), "Should be blocked.");
Instant::advance_time(501);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert!(None != bucket.get_delay(), "Should be blocked.");
}
} }
} }
} }