From f51a67c1fc7af8f3406762e19bbc39a6b59023ec Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Sat, 21 Jan 2023 13:45:45 -0800 Subject: [PATCH] Add exponential backoff fix #38, cleanup some `regional_requester` code --- riven/src/req/rate_limit.rs | 125 +++++++++++++++------------- riven/src/req/regional_requester.rs | 22 ++++- 2 files changed, 84 insertions(+), 63 deletions(-) diff --git a/riven/src/req/rate_limit.rs b/riven/src/req/rate_limit.rs index 743feab..a7312d8 100644 --- a/riven/src/req/rate_limit.rs +++ b/riven/src/req/rate_limit.rs @@ -96,68 +96,83 @@ impl RateLimit { self.retry_after.read().and_then(|i| Instant::now().checked_duration_since(i)) } - pub fn on_response(&self, config: &RiotApiConfig, response: &Response) { - self.on_response_retry_after(response); + /// Update retry-after and rate limits based on an API response. + /// Returns the retry-after delay if set. + pub fn on_response(&self, config: &RiotApiConfig, response: &Response) -> Option { + let retry_after = self.on_response_retry_after(response); self.on_response_rate_limits(config, response); + retry_after } /// `on_response` helper for retry after check. + /// Returns the retry-after delay if set. #[inline] - fn on_response_retry_after(&self, response: &Response) { - if let Some(retry_after) = || -> Option { - // Only care about 429s. - if StatusCode::TOO_MANY_REQUESTS != response.status() { - return None; - } + fn on_response_retry_after(&self, response: &Response) -> Option { + // Only care about 429s. + if StatusCode::TOO_MANY_REQUESTS != response.status() { + return None; + } - { - // Get the X-Rate-Limit-Type header, `Some("application" | "method" | "service")` or `None`. - let header_opt = response.headers() - .get(Self::HEADER_XRATELIMITTYPE) - .or_else(|| { - log::info!("429 response missing {} header.", Self::HEADER_XRATELIMITTYPE); - None - }) - .and_then(|header_value| header_value.to_str() - .map_err(|e| log::info!("429 response, error parsing '{}' header as string: {}. Header value: {:#?}", - Self::HEADER_XRATELIMITTYPE, e, header_value)) - .ok()); + { + // Get the X-Rate-Limit-Type header, `Some("application" | "method" | "service")` or `None`. + let header_opt = response.headers() + .get(Self::HEADER_XRATELIMITTYPE) + .or_else(|| { + log::info!("429 response missing {} header.", Self::HEADER_XRATELIMITTYPE); + None + }) + .and_then(|header_value| header_value.to_str() + .map_err(|e| log::info!("429 response, error parsing '{}' header as string: {}. Header value: {:#?}", + Self::HEADER_XRATELIMITTYPE, e, header_value)) + .ok()); - // This match checks the valid possibilities. Otherwise returns none to ignore. - // `Application` handles "application", `Method` handles all other values. - let application_should_handle = match header_opt { - Some(Self::HEADER_XRATELIMITTYPE_APPLICATION) => true, - Some(Self::HEADER_XRATELIMITTYPE_METHOD | Self::HEADER_XRATELIMITTYPE_SERVICE) => false, - other => { - // Method handles unknown values. + // This match checks the valid possibilities. Otherwise returns none to ignore. + // `Application` handles "application", `Method` handles all other values. + let is_responsible = match header_opt { + Some(Self::HEADER_XRATELIMITTYPE_APPLICATION) => { + self.rate_limit_type == RateLimitType::Application + } + Some(Self::HEADER_XRATELIMITTYPE_METHOD | Self::HEADER_XRATELIMITTYPE_SERVICE) => { + self.rate_limit_type == RateLimitType::Method + } + other => { + // RateLimitType::Method handles unknown values. + if self.rate_limit_type == RateLimitType::Method { log::warn!( "429 response has None (missing or invalid) or unknown {} header value {:?}, {:?} rate limit obeying retry-after.", Self::HEADER_XRATELIMITTYPE, other, self.rate_limit_type); + true + } else { false - }, - }; - - if (self.rate_limit_type == RateLimitType::Application) != application_should_handle { - return None; + } } + }; + if !is_responsible { + return None; } - - // Get retry after header. Only care if it exists. - let retry_after_header = response.headers() - .get(reqwest::header::RETRY_AFTER)?.to_str() - .expect("Failed to read retry-after header as string."); - - log::info!("429 response, rate limit {:?}, retry-after {} secs.", self.rate_limit_type, retry_after_header); - - // Header currently only returns ints, but float is more general. Can be zero. - let retry_after_secs: f32 = retry_after_header.parse() - .expect("Failed to parse retry-after header as f32."); - // Add 0.5 seconds to account for rounding, cases when response is zero. - let delay = Duration::from_secs_f32(0.5 + retry_after_secs); - Some(Instant::now() + delay) - }() { - *self.retry_after.write() = Some(retry_after); } + + // Get retry after header. Only care if it exists. + let retry_after_header = response.headers() + .get(reqwest::header::RETRY_AFTER) + .and_then(|h| h + .to_str() + .map_err(|e| log::error!("Failed to read retry-after header as visible ASCII string: {:?}.", e)).ok())?; + + log::info!("429 response, rate limit {:?}, retry-after {} secs.", self.rate_limit_type, retry_after_header); + + // Header currently only returns ints, but float is more general. Can be zero. + let retry_after_secs = retry_after_header + .parse::() + .map_err(|e| log::error!("Failed to parse read-after header as f32: {:?}.", e)) + .ok()?; + + // Add 0.5 seconds to account for rounding, cases when response is zero. + let delay = Duration::from_secs_f32(0.5 + retry_after_secs); + + // Set `self.retry_after`. + *self.retry_after.write() = Some(Instant::now() + delay); + Some(delay) } #[inline] @@ -165,9 +180,9 @@ impl RateLimit { // Check if rate limits changed. let headers = response.headers(); let limit_header_opt = headers.get(self.rate_limit_type.limit_header()) - .map(|h| h.to_str().expect("Failed to read limit header as string.")); + .and_then(|h| h.to_str().map_err(|e| log::error!("Failed to read limit header as visible ASCII string: {:?}.", e)).ok()); let count_header_opt = headers.get(self.rate_limit_type.count_header()) - .map(|h| h.to_str().expect("Failed to read count header as string.")); + .and_then(|h| h.to_str().map_err(|e| log::error!("Failed to read count header as visible ASCII string: {:?}.", e)).ok()); if let (Some(limit_header), Some(count_header)) = (limit_header_opt, count_header_opt) { { @@ -231,13 +246,3 @@ fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header: log::debug!("Set buckets to {} limit, {} count.", limit_header, count_header); out } - -#[cfg(test)] -mod tests { - // use super::*; - - // fn send_sync() { - // fn is_send_sync() {} - // is_send_sync::(); - // } -} diff --git a/riven/src/req/regional_requester.rs b/riven/src/req/regional_requester.rs index 8bb2afe..58a6c89 100644 --- a/riven/src/req/regional_requester.rs +++ b/riven/src/req/regional_requester.rs @@ -69,8 +69,10 @@ impl RegionalRequester { .map_err(|e| RiotApiError::new(e, retries, None, None))?; // Maybe update rate limits (based on response headers). - self.app_rate_limit.on_response(config, &response); - method_rate_limit.on_response(config, &response); + // Use single bar for no short circuiting. + let retry_after_app = self.app_rate_limit.on_response(config, &response); + let retry_after_method = method_rate_limit.on_response(config, &response); + let retry_after = retry_after_app.or(retry_after_method); // Note: Edge case if both are Some(_) not handled. let status = response.status(); // Handle normal success / failure cases. @@ -96,8 +98,22 @@ impl RegionalRequester { log::debug!("Response {} (retried {} times), failure, returning error.", status, retries); break Err(RiotApiError::new(err, retries, Some(response), Some(status))); } - log::debug!("Response {} (retried {} times), retrying.", status, retries); + // Is retryable, do exponential backoff if retry-after wasn't specified. + // 1 sec, 2 sec, 4 sec, 8 sec. + match retry_after { + None => { + let delay = std::time::Duration::from_secs(2_u64.pow(retries as u32)); + log::debug!("Response {} (retried {} times), NO `retry-after`, using exponential backoff, retrying after {:?}.", status, retries, delay); + let backoff = tokio::time::sleep(delay); + #[cfg(feature = "tracing")] + let backoff = backoff.instrument(tracing::info_span!("backoff")); + backoff.await; + } + Some(delay) => { + log::debug!("Response {} (retried {} times), `retry-after` set, retrying after {:?}.", status, retries, delay); + } + } retries += 1; } }