forked from mirror/Riven

Add exponential backoff fix #38, clean up some `regional_requester` code

users/mingwei/lints
Mingwei Samuel 2023-01-21 13:45:45 -08:00
parent 6512866c26
commit f51a67c1fc
2 changed files with 84 additions and 63 deletions

@@ -96,15 +96,18 @@ impl RateLimit {
         self.retry_after.read().and_then(|i| Instant::now().checked_duration_since(i))
     }
 
-    pub fn on_response(&self, config: &RiotApiConfig, response: &Response) {
-        self.on_response_retry_after(response);
+    /// Update retry-after and rate limits based on an API response.
+    /// Returns the retry-after delay if set.
+    pub fn on_response(&self, config: &RiotApiConfig, response: &Response) -> Option<Duration> {
+        let retry_after = self.on_response_retry_after(response);
         self.on_response_rate_limits(config, response);
+        retry_after
     }
 
     /// `on_response` helper for retry after check.
+    /// Returns the retry-after delay if set.
     #[inline]
-    fn on_response_retry_after(&self, response: &Response) {
-        if let Some(retry_after) = || -> Option<Instant> {
+    fn on_response_retry_after(&self, response: &Response) -> Option<Duration> {
         // Only care about 429s.
         if StatusCode::TOO_MANY_REQUESTS != response.status() {
             return None;
@@ -125,39 +128,51 @@ impl RateLimit {
             // This match checks the valid possibilities. Otherwise returns none to ignore.
             // `Application` handles "application", `Method` handles all other values.
-            let application_should_handle = match header_opt {
-                Some(Self::HEADER_XRATELIMITTYPE_APPLICATION) => true,
-                Some(Self::HEADER_XRATELIMITTYPE_METHOD | Self::HEADER_XRATELIMITTYPE_SERVICE) => false,
+            let is_responsible = match header_opt {
+                Some(Self::HEADER_XRATELIMITTYPE_APPLICATION) => {
+                    self.rate_limit_type == RateLimitType::Application
+                }
+                Some(Self::HEADER_XRATELIMITTYPE_METHOD | Self::HEADER_XRATELIMITTYPE_SERVICE) => {
+                    self.rate_limit_type == RateLimitType::Method
+                }
                 other => {
-                    // Method handles unknown values.
-                    log::warn!(
-                        "429 response has None (missing or invalid) or unknown {} header value {:?}, {:?} rate limit obeying retry-after.",
-                        Self::HEADER_XRATELIMITTYPE, other, self.rate_limit_type);
-                    false
-                },
+                    // RateLimitType::Method handles unknown values.
+                    if self.rate_limit_type == RateLimitType::Method {
+                        log::warn!(
+                            "429 response has None (missing or invalid) or unknown {} header value {:?}, {:?} rate limit obeying retry-after.",
+                            Self::HEADER_XRATELIMITTYPE, other, self.rate_limit_type);
+                        true
+                    } else {
+                        false
+                    }
+                }
             };
-            if (self.rate_limit_type == RateLimitType::Application) != application_should_handle {
+            if !is_responsible {
                 return None;
             }
         }
 
         // Get retry after header. Only care if it exists.
         let retry_after_header = response.headers()
-            .get(reqwest::header::RETRY_AFTER)?.to_str()
-            .expect("Failed to read retry-after header as string.");
+            .get(reqwest::header::RETRY_AFTER)
+            .and_then(|h| h
+                .to_str()
+                .map_err(|e| log::error!("Failed to read retry-after header as visible ASCII string: {:?}.", e)).ok())?;
         log::info!("429 response, rate limit {:?}, retry-after {} secs.", self.rate_limit_type, retry_after_header);
 
         // Header currently only returns ints, but float is more general. Can be zero.
-        let retry_after_secs: f32 = retry_after_header.parse()
-            .expect("Failed to parse retry-after header as f32.");
+        let retry_after_secs = retry_after_header
+            .parse::<f32>()
+            .map_err(|e| log::error!("Failed to parse retry-after header as f32: {:?}.", e))
+            .ok()?;
         // Add 0.5 seconds to account for rounding, cases when response is zero.
         let delay = Duration::from_secs_f32(0.5 + retry_after_secs);
-        Some(Instant::now() + delay)
-    }() {
-        *self.retry_after.write() = Some(retry_after);
-    }
+
+        // Set `self.retry_after`.
+        *self.retry_after.write() = Some(Instant::now() + delay);
+        Some(delay)
     }
 
     #[inline]
@@ -165,9 +180,9 @@ impl RateLimit {
         // Check if rate limits changed.
         let headers = response.headers();
         let limit_header_opt = headers.get(self.rate_limit_type.limit_header())
-            .map(|h| h.to_str().expect("Failed to read limit header as string."));
+            .and_then(|h| h.to_str().map_err(|e| log::error!("Failed to read limit header as visible ASCII string: {:?}.", e)).ok());
         let count_header_opt = headers.get(self.rate_limit_type.count_header())
-            .map(|h| h.to_str().expect("Failed to read count header as string."));
+            .and_then(|h| h.to_str().map_err(|e| log::error!("Failed to read count header as visible ASCII string: {:?}.", e)).ok());
 
         if let (Some(limit_header), Some(count_header)) = (limit_header_opt, count_header_opt) {
             {
@@ -231,13 +246,3 @@ fn buckets_from_header(config: &RiotApiConfig, limit_header: &str, count_header:
     log::debug!("Set buckets to {} limit, {} count.", limit_header, count_header);
     out
 }
-
-#[cfg(test)]
-mod tests {
-    // use super::*;
-
-    // fn send_sync() {
-    //     fn is_send_sync<T: Send + Sync>() {}
-    //     is_send_sync::<RateLimit>();
-    // }
-}
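
The net effect of the `RateLimit` changes is that `on_response` now reports the parsed `retry-after` delay instead of only storing it internally. As a minimal standalone sketch of that parsing (the `parse_retry_after` helper below is hypothetical and not part of the commit; the real logic lives in `RateLimit::on_response_retry_after` and also records the deadline in `self.retry_after`):

use std::time::Duration;

// Hypothetical helper mirroring the retry-after header parsing above.
fn parse_retry_after(header_value: &str) -> Option<Duration> {
    // Header currently only returns ints, but a float parse is more general. Can be zero.
    let secs = header_value.parse::<f32>().ok()?;
    // Add 0.5 seconds to account for rounding and for zero values.
    Some(Duration::from_secs_f32(0.5 + secs))
}

// parse_retry_after("2")    -> Some(2.5s)
// parse_retry_after("0")    -> Some(0.5s)
// parse_retry_after("oops") -> None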

@@ -69,8 +69,10 @@ impl RegionalRequester {
                 .map_err(|e| RiotApiError::new(e, retries, None, None))?;
 
             // Maybe update rate limits (based on response headers).
-            self.app_rate_limit.on_response(config, &response);
-            method_rate_limit.on_response(config, &response);
+            // Use single bar for no short circuiting.
+            let retry_after_app = self.app_rate_limit.on_response(config, &response);
+            let retry_after_method = method_rate_limit.on_response(config, &response);
+            let retry_after = retry_after_app.or(retry_after_method); // Note: Edge case if both are Some(_) not handled.
 
             let status = response.status();
             // Handle normal success / failure cases.
@@ -96,8 +98,22 @@ impl RegionalRequester {
                     log::debug!("Response {} (retried {} times), failure, returning error.", status, retries);
                     break Err(RiotApiError::new(err, retries, Some(response), Some(status)));
                 }
-                log::debug!("Response {} (retried {} times), retrying.", status, retries);
+                // Is retryable, do exponential backoff if retry-after wasn't specified.
+                // 1 sec, 2 sec, 4 sec, 8 sec.
+                match retry_after {
+                    None => {
+                        let delay = std::time::Duration::from_secs(2_u64.pow(retries as u32));
+                        log::debug!("Response {} (retried {} times), NO `retry-after`, using exponential backoff, retrying after {:?}.", status, retries, delay);
+                        let backoff = tokio::time::sleep(delay);
+                        #[cfg(feature = "tracing")]
+                        let backoff = backoff.instrument(tracing::info_span!("backoff"));
+                        backoff.await;
+                    }
+                    Some(delay) => {
+                        log::debug!("Response {} (retried {} times), `retry-after` set, retrying after {:?}.", status, retries, delay);
+                    }
+                }
 
                 retries += 1;
             }
         }
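
Taken together, the `RegionalRequester` changes make retryable failures wait before the next attempt: the server-provided `retry-after` delay wins when present, otherwise the delay grows exponentially with the retry count. A rough sketch of that policy, with a hypothetical `backoff_delay` helper standing in for the inline `match` above:

use std::time::Duration;

// Hypothetical helper mirroring the retry delay choice above.
fn backoff_delay(retry_after: Option<Duration>, retries: u32) -> Duration {
    // Prefer the server's retry-after; otherwise 2^retries seconds (1, 2, 4, 8, ...).
    retry_after.unwrap_or_else(|| Duration::from_secs(2_u64.pow(retries)))
}

// backoff_delay(None, 0)                         -> 1s
// backoff_delay(None, 3)                         -> 8s
// backoff_delay(Some(Duration::from_secs(3)), 3) -> 3s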