diff --git a/src/req/token_bucket.rs b/src/req/token_bucket.rs index 69953b1..ce08d49 100644 --- a/src/req/token_bucket.rs +++ b/src/req/token_bucket.rs @@ -12,7 +12,7 @@ use super::Instant; // Hack for token_bucket_test.rs. /// `"X-App-Rate-Limit": "20:1,100:120"`. Each `TokenBucket` corresponds to a /// single `"100:120"` (100 requests per 120 seconds). pub trait TokenBucket { - /// Get the duration til the next available token, or 0 duration if a token + /// Get the duration til the next available token, or None if a token /// is available. /// # Returns /// Duration or 0 duration. @@ -44,7 +44,6 @@ pub struct VectorTokenBucket { // Total tokens available from this TokenBucket. total_limit: usize, - /// TODO USE THESE !!!!!!! /// Duration considered for burst factor. burst_duration: Duration, /// Limit allowed per burst_duration, for burst factor. @@ -62,23 +61,28 @@ impl VectorTokenBucket { // API always uses round numbers, burst_pct is frac of 256. let burst_duration = Duration::new( - (duration.as_secs() as f32 * burst_pct) as u64, - (duration.subsec_nanos() as f32 * burst_pct) as u32); + (duration.as_secs() as f32 * burst_pct).ceil() as u64, + (duration.subsec_nanos() as f32 * burst_pct).ceil() as u32); + + let burst_limit = (total_limit as f32 * burst_pct).ceil() as usize; + debug_assert!(burst_limit > 0); VectorTokenBucket { duration: duration, total_limit: total_limit, burst_duration: burst_duration, - burst_limit: (total_limit as f32 * burst_pct) as usize, + burst_limit: burst_limit, - timestamps: Mutex::new(VecDeque::new()), + timestamps: Mutex::new(VecDeque::with_capacity(total_limit)), } } fn update_get_timestamps(&self) -> MutexGuard> { let mut timestamps = self.timestamps.lock(); let cutoff = Instant::now() - self.duration; + // We only need to trim the end of the queue to not leak memory. + // We could do it lazily somehow if we wanted to be really fancy. while timestamps.back().map_or(false, |ts| ts < &cutoff) { timestamps.pop_back(); } @@ -95,9 +99,23 @@ impl TokenBucket for VectorTokenBucket { // `if timestamps.len() < self.total_limit { return None }` // Timestamp that needs to be popped before // we can enter another timestamp. - let ts = *timestamps.get(self.total_limit - 1)?; - Instant::now().checked_duration_since(ts) - .and_then(|passed_dur| self.duration.checked_sub(passed_dur)) + + // Full rate limit. + if let Some(ts) = timestamps.get(self.total_limit - 1) { + // Return amount of time needed for timestamp `ts` to go away. + Instant::now().checked_duration_since(*ts) + .and_then(|passed_dur| self.duration.checked_sub(passed_dur)) + } + // Otherwise burst rate limit. + else if let Some(ts) = timestamps.get(self.burst_limit - 1) { + // Return amount of time needed for timestamp `ts` to go away. + Instant::now().checked_duration_since(*ts) + .and_then(|passed_dur| self.burst_duration.checked_sub(passed_dur)) + } + // No delay needed. + else { + None + } } fn get_tokens(&self, n: usize) -> bool { diff --git a/src/req/token_bucket.test.rs b/src/req/token_bucket.test.rs index 463d25c..fa0009b 100644 --- a/src/req/token_bucket.test.rs +++ b/src/req/token_bucket.test.rs @@ -1,9 +1,8 @@ #![cfg(test)] -use std::time::Duration; - use fake_clock::FakeClock as Instant; +/// This is a hack to test token bucket, substituting FakeClock for Instant. mod token_bucket { include!("token_bucket.rs"); @@ -11,8 +10,95 @@ mod token_bucket { use super::*; #[test] - fn it_works() { - let _x = VectorTokenBucket::new(Duration::from_secs(1), 100); + fn test_basic() { + Instant::set_time(50_000); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.95); + assert!(bucket.get_tokens(50), "Should have not violated limit."); + assert_eq!(None, bucket.get_delay(), "Can get stuff."); + assert!(!bucket.get_tokens(51), "Should have violated limit."); + } + + #[test] + fn test_internal_constructor() { + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1.0); + assert_eq!(100, bucket.burst_limit); + + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1e-6); + assert_eq!(1, bucket.burst_limit); + } + + #[test] + fn test_saturated_100_burst() { + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 1.00); + + Instant::set_time(50_000); + assert!(bucket.get_tokens(100), "All tokens should be immediately available."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(1001); // Extra buffer for Duration(0). + assert!(bucket.get_tokens(100), "All tokens should be available after a bucket duration."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + } + + #[test] + fn test_saturated_95_burst() { + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.50); + + Instant::set_time(50_000); + assert!(bucket.get_tokens(95), "95 tokens should be immediately available."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(475); // Total 951. + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(476); // Extra buffer for Duration(0). + assert!(bucket.get_tokens(5), "Last 5 tokens should be available."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(51); + assert!(bucket.get_tokens(95), "95 tokens should be available."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(951); + assert!(bucket.get_tokens(5)); + assert!(None != bucket.get_delay()); + } + + #[test] + fn test_saturated_50_burst() { + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.5); + + Instant::set_time(50_000); + assert!(bucket.get_tokens(50), "Half the tokens should be immediately available."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(501); // Extra buffer for Duration(0). + assert!(bucket.get_tokens(50), "Half the tokens should be available after a half bucket duration."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(501); + assert!(bucket.get_tokens(50), "Half the tokens should be available after a full bucket duration."); + assert!(None != bucket.get_delay(), "Bucket should have delay."); + + Instant::advance_time(501); + assert!(bucket.get_tokens(50)); + assert!(None != bucket.get_delay()); + } + + #[test] + fn test_many() { + Instant::set_time(50_000); + let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, 0.95); + assert!(bucket.get_tokens(50), "Should have not violated limit."); + assert_eq!(None, bucket.get_delay(), "Should not be blocked."); + for _ in 0..20_000 { + Instant::advance_time(501); + assert!(bucket.get_tokens(50), "Should have not violated limit."); + assert!(None != bucket.get_delay(), "Should be blocked."); + Instant::advance_time(501); + assert!(bucket.get_tokens(50), "Should have not violated limit."); + assert!(None != bucket.get_delay(), "Should be blocked."); + } } } }